Routine Load
Routine Load支持提交一個(gè)常駐的導(dǎo)入作業(yè),不斷地從指定的數(shù)據(jù)源讀取數(shù)據(jù),將數(shù)據(jù)持續(xù)地導(dǎo)入至云數(shù)據(jù)庫(kù) SelectDB 版中。本文介紹如何通過(guò)Routine Load將Kafka中的數(shù)據(jù)導(dǎo)入至云數(shù)據(jù)庫(kù) SelectDB 版實(shí)例。
前提條件
支持的數(shù)據(jù)源:目前僅支持Kafka數(shù)據(jù)源,可通過(guò)無(wú)認(rèn)證方式或PLAIN/SSL/Kerberos等認(rèn)證方式連接Kafka。
支持的消息格式:
CSV
或JSON
格式。CSV的格式,每一個(gè)Message為一行,且行尾不包含換行符。
注意事項(xiàng)
默認(rèn)支持Kafka 0.10.0.0(含)以上版本。如果使用Kafka 0.10.0.0以下版本(0.9.0,0.8.2,0.8.1,0.8.0),需要Kafka兼容舊版本,具體操作有以下兩種方式:
將BE的配置
kafka_broker_version_fallback
的值設(shè)置為要兼容的舊版本。創(chuàng)建Routine Load的時(shí)候直接設(shè)置
property.broker.version.fallback
的值為要兼容的舊版本。
使用兼容舊版本的代價(jià)在于,Routine Load的部分新特性可能無(wú)法使用,例如根據(jù)時(shí)間設(shè)置Kafka分區(qū)的offset。
創(chuàng)建導(dǎo)入作業(yè)
使用Routine Load功能時(shí),首先需創(chuàng)建一個(gè)Routine Load作業(yè)。該作業(yè)將通過(guò)例行調(diào)度持續(xù)發(fā)送任務(wù),每個(gè)任務(wù)會(huì)消耗一定數(shù)量的Kafka消息。
語(yǔ)法
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]
參數(shù)說(shuō)明
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
| 導(dǎo)入作業(yè)的名稱(chēng)。在同一個(gè)Database內(nèi),相同名稱(chēng)的job只能運(yùn)行一個(gè)。 |
| 指定導(dǎo)入的表的名稱(chēng)。 |
| 指定數(shù)據(jù)合并類(lèi)型。默認(rèn)為 |
| 指定導(dǎo)入數(shù)據(jù)處理相關(guān)參數(shù)。詳細(xì)參數(shù)說(shuō)明,請(qǐng)參見(jiàn)load_properties。 |
| 指定導(dǎo)入作業(yè)相關(guān)參數(shù)。詳細(xì)參數(shù)說(shuō)明,請(qǐng)參見(jiàn)job_properties參數(shù)說(shuō)明。 |
| 指定數(shù)據(jù)源的類(lèi)型。詳細(xì)參數(shù)說(shuō)明,請(qǐng)參見(jiàn)data_source_properties參數(shù)說(shuō)明。 |
load_properties參數(shù)說(shuō)明
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]
參數(shù)名稱(chēng) | 示例值 | 參數(shù)說(shuō)明 |
| COLUMNS TERMINATED BY "," | 指定列分隔符,默認(rèn)為 |
| (k1,k2,tmpk1,k3=tmpk1+1) | 指定文件列和表中列的映射關(guān)系,以及各種列轉(zhuǎn)換等。詳細(xì)說(shuō)明請(qǐng)參見(jiàn)數(shù)據(jù)轉(zhuǎn)化。 |
| 無(wú) | 指定過(guò)濾原始數(shù)據(jù)條件。詳細(xì)說(shuō)明請(qǐng)參見(jiàn)數(shù)據(jù)轉(zhuǎn)化。 |
| WHERE k1>100 and k2=1000 | 指定條件對(duì)導(dǎo)入的數(shù)據(jù)進(jìn)行過(guò)濾。詳細(xì)說(shuō)明請(qǐng)參見(jiàn)數(shù)據(jù)轉(zhuǎn)化。 |
| PARTITION(p1,p2,p3) | 指定導(dǎo)入目的表的哪些Partition中。如果不指定,則會(huì)自動(dòng)導(dǎo)入到對(duì)應(yīng)的Partition中。 |
| DELETE ON v3>100 | 用于指定導(dǎo)入數(shù)據(jù)中表示Delete Flag的列和計(jì)算關(guān)系。 說(shuō)明 需配合MEREGE導(dǎo)入模式一起使用,僅適用于Unique Key模型的表。 |
| 無(wú) | 用于指定導(dǎo)入數(shù)據(jù)中表示Sequence Col的列。其功能為導(dǎo)入數(shù)據(jù)時(shí)保證數(shù)據(jù)順序。 說(shuō)明 僅適用于對(duì)Unique Key模型的表。 |
job_properties參數(shù)說(shuō)明
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)
這三個(gè)參數(shù)max_batch_interval、max_batch_rows和max_batch_size用于控制子任務(wù)的執(zhí)行時(shí)間和處理量。當(dāng)任意一個(gè)參數(shù)達(dá)到設(shè)定閾值時(shí),任務(wù)將終止。
參數(shù)名稱(chēng) | 示例值 | 參數(shù)說(shuō)明 |
| "desired_concurrent_number" = "3" | 指定期望并發(fā)度。大于0,默認(rèn)為 說(shuō)明
|
| "max_batch_interval" = "20" | 指定每個(gè)子任務(wù)最大執(zhí)行時(shí)間,單位是秒,默認(rèn)為 |
| "max_batch_rows" = "300000" | 指定每個(gè)子任務(wù)最多讀取的行數(shù)。默認(rèn)是 |
| "max_batch_size" = "209715200" | 指定每個(gè)子任務(wù)最多讀取的字節(jié)數(shù)。單位是字節(jié),默認(rèn)為 |
| "max_error_number"="3" | 指定采樣窗口內(nèi),允許的最大錯(cuò)誤行數(shù)。默認(rèn)為 采樣窗口為 說(shuō)明 被 |
| "strict_mode"="true" | 指定是否開(kāi)啟嚴(yán)格模式,默認(rèn)為
|
| "timezone" = "Africa/Abidjan" | 指定導(dǎo)入作業(yè)所使用的時(shí)區(qū)。默認(rèn)為使用Session的時(shí)區(qū)作為參數(shù)。 說(shuō)明 該參數(shù)會(huì)影響所有導(dǎo)入涉及的和時(shí)區(qū)有關(guān)的函數(shù)結(jié)果。 |
| "format" = "json" | 指定導(dǎo)入數(shù)據(jù)格式,默認(rèn)為 |
| -H "jsonpaths:[\"$.k2\",\"$.k1\"]" | 當(dāng)導(dǎo)入數(shù)據(jù)格式為 |
| -H "strip_outer_array:true" | 當(dāng)導(dǎo)入數(shù)據(jù)格式為 |
| -H "json_root:$.RECORDS" | 當(dāng)導(dǎo)入數(shù)據(jù)格式為JSON時(shí),可以通過(guò) |
| 無(wú) | 指定設(shè)置發(fā)送批處理數(shù)據(jù)的并行度,如果并行度的值超過(guò)BE配置中的 |
| 無(wú) | 指定是否只導(dǎo)入數(shù)據(jù)到對(duì)應(yīng)分區(qū)的一個(gè)tablet,默認(rèn)值為false。該參數(shù)只允許向?qū)в衦andom分區(qū)的Duplicate表導(dǎo)入數(shù)據(jù)的時(shí)設(shè)置。 |
嚴(yán)格模式(strict mode)與原始數(shù)據(jù)(source data)的導(dǎo)入關(guān)系
例如列類(lèi)型為T(mén)inyInt。當(dāng)表中的列允許導(dǎo)入空值時(shí)導(dǎo)入關(guān)系如下。
source data | source data example | string to int | strict_mode | result |
空值 | \N | N/A | true or false | NULL |
not null | aaa or 2000 | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 | 1 | true or false | correct data |
例如列類(lèi)型為Decimal(1,0)。當(dāng)表中的列允許導(dǎo)入空值時(shí)導(dǎo)入關(guān)系如下。
source data | source data example | string to int | strict_mode | result |
空值 | \N | N/A | true or false | NULL |
not null | aaa | NULL | true | invalid data(filtered) |
not null | aaa | NULL | false | NULL |
not null | 1 or 10 | 1 | true or false | correct data |
10雖然是一個(gè)超過(guò)范圍的值,但是因?yàn)槠漕?lèi)型符合Decimal的要求,所以strict mode對(duì)其不產(chǎn)生影響。10最后會(huì)在其他ETL處理流程中被過(guò)濾。但不會(huì)被strict mode過(guò)濾。
data_source_properties參數(shù)說(shuō)明
FROM KAFKA
(
"key1" = "val1",
"key2" = "val2"
)
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
| 指定Kafka的Broker連接信息。格式為 格式: |
| 指定訂閱的Kafka的Topic。 格式: |
| 指定訂閱的Kafka Partition,以及對(duì)應(yīng)的每個(gè)Partition的起始Offset。如果指定時(shí)間,則會(huì)從大于等于該時(shí)間的最近一個(gè)Offset處開(kāi)始消費(fèi)。 Offset可以指定從大于等于0的具體Offset,或者:
如果沒(méi)有指定,則默認(rèn)從 示例如下。
重要 時(shí)間格式不能和OFFSET格式混用。 |
| 指定自定義Kafka參數(shù)。功能等同于Kafka shell中"--property"參數(shù)。 當(dāng)參數(shù)的value為一個(gè)文件時(shí),需要在value前加上關(guān)鍵詞:" |
Property參數(shù)說(shuō)明
使用SSL連接Kafka時(shí),需要指定以下參數(shù):
"property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"
其中
property.security.protocol
和property.ssl.ca.location
為必選項(xiàng),用于指明連接方式為SSL,以及CA證書(shū)的位置。如果Kafka Server端開(kāi)啟了Client認(rèn)證,則還需設(shè)置以下參數(shù)。
"property.ssl.certificate.location" "property.ssl.key.location" "property.ssl.key.password"
分別用于指定Client的public key、private key、private key的密碼。
指定Kafka Partition的默認(rèn)起始o(jì)ffset。
沒(méi)有指定
kafka_partitions/kafka_offsets
,默認(rèn)消費(fèi)所有分區(qū)。此時(shí)可以通過(guò)kafka_default_offsets
指定起始o(jì)ffset。默認(rèn)為OFFSET_END
,即從末尾開(kāi)始訂閱。"property.kafka_default_offsets" = "OFFSET_BEGINNING"
更多支持的自定義參數(shù),請(qǐng)參閱librdkafka的官方CONFIGURATION文檔中,Client端的配置項(xiàng)。例如以下參數(shù)。
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"
使用示例
創(chuàng)建Routine Load簡(jiǎn)單作業(yè)
創(chuàng)建待導(dǎo)入的SelectDB數(shù)據(jù)表,示例如下。
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");
分別設(shè)置不同的參數(shù)導(dǎo)入數(shù)據(jù),示例如下。
為example_db的test_table創(chuàng)建一個(gè)名為test1的Kafka Routine Load任務(wù)。指定列分隔符的group.id和client.id,設(shè)置自動(dòng)默認(rèn)消費(fèi)所有分區(qū),且從有數(shù)據(jù)的位置(OFFSET_BEGINNING)開(kāi)始訂閱,示例如下。
CREATE ROUTINE LOAD example_db.test1 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
為example_db的test_table創(chuàng)建一個(gè)名為test2的Kafka Routine Load任務(wù)。導(dǎo)入任務(wù)為嚴(yán)格模式,示例如下。
CREATE ROUTINE LOAD example_db.test2 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );
從指定的時(shí)間點(diǎn)開(kāi)始消費(fèi),示例如下。
CREATE ROUTINE LOAD example_db.test4 ON test_table PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "30", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.kafka_default_offset" = "2024-01-21 10:00:00" );
導(dǎo)入JSON格式數(shù)據(jù)
Routine Load導(dǎo)入的JSON格式僅支持以下兩種。
只有一條記錄,且為JSON對(duì)象。
當(dāng)使用單表導(dǎo)入(即通過(guò)ON TABLE_NAME指定表名)時(shí),JSON數(shù)據(jù)格式如下。
{"key1":"value1","key2":"value2","key3":"value3"}
當(dāng)使用動(dòng)態(tài)或多表導(dǎo)入Routine Load(即不指定具體的表名)時(shí),JSON數(shù)據(jù)格式如下。
table_name|{"key1":"value1","key2":"value2","key3":"value3"}
JSON數(shù)組,數(shù)組中可含多條記錄。
當(dāng)使用單表導(dǎo)入(即通過(guò)ON TABLE_NAME指定表名)時(shí),JSON數(shù)據(jù)格式如下。
[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
當(dāng)使用動(dòng)態(tài)或多表導(dǎo)入(即不指定具體的表名)時(shí),JSON數(shù)據(jù)格式如下。
table_name|[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
導(dǎo)入JSON格式數(shù)據(jù),示例如下。
創(chuàng)建待導(dǎo)入的SelectDB數(shù)據(jù)表,示例如下。
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20230509")), PARTITION p20200509 VALUES [("20230509"), ("20231010")), PARTITION p20200510 VALUES [("20231010"), ("20231211")), PARTITION p20200511 VALUES [("20231211"), ("20240512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
分別使用兩個(gè)類(lèi)型的JSON數(shù)據(jù)記錄到Topic里:
{ "category":"value1331", "author":"value1233", "timestamp":1700346050, "price":1413 }
[ { "category":"value13z2", "author":"vaelue13", "timestamp":1705645251, "price":14330 }, { "category":"lvalue211", "author":"lvalue122", "timestamp":1684448450, "price":24440 } ]
以不同模式導(dǎo)入JSON數(shù)據(jù),示例如下。
以簡(jiǎn)單模式導(dǎo)入JSON數(shù)據(jù)。
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
精準(zhǔn)導(dǎo)入JSON格式數(shù)據(jù)。
CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
說(shuō)明表里的分區(qū)字段
dt
在示例數(shù)據(jù)里并沒(méi)有,而是在Routine Load語(yǔ)句里通過(guò)dt=from_unixtime(timestamp,'%Y%m%d')
轉(zhuǎn)換出來(lái)的。
訪問(wèn)不同驗(yàn)證方式的Kafka集群
根據(jù)Kafka集群驗(yàn)證方式的不同,訪問(wèn)的方式示例如下。
訪問(wèn)SSL認(rèn)證的Kafka集群。
訪問(wèn)SSL認(rèn)證的Kafka集群需要您提供用于認(rèn)證Kafka Broker公鑰的證書(shū)文件(ca.pem)。如果Kafka集群同時(shí)開(kāi)啟了客戶端認(rèn)證,則還需提供客戶端的公鑰(client.pem)、密鑰文件(client.key),以及密鑰密碼。這里所需的文件需要先通過(guò)
CREATE FILE
命令上傳到SelectDB中,并且Catalog名稱(chēng)為kafka。上傳文件,示例如下。
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
創(chuàng)建Routine Load作業(yè),示例如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );
說(shuō)明SelectDB通過(guò)Kafka的C++ API
librdkafka
來(lái)訪問(wèn)Kafka集群。librdkafka
所支持的參數(shù)可以參閱Configuration properties。
訪問(wèn)PLAIN認(rèn)證的Kafka集群。
訪問(wèn)開(kāi)啟PLAIN認(rèn)證的Kafka集群,需要增加配置如下。
property.security.protocol=SASL_PLAINTEXT:使用SASL plaintext。
property.sasl.mechanism=PLAIN:設(shè)置SASL的認(rèn)證方式為PLAIN。
property.sasl.username=admin:設(shè)置SASL的用戶名。
property.sasl.password=admin:設(shè)置SASL的密碼。
創(chuàng)建Routine Load作業(yè),示例如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol"="SASL_PLAINTEXT", "property.sasl.mechanism"="PLAIN", "property.sasl.username"="admin", "property.sasl.password"="admin" );
訪問(wèn)Kerberos認(rèn)證的Kafka集群。
訪問(wèn)開(kāi)啟kerberos認(rèn)證的Kafka集群,需要增加配置如下。
security.protocol=SASL_PLAINTEXT:使用SASL plaintext。
sasl.kerberos.service.name=$SERVICENAME:設(shè)置broker servicename。
sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab:設(shè)置Keytab本地文件路徑。
sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}:設(shè)置SelectDB連接Kafka時(shí)使用的Kerberos主體。
創(chuàng)建Routine Load作業(yè),示例如下。
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol" = "SASL_PLAINTEXT", "property.sasl.kerberos.service.name" = "kafka", "property.sasl.kerberos.keytab" = "/etc/krb5.keytab", "property.sasl.kerberos.principal" = "id@your.com" );
說(shuō)明若要使SelectDB訪問(wèn)開(kāi)啟kerberos認(rèn)證方式的Kafka集群,需要在SelectDB集群所有運(yùn)行節(jié)點(diǎn)上部署Kerberos客戶端kinit,并配置krb5.conf,填寫(xiě)KDC服務(wù)信息等。
配置
property.sasl.kerberos.keytab
的值需要指定Keytab本地文件的絕對(duì)路徑,并允許SelectDB進(jìn)程訪問(wèn)該本地文件。
修改導(dǎo)入作業(yè)
修改已經(jīng)創(chuàng)建的例行導(dǎo)入作業(yè)。只能修改處于PAUSED
狀態(tài)的作業(yè)。
語(yǔ)法
ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]
參數(shù)說(shuō)明
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
[db.]job_name | 指定要修改的作業(yè)名稱(chēng)。 |
tbl_name | 指定需要導(dǎo)入的表的名稱(chēng)。 |
job_properties | 指定需要修改的作業(yè)參數(shù)。目前僅支持修改的參數(shù)如下。
|
data_source | 指定數(shù)據(jù)源的類(lèi)型。當(dāng)前支持: |
data_source_properties | 指定數(shù)據(jù)源的相關(guān)屬性。目前僅支持的屬性如下。
說(shuō)明
|
使用示例
將
desired_concurrent_number
修改為1,示例如下。ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "1" );
將
desired_concurrent_number
修改為10,修改partition的offset,修改group id,示例如下。ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "10" ) FROM kafka ( "kafka_partitions" = "0, 1, 2", "kafka_offsets" = "100, 200, 100", "property.group.id" = "new_group" );
暫停導(dǎo)入作業(yè)
暫停一個(gè)Routine Load作業(yè)。被暫停的作業(yè)可以通過(guò)RESUME
命令重新運(yùn)行。
語(yǔ)法
PAUSE [ALL] ROUTINE LOAD FOR <job_name>;
參數(shù)說(shuō)明
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
[db.]job_name | 指定要暫停的作業(yè)名稱(chēng)。 |
使用示例
暫停名稱(chēng)為test1的Routine Load作業(yè),示例如下。
PAUSE ROUTINE LOAD FOR test1;
暫停所有Routine Load作業(yè),示例如下。
PAUSE ALL ROUTINE LOAD;
恢復(fù)導(dǎo)入作業(yè)
恢復(fù)一個(gè)被暫停的Routine Load作業(yè)。恢復(fù)的作業(yè)將繼續(xù)從之前已消費(fèi)的offset繼續(xù)消費(fèi)。
語(yǔ)法
RESUME [ALL] ROUTINE LOAD FOR <job_name>
參數(shù)說(shuō)明
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
[db.]job_name | 指定要恢復(fù)的作業(yè)名稱(chēng)。 |
使用示例
恢復(fù)名稱(chēng)為test1的Routine Load作業(yè),示例如下。
RESUME ROUTINE LOAD FOR test1;
恢復(fù)所有Routine Load作業(yè),示例如下。
RESUME ALL ROUTINE LOAD;
停止導(dǎo)入作業(yè)
停止一個(gè)Routine Load作業(yè)。被停止的作業(yè)無(wú)法再重新運(yùn)行。停止導(dǎo)入后,已導(dǎo)入數(shù)據(jù)不會(huì)回滾。
語(yǔ)法
STOP ROUTINE LOAD FOR <job_name>;
參數(shù)說(shuō)明
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
[db.]job_name | 指定要停止的作業(yè)名稱(chēng)。 |
使用示例
停止名稱(chēng)為test1的Routine Load作業(yè),示例如下。
STOP ROUTINE LOAD FOR test1;
查看導(dǎo)入作業(yè)
Routine Load作業(yè)運(yùn)行狀態(tài)需要通過(guò)SHOW ROUTINE LOAD
命令查看。
語(yǔ)法
SHOW [ALL] ROUTINE LOAD [FOR job_name];
參數(shù)說(shuō)明
參數(shù)名稱(chēng) | 參數(shù)說(shuō)明 |
[db.]job_name | 指定要查看的作業(yè)名稱(chēng)。 |
如果導(dǎo)入數(shù)據(jù)格式不合法,詳細(xì)的錯(cuò)誤信息會(huì)記錄在ErrorLogUrls中。注意其中包含多個(gè)鏈接,拷貝其中任意一個(gè)鏈接在瀏覽器中查看即可。
使用示例
展示名稱(chēng)為test1的所有Routine Load作業(yè)(包括已停止或取消的作業(yè))。結(jié)果為一行或多行。
SHOW ALL ROUTINE LOAD FOR test1;
展示名稱(chēng)為test1的當(dāng)前正在運(yùn)行的Routine Load作業(yè)。
SHOW ROUTINE LOAD FOR test1;
顯示example_db下,所有的Routine Load作業(yè)(包括已停止或取消的作業(yè))。結(jié)果為一行或多行。
use example_db; SHOW ALL ROUTINE LOAD;
顯示example_db下,所有正在運(yùn)行的Routine Load作業(yè)。
use example_db; SHOW ROUTINE LOAD;
顯示example_db下,名稱(chēng)為test1的當(dāng)前正在運(yùn)行的Routine Load作業(yè)。
SHOW ROUTINE LOAD FOR example_db.test1;
顯示example_db下,名稱(chēng)為test1的所有Routine Load作業(yè)(包括已停止或取消的作業(yè))。結(jié)果為一行或多行。
SHOW ALL ROUTINE LOAD FOR example_db.test1;
相關(guān)系統(tǒng)配置?
相關(guān)系統(tǒng)配置參數(shù)會(huì)影響Routine Load的使用。
max_routine_load_task_concurrent_num
FE配置項(xiàng),默認(rèn)為5,可以運(yùn)行時(shí)修改。該參數(shù)限制了一個(gè)例行導(dǎo)入作業(yè)最大的子任務(wù)并發(fā)數(shù)。建議維持默認(rèn)值。設(shè)置過(guò)大,可能導(dǎo)致同時(shí)并發(fā)的任務(wù)數(shù)過(guò)多,占用集群資源。
max_routine_load_task_num_per_be
FE配置項(xiàng),默認(rèn)為5,可以運(yùn)行時(shí)修改。該參數(shù)限制了每個(gè)BE節(jié)點(diǎn)最多并發(fā)執(zhí)行的子任務(wù)個(gè)數(shù)。建議維持默認(rèn)值。如果設(shè)置過(guò)大,可能導(dǎo)致并發(fā)任務(wù)數(shù)過(guò)多,占用集群資源。
max_routine_load_job_num
FE配置項(xiàng),默認(rèn)為100,可以運(yùn)行時(shí)修改。該參數(shù)限制Routine Load作業(yè)的總數(shù),包括NEED_SCHEDULED,RUNNING,PAUSE這些狀態(tài)。超過(guò)后,不能再提交新的作業(yè)。
max_consumer_num_per_group
BE 配置項(xiàng),默認(rèn)為3。該參數(shù)表示一個(gè)子任務(wù)中最多生成幾個(gè)Consumer進(jìn)行數(shù)據(jù)消費(fèi)。對(duì)于Kafka數(shù)據(jù)源,一個(gè)Consumer可能消費(fèi)一個(gè)或多個(gè)Kafka Partition。假設(shè)一個(gè)任務(wù)需要消費(fèi)6個(gè)Kafka Partition,則會(huì)生成3個(gè)Consumer,每個(gè)Consumer消費(fèi)2個(gè)Partition。如果只有2個(gè)Partition,則只會(huì)生成2個(gè)Consumer,每個(gè)Consumer消費(fèi)1個(gè)Partition。
max_tolerable_backend_down_num
FE配置項(xiàng),默認(rèn)值是0。在滿足某些條件下,SelectDB可令PAUSED的任務(wù)重新調(diào)度,變成RUNNING。該參數(shù)為0代表只有所有BE節(jié)點(diǎn)是alive狀態(tài)才允許重新調(diào)度。
period_of_auto_resume_min
FE配置項(xiàng),默認(rèn)是5分鐘。該項(xiàng)意味著當(dāng)SelectDB重新調(diào)度任務(wù)時(shí),只會(huì)在5分鐘這個(gè)周期內(nèi)最多嘗試3次。如果3次都失敗則鎖定當(dāng)前任務(wù),后續(xù)不再進(jìn)行調(diào)度,但可通過(guò)人為干預(yù),進(jìn)行手動(dòng)恢復(fù)。
其他說(shuō)明?
Routine Load作業(yè)和ALTER TABLE操作的關(guān)系。
Routine Load不會(huì)阻塞Schema變更和ROLLUP操作。但Schema變更完成后,列映射關(guān)系無(wú)法匹配,則會(huì)導(dǎo)致作業(yè)的錯(cuò)誤數(shù)據(jù)激增,最終導(dǎo)致作業(yè)暫停。建議通過(guò)在Routine Load作業(yè)中顯式指定列映射關(guān)系,并且通過(guò)增加Nullable列或帶Default值的列來(lái)減少這些問(wèn)題。
刪除表的Partition可能會(huì)導(dǎo)致導(dǎo)入數(shù)據(jù)無(wú)法找到對(duì)應(yīng)的Partition,導(dǎo)致作業(yè)進(jìn)入暫停狀態(tài)。
Routine Load作業(yè)和其他導(dǎo)入作業(yè)的關(guān)系(LOAD,DELETE,INSERT)。
Routine Load和其他LOAD作業(yè)以及INSERT操作沒(méi)有沖突。
當(dāng)執(zhí)行DELETE操作時(shí),對(duì)應(yīng)表分區(qū)不能有任何正在執(zhí)行的導(dǎo)入任務(wù)。因此在執(zhí)行DELETE操作前,需要暫停Routine Load作業(yè),并等待已下發(fā)的任務(wù)全部完成后,方可以執(zhí)行DELETE。
Routine Load作業(yè)和DROP DATABASE或DROP TABLE操作的關(guān)系。
當(dāng)Routine Load對(duì)應(yīng)的database或table被刪除后,作業(yè)會(huì)自動(dòng)CANCEL。
Kafka類(lèi)型的Routine Load作業(yè)和Kafka topic的關(guān)系。
當(dāng)創(chuàng)建例行導(dǎo)入聲明的
kafka_topic
在Kafka集群中不存在時(shí):Kafka集群的Broker設(shè)置了
auto.create.topics.enable=true
,則topic會(huì)先被自動(dòng)創(chuàng)建,自動(dòng)創(chuàng)建的Partition個(gè)數(shù)是由您的Kafka集群中的Broker配置num.partitions決定的。例行作業(yè)會(huì)不斷讀取該topic的數(shù)據(jù)。Kafka集群的Broker設(shè)置了
auto.create.topics.enable=false
,則topic不會(huì)被自動(dòng)創(chuàng)建,例行作業(yè)會(huì)在沒(méi)有讀取任何數(shù)據(jù)之前就被暫停,狀態(tài)置為PAUSED。
所以,如果您希望當(dāng)Kafka topic不存在的時(shí)候,被例行作業(yè)自動(dòng)創(chuàng)建的話,只需要將Kafka集群中的Broker設(shè)置
auto.create.topics.enable=true
即可。當(dāng)環(huán)境中存在網(wǎng)段和域名解析的隔離措施,因此需要注意以下問(wèn)題。
創(chuàng)建Routine Load任務(wù)中指定的Broker list必須能夠被SelectDB服務(wù)訪問(wèn)。
Kafka中如果配置了
advertised.listeners
,advertised.listeners
中的地址必須能夠被SelectDB服務(wù)訪問(wèn)。
指定消費(fèi)的Partition和Offset。
SelectDB支持指定Partition,Offset和時(shí)間點(diǎn)進(jìn)行消費(fèi)。這里說(shuō)明下對(duì)應(yīng)參數(shù)的配置關(guān)系。
三個(gè)相關(guān)參數(shù)如下。
kafka_partitions
:指定待消費(fèi)的Partition列表,如:"0,1,2,3"。kafka_offsets
:指定每個(gè)分區(qū)的起始o(jì)ffset,必須和kafka_partitions
列表個(gè)數(shù)對(duì)應(yīng)。如:"1000,1000,2000,2000"property.kafka_default_offset
:指定分區(qū)默認(rèn)的起始Offset。
在創(chuàng)建導(dǎo)入作業(yè)時(shí),這三個(gè)參數(shù)可以有以下五種組合方式。
方式
kafka_partitions
kafka_offsets
property.kafka_default_offset
行為
1
No
No
No
系統(tǒng)會(huì)自動(dòng)查找Topic對(duì)應(yīng)的所有分區(qū)并從OFFSET_END開(kāi)始消費(fèi)。
2
No
No
Yes
系統(tǒng)會(huì)自動(dòng)查找Topic對(duì)應(yīng)的所有分區(qū)并從default offset指定的位置開(kāi)始消費(fèi)。
3
Yes
No
No
系統(tǒng)會(huì)從指定分區(qū)的OFFSET_END開(kāi)始消費(fèi)。
4
Yes
Yes
No
系統(tǒng)會(huì)從指定分區(qū)的指定Offset處開(kāi)始消費(fèi)。
5
Yes
No
Yes
系統(tǒng)會(huì)從指定分區(qū),default Offset指定的位置開(kāi)始消費(fèi)。
STOP和PAUSE的區(qū)別。
FE會(huì)自動(dòng)定期清理STOP狀態(tài)的Routine Load,而PAUSE狀態(tài)的則可以再次被恢復(fù)啟用。