Broker Load
在Broker Load模式下,通過部署的Broker程序,StarRocks可讀取對應(yīng)數(shù)據(jù)源(例如,Apache HDFS,阿里云OSS)上的數(shù)據(jù),利用自身的計算資源對數(shù)據(jù)進行預(yù)處理和導(dǎo)入。本文為您介紹Broker Load導(dǎo)入的使用示例以及常見問題。
背景信息
Broker Load是一種異步的導(dǎo)入方式。StarRocks支持從外部存儲系統(tǒng)導(dǎo)入數(shù)據(jù),支持CSV、ORCFile和Parquet等文件格式,建議單次導(dǎo)入數(shù)據(jù)量在幾十GB到上百GB級別。
Broker Load導(dǎo)入
查看Broker
阿里云EMR StarRocks實例在創(chuàng)建時已經(jīng)自動搭建并啟動Broker服務(wù),Broker服務(wù)位于每個Core節(jié)點上。使用以下SQL命令可以查看Broker。
SHOW PROC "/brokers";
創(chuàng)建導(dǎo)入任務(wù)
語法
LOAD LABEL [<database_name>.]<label_name> ( data_desc[, data_desc ...] ) WITH BROKER [broker_properties] [opt_properties]
參數(shù)描述
Label
導(dǎo)入任務(wù)的標(biāo)識。每個導(dǎo)入任務(wù)都有一個唯一的Label,由系統(tǒng)自動生成或在導(dǎo)入命令中自定義。Label可以用來防止導(dǎo)入相同的數(shù)據(jù),并查看對應(yīng)導(dǎo)入任務(wù)的執(zhí)行情況。當(dāng)Label對應(yīng)的導(dǎo)入任務(wù)狀態(tài)為FINISHED時,對應(yīng)的Label無法再次使用;當(dāng)狀態(tài)為CANCELLED時,可以再次使用該Label提交導(dǎo)入作業(yè)。
數(shù)據(jù)描述類data_desc
data_desc: DATA INFILE ('file_path', ...) [NEGATIVE] INTO TABLE tbl_name [PARTITION (p1, p2)] [COLUMNS TERMINATED BY column_separator ] [FORMAT AS file_type] [(col1, ...)] [COLUMNS FROM PATH AS (colx, ...)] [SET (k1=f1(xx), k2=f2(xx))] [WHERE predicate]
相關(guān)參數(shù)描述如下表所示。
參數(shù)
描述
file_path
文件路徑可以指定到文件,也可以用星號(*)通配符指定某個目錄下的所有文件。中間的目錄也可以使用通配符匹配。
可以使用的通配符有? * [] {} ^,使用規(guī)則請參見FileSystem。
例如, 通過oss://bucket/data/tablename , 可以匹配tablename下所有分區(qū)內(nèi)的所有文件。通過oss://bucket/data/tablename/dt=202301, 可以匹配tablename下1月分區(qū)的所有文件。
negative
設(shè)置數(shù)據(jù)取反導(dǎo)入。
該功能適用的場景是當(dāng)數(shù)據(jù)表中聚合列的類型均為SUM類型時,如果希望撤銷某一批導(dǎo)入的數(shù)據(jù),可以通過negative參數(shù)導(dǎo)入同一批數(shù)據(jù),StarRocks會自動為這批數(shù)據(jù)在聚合列上數(shù)據(jù)取反,以達(dá)到消除同一批數(shù)據(jù)的功能。
partition
指定待導(dǎo)入表的Partition信息。
如果待導(dǎo)入數(shù)據(jù)不屬于指定的Partition,則不會被導(dǎo)入。同時,不指定Partition中的數(shù)據(jù)會被認(rèn)為是“錯誤數(shù)據(jù)”。對于不想導(dǎo)入,也不想記錄為“錯誤數(shù)據(jù)”的數(shù)據(jù),可以使用where predicate來過濾。
column_separator
COLUMNS TERMINATED BY column_separator
,用于指定導(dǎo)入文件中的列分隔符,默認(rèn)為\t。如果是不可見字符,則需要加\x作為前綴,使用十六進制來表示分隔符。例如,Hive文件的分隔符為\x01,則列分隔符為\\x01。
file_type
FORMAT AS file_type
,用于指定導(dǎo)入文件的類型。例如,parquet、orc、csv,默認(rèn)值為csv。parquet類型也可以通過文件后綴名.parquet或者.parq判斷。
COLUMNS FROM PATH AS
提取文件路徑中的分區(qū)字段。
例如,導(dǎo)入文件為/path/col_name=col_value/dt=20210101/file1,其中col_name/dt為表中的列,則將col_value、20210101分別導(dǎo)入到col_name和dt對應(yīng)的列的代碼示例如下。
(col1, col2) COLUMNS FROM PATH AS (col_name, dt)
set column mapping
SET (k1=f1(xx), k2=f2(xx))
,data_desc中的SET語句負(fù)責(zé)設(shè)置列函數(shù)變換。如果原始數(shù)據(jù)的列和表中的列不一一對應(yīng),則需要使用該屬性。
where predicate
WHERE predicate
,data_desc中的WHERE語句負(fù)責(zé)過濾已經(jīng)完成transform的數(shù)據(jù)。被過濾的數(shù)據(jù)不會進入容忍率的統(tǒng)計中。如果多個data_desc中聲明了關(guān)于同一張表的多個條件,則會以AND語義合并這些條件。
導(dǎo)入作業(yè)參數(shù)
導(dǎo)入作業(yè)參數(shù)是指Broker Load創(chuàng)建導(dǎo)入語句中屬于broker_properties部分的參數(shù)。導(dǎo)入作業(yè)參數(shù)是作用于整個導(dǎo)入作業(yè)的。
broker_properties: (key2=value2, ...)
部分參數(shù)描述如下表所示。
參數(shù)
描述
timeout
導(dǎo)入作業(yè)的超時時間(以秒為單位)。
您可以在opt_properties中自行設(shè)置每個導(dǎo)入的超時時間。導(dǎo)入任務(wù)在設(shè)定的時限內(nèi)未完成則會被系統(tǒng)取消,變?yōu)镃ANCELLED。Broker Load的默認(rèn)導(dǎo)入超時時間為4小時。
重要通常情況下,不需要您手動設(shè)置導(dǎo)入任務(wù)的超時時間。當(dāng)在默認(rèn)超時時間內(nèi)無法完成導(dǎo)入時,可以手動設(shè)置任務(wù)的超時時間。
推薦超時時間的計算方式為:
超時時間 >((總文件大小 (MB)* 待導(dǎo)入的表及相關(guān)Roll up表的個數(shù)) / (30 * 導(dǎo)入并發(fā)數(shù)))
公式中的30為目前BE導(dǎo)入的平均速度,表示30 MB/s。例如,如果待導(dǎo)入數(shù)據(jù)文件為1 GB,待導(dǎo)入表包含2個Rollup表,當(dāng)前的導(dǎo)入并發(fā)數(shù)為3,則timeout的最小值為 (1 * 1024 * 3 ) / (10 * 3) = 102 秒。
由于每個StarRocks集群的機器環(huán)境不同且集群并發(fā)的查詢?nèi)蝿?wù)也不同,所以StarRocks集群的最慢導(dǎo)入速度需要您根據(jù)歷史的導(dǎo)入任務(wù)速度進行推測。
max_filter_ratio
導(dǎo)入任務(wù)的最大容忍率,默認(rèn)為0容忍,取值范圍是0~1。當(dāng)導(dǎo)入的錯誤率超過該值,則導(dǎo)入失敗。如果您希望忽略錯誤的行,可以設(shè)置該參數(shù)值大于0,來保證導(dǎo)入可以成功。
計算公式為:
max_filter_ratio = (dpp.abnorm.ALL / (dpp.abnorm.ALL + dpp.norm.ALL ) )
其中,
dpp.abnorm.ALL
表示數(shù)據(jù)質(zhì)量不合格的行數(shù),例如類型不匹配、列數(shù)不匹配和長度不匹配等。dpp.abnorm.ALL
指的是導(dǎo)入過程中正確數(shù)據(jù)的條數(shù),可以通過SHOW LOAD
命令查詢導(dǎo)入任務(wù)的正確數(shù)據(jù)量。原始文件的行數(shù) = dpp.abnorm.ALL + dpp.norm.ALL
load_mem_limit
導(dǎo)入內(nèi)存限制。默認(rèn)值為0,表示不限制。
strict_mode
Broker Load導(dǎo)入可以開啟Strict Mode模式。開啟方式為
properties ("strict_mode" = "true")
。默認(rèn)關(guān)閉。
Strict Mode模式是對于導(dǎo)入過程中的列類型轉(zhuǎn)換進行嚴(yán)格過濾。嚴(yán)格過濾的策略為,對于列類型轉(zhuǎn)換,如果Strict Mode為true,則錯誤的數(shù)據(jù)將被過濾掉。錯誤數(shù)據(jù)是指原始數(shù)據(jù)并不為空值,在參與列類型轉(zhuǎn)換后結(jié)果為空值的數(shù)據(jù)。但以下場景除外:
對于導(dǎo)入的某列由函數(shù)變換生成時,Strict Mode對其不產(chǎn)生影響。
對于導(dǎo)入的某列類型包含范圍限制的,如果原始數(shù)據(jù)能正常通過類型轉(zhuǎn)換,但無法通過范圍限制的,Strict Mode對其也不產(chǎn)生影響。例如,如果類型是decimal(1,0),原始數(shù)據(jù)為10,則屬于可以通過類型轉(zhuǎn)換但不在列聲明的范圍內(nèi),Strict Mode對其不產(chǎn)生影響。
查看導(dǎo)入任務(wù)狀態(tài)
SHOW LOAD;
返回參數(shù)的描述如下表所示。
參數(shù) | 描述 |
JobId | 導(dǎo)入任務(wù)的唯一ID,每個導(dǎo)入任務(wù)的JobId都不同,由系統(tǒng)自動生成。與Label不同的是,JobId永遠(yuǎn)不會相同,而Label則可以在導(dǎo)入任務(wù)失敗后被復(fù)用。 |
Label | 導(dǎo)入任務(wù)的標(biāo)識。 |
State | 導(dǎo)入任務(wù)當(dāng)前所處的階段。
|
Progress | 導(dǎo)入任務(wù)的進度描述。分為ETL和LOAD兩種進度,分別對應(yīng)導(dǎo)入流程的ETL和LOADING兩個階段。目前Broker Load只有LOADING階段,所以ETL固定顯示為N/A,而LOAD的進度范圍為0~100%。 LOAD的進度的計算公式為 如果所有導(dǎo)入表均完成導(dǎo)入,此時LOAD的進度為99%, 導(dǎo)入進入到最后生效階段,待整個導(dǎo)入任務(wù)完成后,LOAD的進度才會改為100%。 重要 導(dǎo)入進度并不是線性的,所以如果一段時間內(nèi)進度沒有變化,并不代表導(dǎo)入任務(wù)沒有執(zhí)行。 |
Type | 導(dǎo)入任務(wù)的類型。Broker Load的Type取值是BROKER。 |
EtlInfo | 主要顯示導(dǎo)入的數(shù)據(jù)量指標(biāo)unselected.rows,dpp.norm.ALL和dpp.abnorm.ALL。 您可以根據(jù)unselected.rows的參數(shù)值判斷where條件過濾了多少行,根據(jù)dpp.norm.ALL和dpp.abnorm.ALL兩個指標(biāo)可以驗證當(dāng)前導(dǎo)入任務(wù)的錯誤率是否超過max-filter-ratio。三個指標(biāo)之和就是原始數(shù)據(jù)量的總行數(shù)。 |
TaskInfo | 主要顯示當(dāng)前導(dǎo)入任務(wù)參數(shù),即創(chuàng)建Broker Load導(dǎo)入任務(wù)時您指定的參數(shù),包括cluster,timeout和max-filter-ratio。 |
ErrorMsg | 如果導(dǎo)入任務(wù)狀態(tài)為CANCELLED,則顯示失敗的原因,包括type和msg兩部分。如果導(dǎo)入任務(wù)成功則顯示N/A。type的取值意義如下:
|
CreateTime | 分別表示導(dǎo)入創(chuàng)建的時間、ETL階段開始的時間、ETL階段完成的時間、LOADING階段開始的時間和整個導(dǎo)入任務(wù)完成的時間。
|
EtlStartTime | |
EtlFinishTime | |
LoadStartTime | |
LoadFinishTime | |
URL | 導(dǎo)入任務(wù)的錯誤數(shù)據(jù)樣例,訪問URL地址即可獲取本次導(dǎo)入的錯誤數(shù)據(jù)樣例。當(dāng)本次導(dǎo)入不存在錯誤數(shù)據(jù)時,URL字段為N/A。 |
JobDetails | 顯示作業(yè)的詳細(xì)運行狀態(tài)。包括導(dǎo)入文件的個數(shù)、總大小(字節(jié))、子任務(wù)個數(shù)、已處理的原始行數(shù),運行子任務(wù)的BE節(jié)點ID,以及未完成的BE節(jié)點ID。
其中已處理的原始行數(shù),每5秒更新一次。該行數(shù)僅用于展示當(dāng)前的進度,不代表最終實際的處理行數(shù)。實際處理行數(shù)以EtlInfo中顯示的數(shù)據(jù)為準(zhǔn)。 |
取消導(dǎo)入任務(wù)
當(dāng)Broker Load作業(yè)狀態(tài)不為CANCELLED或FINISHED時,可以手動取消。取消時需要指定待取消導(dǎo)入任務(wù)的Label。
導(dǎo)入任務(wù)并發(fā)度
一個作業(yè)可以拆成一個或者多個任務(wù),任務(wù)之間并行執(zhí)行。拆分由LOAD語句中的DataDescription來決定。例如:
多個DataDescription對應(yīng)導(dǎo)入多個不同的表,每個會拆成一個任務(wù)。
多個DataDescription對應(yīng)導(dǎo)入同一個表的不同分區(qū),每個也會拆成一個任務(wù)。
每個任務(wù)還會拆分成一個或者多個實例,然后將這些實例平均分配到BE上并行執(zhí)行。實例的拆分由以下FE配置決定:
min_bytes_per_broker_scanner:單個實例處理的最小數(shù)據(jù)量,默認(rèn)值為64 MB。
max_broker_concurrency:單個任務(wù)最大并發(fā)實例數(shù),默認(rèn)值為100。
load_parallel_instance_num:單個BE上并發(fā)實例數(shù),默認(rèn)值為1個。
實例總數(shù)的計算公式為實例的總數(shù) = min(導(dǎo)入文件總大小/單個實例處理的最小數(shù)據(jù)量,單個任務(wù)最大并發(fā)實例數(shù),單個BE上并發(fā)實例數(shù) * BE數(shù))
。
通常情況下,一個作業(yè)只有一個DataDescription,只會拆分成一個任務(wù)。任務(wù)會拆成與BE數(shù)相等的實例,然后分配到所有BE上并行執(zhí)行。
導(dǎo)入示例
以下示例中的參數(shù)請根據(jù)實際情況替換。
阿里云OSS數(shù)據(jù)導(dǎo)入
創(chuàng)建測試表。
create database if not exists load_test; use load_test; create table if not exists customer( c_customer_sk bigint, c_customer_id char(16), c_current_cdemo_sk bigint, c_current_hdemo_sk bigint, c_current_addr_sk bigint, c_first_shipto_date_sk bigint, c_first_sales_date_sk bigint, c_salutation char(10), c_first_name char(20), c_last_name char(30), c_preferred_cust_flag char(1), c_birth_day int, c_birth_month int, c_birth_year int, c_birth_country varchar(20), c_login char(13), c_email_address char(50), c_last_review_date_sk bigint ) duplicate key (c_customer_sk) distributed by hash(c_customer_sk) buckets 5 properties( "replication_num"="1" );
創(chuàng)建導(dǎo)入任務(wù)。
下載customer.orc。
LOAD LABEL load_test.customer_label ( DATA INFILE("oss://{bucket_name}/data/customer.orc") INTO TABLE customer format as "orc" ) WITH BROKER 'broker' ( "fs.oss.accessKeyId" = "xxxxx", "fs.oss.accessKeySecret" = "xxxxx", "fs.oss.endpoint" = "oss-cn-xxx-internal.aliyuncs.com" );
查看導(dǎo)入任務(wù)狀態(tài)。
show load where label='customer_label';
查詢表信息。
示例1
select count(1) from customer;
返回信息如下。
+----------+ | count(1) | +----------+ | 6000000 | +----------+ 1 row in set (0.10 sec)
示例2
select * from customer limit 2;
返回信息如下。
+---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+ | c_customer_sk | c_customer_id | c_current_cdemo_sk | c_current_hdemo_sk | c_current_addr_sk | c_first_shipto_date_sk | c_first_sales_date_sk | c_salutation | c_first_name | c_last_name | c_preferred_cust_flag | c_birth_day | c_birth_month | c_birth_year | c_birth_country | c_login | c_email_address | c_last_review_date_sk | +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+ | 2 | AAAAAAAACAAAAAAA | 819667 | 1461 | 681655 | 2452318 | 2452288 | Dr. | Amy | Moses | Y | 9 | 4 | 1966 | TOGO | NULL | Amy.M****@Ovk9KjHH.com | 2452318 | | 2 | AAAAAAAACAAAAAAA | 819667 | 1461 | 681655 | 2452318 | 2452288 | Dr. | Amy | Moses | Y | 9 | 4 | 1966 | TOGO | NULL | Amy.M****@Ovk9KjHH.com | 2452318 | +---------------+------------------+--------------------+--------------------+-------------------+------------------------+-----------------------+--------------+--------------+-------------+-----------------------+-------------+---------------+--------------+-----------------+---------+------------------------+-----------------------+
HDFS導(dǎo)入
HDFS導(dǎo)入語法示例
LOAD LABEL db1.label1 ( DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file1") INTO TABLE tbl1 COLUMNS TERMINATED BY "," (tmp_c1, tmp_c2) SET ( id=tmp_c2, name=tmp_c1 ), DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/hive/test.db/ml/file2") INTO TABLE tbl2 COLUMNS TERMINATED BY "," (col1, col2) where col1 > 1 ) WITH BROKER 'broker' ( "username" = "hdfs_username", "password" = "hdfs_password" ) PROPERTIES ( "timeout" = "3600" );
說明如果您的集群類型是新版數(shù)據(jù)湖(DataLake)、實時數(shù)據(jù)流(DataFlow)、數(shù)據(jù)分析(OLAP)、數(shù)據(jù)服務(wù)(DataServing)和自定義場景的集群, 請?zhí)鎿Qemr-header-1為master-1-1。
HDFS認(rèn)證
社區(qū)版本的HDFS支持簡單認(rèn)證和Kerberos認(rèn)證兩種認(rèn)證方式。
簡單認(rèn)證(Simple):用戶的身份由與HDFS建立鏈接的客戶端操作系統(tǒng)決定。涉及參數(shù)如下表。
參數(shù)
描述
hadoop.security.authentication
認(rèn)證方式。默認(rèn)值為simple。
username
HDFS的用戶名。
password
HDFS的密碼。
Kerberos認(rèn)證:客戶端的身份由用戶自己的Kerberos證書決定。
涉及參數(shù)如下表。
參數(shù)
描述
hadoop.security.authentication
認(rèn)證方式。默認(rèn)值為kerberos。
kerberos_principal
指定Kerberos的Principal。
kerberos_keytab
指定Kerberos的keytab文件路徑。該文件必須為Broker進程所在服務(wù)器上的文件。
kerberos_keytab_content
指定Kerberos中keytab文件內(nèi)容經(jīng)過Base64編碼之后的內(nèi)容。
重要該參數(shù)和kerberos_keytab參數(shù)只需配置一個。
HDFS HA配置
通過配置NameNode HA,可以在NameNode切換時,自動識別到新的NameNode。配置以下參數(shù)用于訪問以HA模式部署的HDFS集群。
參數(shù)
描述
dfs.nameservices
指定HDFS服務(wù)的名稱,您可以自定義。
例如,設(shè)置dfs.nameservices為my_ha。
dfs.ha.namenodes.xxx
自定義NameNode的名稱,多個名稱時以逗號(,)分隔。其中xxx為dfs.nameservices中自定義的名稱。
例如,設(shè)置dfs.ha.namenodes.my_ha為my_nn。
dfs.namenode.rpc-address.xxx.nn
指定NameNode的RPC地址信息。其中nn表示dfs.ha.namenodes.xxx中配置的NameNode的名稱。
例如,設(shè)置dfs.namenode.rpc-address.my_ha.my_nn參數(shù)值的格式為host:port。
dfs.client.failover.proxy.provider
指定Client連接NameNode的Provider,默認(rèn)值為org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider。