日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Routine Load

更新時(shí)間:

Routine Load支持提交一個(gè)常駐的導(dǎo)入作業(yè),不斷地從指定的數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)持續(xù)地導(dǎo)入至云數(shù)據(jù)庫(kù) SelectDB 版中。本文介紹如何通過(guò)Routine Load將Kafka中的數(shù)據(jù)導(dǎo)入至云數(shù)據(jù)庫(kù) SelectDB 版實(shí)例。

前提條件

  • 支持的數(shù)據(jù)源:目前僅支持Kafka數(shù)據(jù)源,可通過(guò)無(wú)認(rèn)證方式或PLAIN/SSL/Kerberos等認(rèn)證方式連接Kafka。

  • 支持的消息格式:CSVJSON格式。CSV的格式,每一個(gè)Message為一行,且行尾不包含換行符。

注意事項(xiàng)

默認(rèn)支持Kafka 0.10.0.0(含)以上版本。如果使用Kafka 0.10.0.0以下版本(0.9.0,0.8.2,0.8.1,0.8.0),需要Kafka兼容舊版本,具體操作有以下兩種方式:

  • 將BE的配置kafka_broker_version_fallback的值設(shè)置為要兼容的舊版本。

  • 創(chuàng)建Routine Load的時(shí)候直接設(shè)置property.broker.version.fallback的值為要兼容的舊版本。

說(shuō)明

使用兼容舊版本的代價(jià)在于,Routine Load的部分新特性可能無(wú)法使用,例如根據(jù)時(shí)間設(shè)置Kafka分區(qū)的offset。

創(chuàng)建導(dǎo)入作業(yè)

使用Routine Load功能時(shí),首先需創(chuàng)建一個(gè)Routine Load作業(yè)。該作業(yè)將通過(guò)例行調(diào)度持續(xù)發(fā)送任務(wù),每個(gè)任務(wù)會(huì)消耗一定數(shù)量的Kafka消息。

語(yǔ)法

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

參數(shù)說(shuō)明

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

[db.]job_name

導(dǎo)入作業(yè)的名稱(chēng)。在同一個(gè)Database內(nèi),相同名稱(chēng)的job只能運(yùn)行一個(gè)。

tbl_name

指定導(dǎo)入的表的名稱(chēng)。

merge_type

指定數(shù)據(jù)合并類(lèi)型。默認(rèn)為APPEND,表示導(dǎo)入的數(shù)據(jù)都是普通的追加寫(xiě)操作。MERGEDELETE類(lèi)型僅適用于Unique Key模型表。其中MERGE類(lèi)型需要配合[DELETE ON]語(yǔ)句使用,以標(biāo)注Delete Flag列。而DELETE類(lèi)型則表示導(dǎo)入的所有數(shù)據(jù)皆為刪除數(shù)據(jù)。

load_properties

指定導(dǎo)入數(shù)據(jù)處理相關(guān)參數(shù)。詳細(xì)參數(shù)說(shuō)明,請(qǐng)參見(jiàn)load_properties

job_properties

指定導(dǎo)入作業(yè)相關(guān)參數(shù)。詳細(xì)參數(shù)說(shuō)明,請(qǐng)參見(jiàn)job_properties參數(shù)說(shuō)明

data_source_properties

指定數(shù)據(jù)源的類(lèi)型。詳細(xì)參數(shù)說(shuō)明,請(qǐng)參見(jiàn)data_source_properties參數(shù)說(shuō)明

load_properties參數(shù)說(shuō)明

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

參數(shù)名稱(chēng)

示例值

參數(shù)說(shuō)明

column_separator

COLUMNS TERMINATED BY ","

指定列分隔符,默認(rèn)為\t

columns_mapping

(k1,k2,tmpk1,k3=tmpk1+1)

指定文件列和表中列的映射關(guān)系,以及各種列轉(zhuǎn)換等。詳細(xì)說(shuō)明請(qǐng)參見(jiàn)數(shù)據(jù)轉(zhuǎn)化

preceding_filter

無(wú)

指定過(guò)濾原始數(shù)據(jù)條件。詳細(xì)說(shuō)明請(qǐng)參見(jiàn)數(shù)據(jù)轉(zhuǎn)化

where_predicates

WHERE k1>100 and k2=1000

指定條件對(duì)導(dǎo)入的數(shù)據(jù)進(jìn)行過(guò)濾。詳細(xì)說(shuō)明請(qǐng)參見(jiàn)數(shù)據(jù)轉(zhuǎn)化

partitions

PARTITION(p1,p2,p3)

指定導(dǎo)入目的表的哪些Partition中。如果不指定,則會(huì)自動(dòng)導(dǎo)入到對(duì)應(yīng)的Partition中。

DELETE ON

DELETE ON v3>100

用于指定導(dǎo)入數(shù)據(jù)中表示Delete Flag的列和計(jì)算關(guān)系。

說(shuō)明

需配合MEREGE導(dǎo)入模式一起使用,僅適用于Unique Key模型的表。

ORDER BY

無(wú)

用于指定導(dǎo)入數(shù)據(jù)中表示Sequence Col的列。其功能為導(dǎo)入數(shù)據(jù)時(shí)保證數(shù)據(jù)順序。

說(shuō)明

僅適用于對(duì)Unique Key模型的表。

job_properties參數(shù)說(shuō)明

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
說(shuō)明

這三個(gè)參數(shù)max_batch_interval、max_batch_rows和max_batch_size用于控制子任務(wù)的執(zhí)行時(shí)間和處理量。當(dāng)任意一個(gè)參數(shù)達(dá)到設(shè)定閾值時(shí),任務(wù)將終止。

參數(shù)名稱(chēng)

示例值

參數(shù)說(shuō)明

desired_concurrent_number

"desired_concurrent_number" = "3"

指定期望并發(fā)度。大于0,默認(rèn)為3。一個(gè)例行導(dǎo)入作業(yè)會(huì)被分成多個(gè)子任務(wù)執(zhí)行。這個(gè)參數(shù)用于指定一個(gè)作業(yè)最多有多少任務(wù)可以同時(shí)執(zhí)行。

說(shuō)明
  1. 這個(gè)并發(fā)度并不是實(shí)際的并發(fā)度,實(shí)際的并發(fā)度,會(huì)通過(guò)集群的節(jié)點(diǎn)數(shù)、負(fù)載情況,以及數(shù)據(jù)源的情況綜合判斷。

  2. 適當(dāng)提高并發(fā)可利用分布式集群加速,但過(guò)高會(huì)導(dǎo)致大量小文件寫(xiě)入,建議取值為集群核數(shù) / 16

max_batch_interval

"max_batch_interval" = "20"

指定每個(gè)子任務(wù)最大執(zhí)行時(shí)間,單位是秒,默認(rèn)為10,取值范圍為5~60秒。

max_batch_rows

"max_batch_rows" = "300000"

指定每個(gè)子任務(wù)最多讀取的行數(shù)。默認(rèn)是200000,取值范圍大于等于200000。

max_batch_size

"max_batch_size" = "209715200"

指定每個(gè)子任務(wù)最多讀取的字節(jié)數(shù)。單位是字節(jié),默認(rèn)為104857600,即100 MB。取值范圍為100 MB~1 GB。

max_error_number

"max_error_number"="3"

指定采樣窗口內(nèi),允許的最大錯(cuò)誤行數(shù)。默認(rèn)為0,即不允許有錯(cuò)誤行。取值范圍大于等于0。

采樣窗口為max_batch_rows*10。即如果在采樣窗口內(nèi),錯(cuò)誤行數(shù)大于,則會(huì)導(dǎo)致例行作業(yè)被暫停,需要人工介入檢查數(shù)據(jù)質(zhì)量問(wèn)題。

說(shuō)明

where條件過(guò)濾掉的行不算錯(cuò)誤行。

strict_mode

"strict_mode"="true"

指定是否開(kāi)啟嚴(yán)格模式,默認(rèn)為false。開(kāi)啟后,非空原始數(shù)據(jù)的列類(lèi)型變換如果結(jié)果為NULL,則會(huì)被過(guò)濾。指定方式為嚴(yán)格模式時(shí),即對(duì)于導(dǎo)入過(guò)程中的列類(lèi)型轉(zhuǎn)換進(jìn)行嚴(yán)格過(guò)濾。嚴(yán)格過(guò)濾的策略如下:

  • 對(duì)于列類(lèi)型轉(zhuǎn)換來(lái)說(shuō),如果strict mode為true,則錯(cuò)誤的數(shù)據(jù)將被過(guò)濾。錯(cuò)誤數(shù)據(jù)即原始數(shù)據(jù)并不為空值,在參與列類(lèi)型轉(zhuǎn)換后結(jié)果為空值的這一類(lèi)數(shù)據(jù)。

  • 對(duì)于導(dǎo)入的某列由函數(shù)變換生成時(shí),strict mode對(duì)其不產(chǎn)生影響。

  • 對(duì)于導(dǎo)入的某列類(lèi)型包含范圍限制的,如果原始數(shù)據(jù)能正常通過(guò)類(lèi)型轉(zhuǎn)換,但無(wú)法通過(guò)范圍限制的,strict mode對(duì)其也不產(chǎn)生影響。例如:如果類(lèi)型是decimal(1,0),原始數(shù)據(jù)為10,則屬于可以通過(guò)類(lèi)型轉(zhuǎn)換但不在列聲明的范圍內(nèi)。這種數(shù)據(jù)strict對(duì)其不產(chǎn)生影響。

timezone

"timezone" = "Africa/Abidjan"

指定導(dǎo)入作業(yè)所使用的時(shí)區(qū)。默認(rèn)為使用Session的時(shí)區(qū)作為參數(shù)。

說(shuō)明

該參數(shù)會(huì)影響所有導(dǎo)入涉及的和時(shí)區(qū)有關(guān)的函數(shù)結(jié)果。

format

"format" = "json"

指定導(dǎo)入數(shù)據(jù)格式,默認(rèn)為CSV,支持JSON格式。

jsonpaths

-H "jsonpaths:[\"$.k2\",\"$.k1\"]"

當(dāng)導(dǎo)入數(shù)據(jù)格式為JSON時(shí),可通過(guò)jsonpaths指定抽取JSON數(shù)據(jù)中的字段。

strip_outer_array

-H "strip_outer_array:true"

當(dāng)導(dǎo)入數(shù)據(jù)格式為JSON時(shí),strip_outer_arraytrue表示JSON數(shù)據(jù)以數(shù)組的形式展現(xiàn),數(shù)據(jù)中的每一個(gè)元素將被視為一行數(shù)據(jù)。默認(rèn)為false

json_root

-H "json_root:$.RECORDS"

當(dāng)導(dǎo)入數(shù)據(jù)格式為JSON時(shí),可以通過(guò)json_root指定JSON數(shù)據(jù)的根節(jié)點(diǎn)。SelectDB將通過(guò)json_root抽取根節(jié)點(diǎn)的元素進(jìn)行解析。默認(rèn)為空。

send_batch_parallelism

無(wú)

指定設(shè)置發(fā)送批處理數(shù)據(jù)的并行度,如果并行度的值超過(guò)BE配置中的max_send_batch_parallelism_per_job,那么作為協(xié)調(diào)點(diǎn)的BE將使用max_send_batch_parallelism_per_job的值。

load_to_single_tablet

無(wú)

指定是否只導(dǎo)入數(shù)據(jù)到對(duì)應(yīng)分區(qū)的一個(gè)tablet,默認(rèn)值為false。該參數(shù)只允許向?qū)в衦andom分區(qū)的Duplicate表導(dǎo)入數(shù)據(jù)的時(shí)設(shè)置。

嚴(yán)格模式(strict mode)與原始數(shù)據(jù)(source data)的導(dǎo)入關(guān)系

例如列類(lèi)型為T(mén)inyInt。當(dāng)表中的列允許導(dǎo)入空值時(shí)導(dǎo)入關(guān)系如下。

source data

source data example

string to int

strict_mode

result

空值

\N

N/A

true or false

NULL

not null

aaa or 2000

NULL

true

invalid data(filtered)

not null

aaa

NULL

false

NULL

not null

1

1

true or false

correct data

例如列類(lèi)型為Decimal(1,0)。當(dāng)表中的列允許導(dǎo)入空值時(shí)導(dǎo)入關(guān)系如下。

source data

source data example

string to int

strict_mode

result

空值

\N

N/A

true or false

NULL

not null

aaa

NULL

true

invalid data(filtered)

not null

aaa

NULL

false

NULL

not null

1 or 10

1

true or false

correct data

說(shuō)明

10雖然是一個(gè)超過(guò)范圍的值,但是因?yàn)槠漕?lèi)型符合Decimal的要求,所以strict mode對(duì)其不產(chǎn)生影響。10最后會(huì)在其他ETL處理流程中被過(guò)濾。但不會(huì)被strict mode過(guò)濾。

data_source_properties參數(shù)說(shuō)明

FROM KAFKA
(
    "key1" = "val1",
    "key2" = "val2"
)

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

kafka_broker_list

指定Kafka的Broker連接信息。格式為ip:host。多個(gè)Broker之間以逗號(hào)分隔。

格式:"kafka_broker_list"="broker1:9092,broker2:9092"

kafka_topic

指定訂閱的Kafka的Topic。

格式:"kafka_topic"="my_topic"

kafka_partitions/kafka_offsets

指定訂閱的Kafka Partition,以及對(duì)應(yīng)的每個(gè)Partition的起始Offset。如果指定時(shí)間,則會(huì)從大于等于該時(shí)間的最近一個(gè)Offset處開(kāi)始消費(fèi)。

Offset可以指定從大于等于0的具體Offset,或者:

  • OFFSET_BEGINNING:從有數(shù)據(jù)的位置開(kāi)始訂閱。

  • OFFSET_END:從末尾開(kāi)始訂閱。

  • 時(shí)間格式,如:"2021-05-22 11:00:00"

如果沒(méi)有指定,則默認(rèn)從OFFSET_END開(kāi)始訂閱Topic下的所有Partition。

示例如下。

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"
重要

時(shí)間格式不能和OFFSET格式混用。

property

指定自定義Kafka參數(shù)。功能等同于Kafka shell中"--property"參數(shù)。

當(dāng)參數(shù)的value為一個(gè)文件時(shí),需要在value前加上關(guān)鍵詞:"FILE:"。

Property參數(shù)說(shuō)明

  • 使用SSL連接Kafka時(shí),需要指定以下參數(shù):

    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"

    其中property.security.protocolproperty.ssl.ca.location為必選項(xiàng),用于指明連接方式為SSL,以及CA證書(shū)的位置。

    如果Kafka Server端開(kāi)啟了Client認(rèn)證,則還需設(shè)置以下參數(shù)。

    "property.ssl.certificate.location"
    "property.ssl.key.location"
    "property.ssl.key.password"

    分別用于指定Client的public key、private key、private key的密碼。

  • 指定Kafka Partition的默認(rèn)起始o(jì)ffset。

    沒(méi)有指定kafka_partitions/kafka_offsets,默認(rèn)消費(fèi)所有分區(qū)。此時(shí)可以通過(guò)kafka_default_offsets指定起始o(jì)ffset。默認(rèn)為OFFSET_END,即從末尾開(kāi)始訂閱。

    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

更多支持的自定義參數(shù),請(qǐng)參閱librdkafka的官方CONFIGURATION文檔中,Client端的配置項(xiàng)。例如以下參數(shù)。

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

使用示例

創(chuàng)建Routine Load簡(jiǎn)單作業(yè)

  1. 創(chuàng)建待導(dǎo)入的SelectDB數(shù)據(jù)表,示例如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 分別設(shè)置不同的參數(shù)導(dǎo)入數(shù)據(jù),示例如下。

    • 為example_db的test_table創(chuàng)建一個(gè)名為test1的Kafka Routine Load任務(wù)。指定列分隔符的group.id和client.id,設(shè)置自動(dòng)默認(rèn)消費(fèi)所有分區(qū),且從有數(shù)據(jù)的位置(OFFSET_BEGINNING)開(kāi)始訂閱,示例如下。

      CREATE ROUTINE LOAD example_db.test1 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • 為example_db的test_table創(chuàng)建一個(gè)名為test2的Kafka Routine Load任務(wù)。導(dǎo)入任務(wù)為嚴(yán)格模式,示例如下。

      CREATE ROUTINE LOAD example_db.test2 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • 從指定的時(shí)間點(diǎn)開(kāi)始消費(fèi),示例如下。

      CREATE ROUTINE LOAD example_db.test4 ON test_table
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "30",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200"
      ) FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offset" = "2024-01-21 10:00:00"
      );

導(dǎo)入JSON格式數(shù)據(jù)

Routine Load導(dǎo)入的JSON格式僅支持以下兩種。

  • 只有一條記錄,且為JSON對(duì)象。

    當(dāng)使用單表導(dǎo)入(即通過(guò)ON TABLE_NAME指定表名)時(shí),JSON數(shù)據(jù)格式如下。

    {"key1":"value1","key2":"value2","key3":"value3"}

    當(dāng)使用動(dòng)態(tài)或多表導(dǎo)入Routine Load(即不指定具體的表名)時(shí),JSON數(shù)據(jù)格式如下。

    table_name|{"key1":"value1","key2":"value2","key3":"value3"}
  • JSON數(shù)組,數(shù)組中可含多條記錄。

    當(dāng)使用單表導(dǎo)入(即通過(guò)ON TABLE_NAME指定表名)時(shí),JSON數(shù)據(jù)格式如下。

    [
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

    當(dāng)使用動(dòng)態(tài)或多表導(dǎo)入(即不指定具體的表名)時(shí),JSON數(shù)據(jù)格式如下。

       table_name|[
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

導(dǎo)入JSON格式數(shù)據(jù),示例如下。

  1. 創(chuàng)建待導(dǎo)入的SelectDB數(shù)據(jù)表,示例如下。

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20230509")),
        PARTITION p20200509 VALUES [("20230509"), ("20231010")),
        PARTITION p20200510 VALUES [("20231010"), ("20231211")),
        PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
  2. 分別使用兩個(gè)類(lèi)型的JSON數(shù)據(jù)記錄到Topic里:

    {
        "category":"value1331",
        "author":"value1233",
        "timestamp":1700346050,
        "price":1413
    }
    [
        {
            "category":"value13z2",
            "author":"vaelue13",
            "timestamp":1705645251,
            "price":14330
        },
        {
            "category":"lvalue211",
            "author":"lvalue122",
            "timestamp":1684448450,
            "price":24440
        }
    ]
  3. 以不同模式導(dǎo)入JSON數(shù)據(jù),示例如下。

    • 以簡(jiǎn)單模式導(dǎo)入JSON數(shù)據(jù)。

      CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
      COLUMNS(category,price,author)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
       );
    • 精準(zhǔn)導(dǎo)入JSON格式數(shù)據(jù)。

      CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
      COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json",
          "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
          "strip_outer_array" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
      );
      說(shuō)明

      表里的分區(qū)字段dt在示例數(shù)據(jù)里并沒(méi)有,而是在Routine Load語(yǔ)句里通過(guò)dt=from_unixtime(timestamp,'%Y%m%d')轉(zhuǎn)換出來(lái)的。

訪問(wèn)不同驗(yàn)證方式的Kafka集群

根據(jù)Kafka集群驗(yàn)證方式的不同,訪問(wèn)的方式示例如下。

  1. 訪問(wèn)SSL認(rèn)證的Kafka集群。

    訪問(wèn)SSL認(rèn)證的Kafka集群需要您提供用于認(rèn)證Kafka Broker公鑰的證書(shū)文件(ca.pem)。如果Kafka集群同時(shí)開(kāi)啟了客戶端認(rèn)證,則還需提供客戶端的公鑰(client.pem)、密鑰文件(client.key),以及密鑰密碼。這里所需的文件需要先通過(guò)CREATE FILE命令上傳到SelectDB中,并且Catalog名稱(chēng)為kafka。

    1. 上傳文件,示例如下。

      CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
      CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
      CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. 創(chuàng)建Routine Load作業(yè),示例如下。

      CREATE ROUTINE LOAD db1.job1 on tbl1
      PROPERTIES
      (
          "desired_concurrent_number"="1"
      )
      FROM KAFKA
      (
          "kafka_broker_list"= "broker1:9091,broker2:9091",
          "kafka_topic" = "my_topic",
          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.certificate.location" = "FILE:client.pem",
          "property.ssl.key.location" = "FILE:client.key",
          "property.ssl.key.password" = "abcdefg"
      );
      說(shuō)明

      SelectDB通過(guò)Kafka的C++ APIlibrdkafka來(lái)訪問(wèn)Kafka集群。librdkafka所支持的參數(shù)可以參閱Configuration properties

  2. 訪問(wèn)PLAIN認(rèn)證的Kafka集群。

    訪問(wèn)開(kāi)啟PLAIN認(rèn)證的Kafka集群,需要增加配置如下。

    1. property.security.protocol=SASL_PLAINTEXT:使用SASL plaintext。

    2. property.sasl.mechanism=PLAIN:設(shè)置SASL的認(rèn)證方式為PLAIN。

    3. property.sasl.username=admin:設(shè)置SASL的用戶名。

    4. property.sasl.password=admin:設(shè)置SASL的密碼。

    創(chuàng)建Routine Load作業(yè),示例如下。

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol"="SASL_PLAINTEXT",
        "property.sasl.mechanism"="PLAIN",
        "property.sasl.username"="admin",
        "property.sasl.password"="admin"
    );
    
  3. 訪問(wèn)Kerberos認(rèn)證的Kafka集群。

    訪問(wèn)開(kāi)啟kerberos認(rèn)證的Kafka集群,需要增加配置如下。

    1. security.protocol=SASL_PLAINTEXT:使用SASL plaintext。

    2. sasl.kerberos.service.name=$SERVICENAME:設(shè)置broker servicename。

    3. sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab:設(shè)置Keytab本地文件路徑。

    4. sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}:設(shè)置SelectDB連接Kafka時(shí)使用的Kerberos主體。

    創(chuàng)建Routine Load作業(yè),示例如下。

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "SASL_PLAINTEXT",
        "property.sasl.kerberos.service.name" = "kafka",
        "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
        "property.sasl.kerberos.principal" = "id@your.com"
    );
    說(shuō)明
    • 若要使SelectDB訪問(wèn)開(kāi)啟kerberos認(rèn)證方式的Kafka集群,需要在SelectDB集群所有運(yùn)行節(jié)點(diǎn)上部署Kerberos客戶端kinit,并配置krb5.conf,填寫(xiě)KDC服務(wù)信息等。

    • 配置property.sasl.kerberos.keytab的值需要指定Keytab本地文件的絕對(duì)路徑,并允許SelectDB進(jìn)程訪問(wèn)該本地文件。

修改導(dǎo)入作業(yè)

修改已經(jīng)創(chuàng)建的例行導(dǎo)入作業(yè)。只能修改處于PAUSED狀態(tài)的作業(yè)。

語(yǔ)法

ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]

參數(shù)說(shuō)明

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

[db.]job_name

指定要修改的作業(yè)名稱(chēng)。

tbl_name

指定需要導(dǎo)入的表的名稱(chēng)。

job_properties

指定需要修改的作業(yè)參數(shù)。目前僅支持修改的參數(shù)如下。

  • desired_concurrent_number

  • max_error_number

  • max_batch_interval

  • max_batch_rows

  • max_batch_size

  • jsonpaths

  • json_root

  • strip_outer_array

  • strict_mode

  • timezone

  • num_as_string

  • fuzzy_parse

data_source

指定數(shù)據(jù)源的類(lèi)型。當(dāng)前支持:KAFKA

data_source_properties

指定數(shù)據(jù)源的相關(guān)屬性。目前僅支持的屬性如下。

  1. kafka_partitions

  2. kafka_offsets

  3. kafka_broker_list

  4. kafka_topic

  5. 自定義property,如property.group.id

說(shuō)明

kafka_partitionskafka_offsets用于修改待消費(fèi)的kafka partition 的offset,僅能修改當(dāng)前已經(jīng)消費(fèi)的partition。不能新增partition。

使用示例

  • desired_concurrent_number修改為1,示例如下。

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "1"
    );
  • desired_concurrent_number修改為10,修改partition的offset,修改group id,示例如下。

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "10"
    )
    FROM kafka
    (
        "kafka_partitions" = "0, 1, 2",
        "kafka_offsets" = "100, 200, 100",
        "property.group.id" = "new_group"
    );

暫停導(dǎo)入作業(yè)

暫停一個(gè)Routine Load作業(yè)。被暫停的作業(yè)可以通過(guò)RESUME命令重新運(yùn)行。

語(yǔ)法

PAUSE [ALL] ROUTINE LOAD FOR <job_name>;

參數(shù)說(shuō)明

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

[db.]job_name

指定要暫停的作業(yè)名稱(chēng)。

使用示例

  • 暫停名稱(chēng)為test1的Routine Load作業(yè),示例如下。

    PAUSE ROUTINE LOAD FOR test1;
  • 暫停所有Routine Load作業(yè),示例如下。

    PAUSE ALL ROUTINE LOAD;

恢復(fù)導(dǎo)入作業(yè)

恢復(fù)一個(gè)被暫停的Routine Load作業(yè)。恢復(fù)的作業(yè)將繼續(xù)從之前已消費(fèi)的offset繼續(xù)消費(fèi)。

語(yǔ)法

RESUME [ALL] ROUTINE LOAD FOR <job_name>

參數(shù)說(shuō)明

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

[db.]job_name

指定要恢復(fù)的作業(yè)名稱(chēng)。

使用示例

  • 恢復(fù)名稱(chēng)為test1的Routine Load作業(yè),示例如下。

    RESUME ROUTINE LOAD FOR test1;
  • 恢復(fù)所有Routine Load作業(yè),示例如下。

    RESUME ALL ROUTINE LOAD;

停止導(dǎo)入作業(yè)

停止一個(gè)Routine Load作業(yè)。被停止的作業(yè)無(wú)法再重新運(yùn)行。停止導(dǎo)入后,已導(dǎo)入數(shù)據(jù)不會(huì)回滾。

語(yǔ)法

STOP ROUTINE LOAD FOR <job_name>;

參數(shù)說(shuō)明

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

[db.]job_name

指定要停止的作業(yè)名稱(chēng)。

使用示例

停止名稱(chēng)為test1的Routine Load作業(yè),示例如下。

STOP ROUTINE LOAD FOR test1;

查看導(dǎo)入作業(yè)

Routine Load作業(yè)運(yùn)行狀態(tài)需要通過(guò)SHOW ROUTINE LOAD命令查看。

語(yǔ)法

SHOW [ALL] ROUTINE LOAD [FOR job_name];

參數(shù)說(shuō)明

參數(shù)名稱(chēng)

參數(shù)說(shuō)明

[db.]job_name

指定要查看的作業(yè)名稱(chēng)。

說(shuō)明

如果導(dǎo)入數(shù)據(jù)格式不合法,詳細(xì)的錯(cuò)誤信息會(huì)記錄在ErrorLogUrls中。注意其中包含多個(gè)鏈接,拷貝其中任意一個(gè)鏈接在瀏覽器中查看即可。

使用示例

  • 展示名稱(chēng)為test1的所有Routine Load作業(yè)(包括已停止或取消的作業(yè))。結(jié)果為一行或多行。

    SHOW ALL ROUTINE LOAD FOR test1;
  • 展示名稱(chēng)為test1的當(dāng)前正在運(yùn)行的Routine Load作業(yè)。

    SHOW ROUTINE LOAD FOR test1;
  • 顯示example_db下,所有的Routine Load作業(yè)(包括已停止或取消的作業(yè))。結(jié)果為一行或多行。

    use example_db;
    SHOW ALL ROUTINE LOAD;
  • 顯示example_db下,所有正在運(yùn)行的Routine Load作業(yè)。

    use example_db;
    SHOW ROUTINE LOAD;
  • 顯示example_db下,名稱(chēng)為test1的當(dāng)前正在運(yùn)行的Routine Load作業(yè)。

    SHOW ROUTINE LOAD FOR example_db.test1;
  • 顯示example_db下,名稱(chēng)為test1的所有Routine Load作業(yè)(包括已停止或取消的作業(yè))。結(jié)果為一行或多行。

    SHOW ALL ROUTINE LOAD FOR example_db.test1;

相關(guān)系統(tǒng)配置?

相關(guān)系統(tǒng)配置參數(shù)會(huì)影響Routine Load的使用。

  • max_routine_load_task_concurrent_num

    FE配置項(xiàng),默認(rèn)為5,可以運(yùn)行時(shí)修改。該參數(shù)限制了一個(gè)例行導(dǎo)入作業(yè)最大的子任務(wù)并發(fā)數(shù)。建議維持默認(rèn)值。設(shè)置過(guò)大,可能導(dǎo)致同時(shí)并發(fā)的任務(wù)數(shù)過(guò)多,占用集群資源。

  • max_routine_load_task_num_per_be

    FE配置項(xiàng),默認(rèn)為5,可以運(yùn)行時(shí)修改。該參數(shù)限制了每個(gè)BE節(jié)點(diǎn)最多并發(fā)執(zhí)行的子任務(wù)個(gè)數(shù)。建議維持默認(rèn)值。如果設(shè)置過(guò)大,可能導(dǎo)致并發(fā)任務(wù)數(shù)過(guò)多,占用集群資源。

  • max_routine_load_job_num

    FE配置項(xiàng),默認(rèn)為100,可以運(yùn)行時(shí)修改。該參數(shù)限制Routine Load作業(yè)的總數(shù),包括NEED_SCHEDULED,RUNNING,PAUSE這些狀態(tài)。超過(guò)后,不能再提交新的作業(yè)。

  • max_consumer_num_per_group

    BE 配置項(xiàng),默認(rèn)為3。該參數(shù)表示一個(gè)子任務(wù)中最多生成幾個(gè)Consumer進(jìn)行數(shù)據(jù)消費(fèi)。對(duì)于Kafka數(shù)據(jù)源,一個(gè)Consumer可能消費(fèi)一個(gè)或多個(gè)Kafka Partition。假設(shè)一個(gè)任務(wù)需要消費(fèi)6個(gè)Kafka Partition,則會(huì)生成3個(gè)Consumer,每個(gè)Consumer消費(fèi)2個(gè)Partition。如果只有2個(gè)Partition,則只會(huì)生成2個(gè)Consumer,每個(gè)Consumer消費(fèi)1個(gè)Partition。

  • max_tolerable_backend_down_num

    FE配置項(xiàng),默認(rèn)值是0。在滿足某些條件下,SelectDB可令PAUSED的任務(wù)重新調(diào)度,變成RUNNING。該參數(shù)為0代表只有所有BE節(jié)點(diǎn)是alive狀態(tài)才允許重新調(diào)度。

  • period_of_auto_resume_min

    FE配置項(xiàng),默認(rèn)是5分鐘。該項(xiàng)意味著當(dāng)SelectDB重新調(diào)度任務(wù)時(shí),只會(huì)在5分鐘這個(gè)周期內(nèi)最多嘗試3次。如果3次都失敗則鎖定當(dāng)前任務(wù),后續(xù)不再進(jìn)行調(diào)度,但可通過(guò)人為干預(yù),進(jìn)行手動(dòng)恢復(fù)。

其他說(shuō)明?

  • Routine Load作業(yè)和ALTER TABLE操作的關(guān)系。

    • Routine Load不會(huì)阻塞Schema變更和ROLLUP操作。但Schema變更完成后,列映射關(guān)系無(wú)法匹配,則會(huì)導(dǎo)致作業(yè)的錯(cuò)誤數(shù)據(jù)激增,最終導(dǎo)致作業(yè)暫停。建議通過(guò)在Routine Load作業(yè)中顯式指定列映射關(guān)系,并且通過(guò)增加Nullable列或帶Default值的列來(lái)減少這些問(wèn)題。

    • 刪除表的Partition可能會(huì)導(dǎo)致導(dǎo)入數(shù)據(jù)無(wú)法找到對(duì)應(yīng)的Partition,導(dǎo)致作業(yè)進(jìn)入暫停狀態(tài)。

  • Routine Load作業(yè)和其他導(dǎo)入作業(yè)的關(guān)系(LOAD,DELETE,INSERT)。

    • Routine Load和其他LOAD作業(yè)以及INSERT操作沒(méi)有沖突。

    • 當(dāng)執(zhí)行DELETE操作時(shí),對(duì)應(yīng)表分區(qū)不能有任何正在執(zhí)行的導(dǎo)入任務(wù)。因此在執(zhí)行DELETE操作前,需要暫停Routine Load作業(yè),并等待已下發(fā)的任務(wù)全部完成后,方可以執(zhí)行DELETE。

  • Routine Load作業(yè)和DROP DATABASE或DROP TABLE操作的關(guān)系。

    當(dāng)Routine Load對(duì)應(yīng)的database或table被刪除后,作業(yè)會(huì)自動(dòng)CANCEL。

  • Kafka類(lèi)型的Routine Load作業(yè)和Kafka topic的關(guān)系。

    當(dāng)創(chuàng)建例行導(dǎo)入聲明的kafka_topic在Kafka集群中不存在時(shí):

    • Kafka集群的Broker設(shè)置了auto.create.topics.enable=true,則topic會(huì)先被自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的Partition個(gè)數(shù)是由您的Kafka集群中的Broker配置num.partitions決定的。例行作業(yè)會(huì)不斷讀取該topic的數(shù)據(jù)。

    • Kafka集群的Broker設(shè)置了auto.create.topics.enable=false,則topic不會(huì)被自動(dòng)創(chuàng)建,例行作業(yè)會(huì)在沒(méi)有讀取任何數(shù)據(jù)之前就被暫停,狀態(tài)置為PAUSED。

    所以,如果您希望當(dāng)Kafka topic不存在的時(shí)候,被例行作業(yè)自動(dòng)創(chuàng)建的話,只需要將Kafka集群中的Broker設(shè)置auto.create.topics.enable=true即可。

  • 當(dāng)環(huán)境中存在網(wǎng)段和域名解析的隔離措施,因此需要注意以下問(wèn)題。

    • 創(chuàng)建Routine Load任務(wù)中指定的Broker list必須能夠被SelectDB服務(wù)訪問(wèn)。

    • Kafka中如果配置了advertised.listeners,advertised.listeners中的地址必須能夠被SelectDB服務(wù)訪問(wèn)。

  • 指定消費(fèi)的Partition和Offset。

    SelectDB支持指定Partition,Offset和時(shí)間點(diǎn)進(jìn)行消費(fèi)。這里說(shuō)明下對(duì)應(yīng)參數(shù)的配置關(guān)系。

    三個(gè)相關(guān)參數(shù)如下。

    • kafka_partitions:指定待消費(fèi)的Partition列表,如:"0,1,2,3"。

    • kafka_offsets:指定每個(gè)分區(qū)的起始o(jì)ffset,必須和kafka_partitions列表個(gè)數(shù)對(duì)應(yīng)。如:"1000,1000,2000,2000"

    • property.kafka_default_offset:指定分區(qū)默認(rèn)的起始Offset。

    在創(chuàng)建導(dǎo)入作業(yè)時(shí),這三個(gè)參數(shù)可以有以下五種組合方式。

    方式

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    行為

    1

    No

    No

    No

    系統(tǒng)會(huì)自動(dòng)查找Topic對(duì)應(yīng)的所有分區(qū)并從OFFSET_END開(kāi)始消費(fèi)。

    2

    No

    No

    Yes

    系統(tǒng)會(huì)自動(dòng)查找Topic對(duì)應(yīng)的所有分區(qū)并從default offset指定的位置開(kāi)始消費(fèi)。

    3

    Yes

    No

    No

    系統(tǒng)會(huì)從指定分區(qū)的OFFSET_END開(kāi)始消費(fèi)。

    4

    Yes

    Yes

    No

    系統(tǒng)會(huì)從指定分區(qū)的指定Offset處開(kāi)始消費(fèi)。

    5

    Yes

    No

    Yes

    系統(tǒng)會(huì)從指定分區(qū),default Offset指定的位置開(kāi)始消費(fèi)。

  • STOP和PAUSE的區(qū)別。

    FE會(huì)自動(dòng)定期清理STOP狀態(tài)的Routine Load,而PAUSE狀態(tài)的則可以再次被恢復(fù)啟用。