日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Broker Load

本文介紹如何通過Broker Load導(dǎo)入數(shù)據(jù)至云數(shù)據(jù)庫 SelectDB 版實(shí)例。

背景信息

Broker Load是一種異步的導(dǎo)入方式,通過讀取遠(yuǎn)端存儲(如HDFS、S3)上的數(shù)據(jù),導(dǎo)入數(shù)據(jù)到云數(shù)據(jù)庫 SelectDB 版的表中。您可通過MySQL協(xié)議創(chuàng)建Broker Load導(dǎo)入,并通過SHOW LOAD命令檢查導(dǎo)入結(jié)果。單次導(dǎo)入數(shù)據(jù)量最多可支持百GB級別。

創(chuàng)建導(dǎo)入

該方式用于通過Broker導(dǎo)入,讀取遠(yuǎn)端存儲(如HDFS、S3)上的數(shù)據(jù)導(dǎo)入到云數(shù)據(jù)庫 SelectDB 版的表中。

語法

LOAD LABEL load_label
(
data_desc1[, data_desc2, ...]
)
WITH broker_type
[broker_properties]
[load_properties];

參數(shù)說明

參數(shù)名稱

參數(shù)說明

load_label

導(dǎo)入任務(wù)的唯一標(biāo)識。Label是在導(dǎo)入命令中自定義的名稱。通過Label,可以查看對應(yīng)導(dǎo)入任務(wù)的執(zhí)行情況。Label也可用于防止重復(fù)導(dǎo)入相同的數(shù)據(jù),當(dāng)Label對應(yīng)的導(dǎo)入作業(yè)狀態(tài)為CANCELLED時,該Label可以再次被使用。

格式:[database.]label_name

說明

推薦同一批次數(shù)據(jù)使用相同的Label。這樣同一批次數(shù)據(jù)的重復(fù)請求只會被接受一次,保證了At-Most-Once。

data_desc1

描述一組需要導(dǎo)入的文件。詳細(xì)參數(shù)說明,請參見data_desc1參數(shù)說明

WITH broker_type

指定需要使用的Broker類型,支持HDFS、S3兩種。S3類型的Broker Load也稱為OSS Load,詳情請參見OSS Load

broker_properties

指定Broker所需的參數(shù)讓Broker能夠訪問遠(yuǎn)端存儲系統(tǒng)。例如:BOS或HDFS。

語法如下:

( "key1" = "val1", "key2" = "val2", ...)

load_properties

指定導(dǎo)入的相關(guān)參數(shù)。詳細(xì)參數(shù)說明,請參見load_properties參數(shù)說明

data_desc1參數(shù)說明

[MERGE|APPEND|DELETE]
DATA INFILE
(
"file_path1"[, file_path2, ...]
)
[NEGATIVE]
INTO TABLE `table_name`
[PARTITION (p1, p2, ...)]
[COLUMNS TERMINATED BY "column_separator"]
[FORMAT AS "file_type"]
[(column_list)]
[COLUMNS FROM PATH AS (c1, c2, ...)]
[PRECEDING FILTER predicate]
[SET (column_mapping)]
[WHERE predicate]
[DELETE ON expr]
[ORDER BY source_sequence]
[PROPERTIES ("key1"="value1", ...)]

參數(shù)名稱

參數(shù)說明

[MERGE|APPEND|DELETE]

指定數(shù)據(jù)合并類型。默認(rèn)為APPEND。默認(rèn)值表示本次導(dǎo)入是普通的追加寫操作。MERGE和DELETE類型僅適用于Unique Key模型表。其中MERGE類型需要配合[DELETE ON]語句使用,以標(biāo)注Delete Flag列。而DELETE類型則表示本次導(dǎo)入的所有數(shù)據(jù)皆為刪除數(shù)據(jù)。

DATA INFILE

指定需要導(dǎo)入的文件路徑。需要導(dǎo)入的文件路徑可以是多個,可以使用通配符。路徑最終必須匹配到文件,如果只匹配到目錄則導(dǎo)入失敗。

NEGATIVE

表示本次導(dǎo)入為一批“負(fù)”導(dǎo)入。這種方式僅針對具有整型SUM聚合類型的聚合數(shù)據(jù)表。該方式會將導(dǎo)入數(shù)據(jù)中SUM聚合列對應(yīng)的整型數(shù)值取反。用于沖抵之前導(dǎo)入錯誤的數(shù)據(jù)。

PARTITION(p1, p2, ...)

指定僅導(dǎo)入表的某些分區(qū),不在分區(qū)范圍內(nèi)的數(shù)據(jù)將被忽略。

COLUMNS TERMINATED BY

指定列分隔符,僅在CSV格式下有效,僅能指定單字節(jié)分隔符。

FORMAT AS

指定文件類型,默認(rèn)為CSV。支持CSV、PARQUET和ORC格式。

column list

指定原始文件中的列順序。

COLUMNS FROM PATH AS

指定從導(dǎo)入的文件中抽取的列。

PRECEDING FILTER predicate

指定前置過濾條件。數(shù)據(jù)首先根據(jù)column listCOLUMNS FROM PATH AS按順序拼接成原始數(shù)據(jù)行。然后按照前置過濾條件進(jìn)行過濾。

SET (column_mapping)

指定列的轉(zhuǎn)換函數(shù)。

WHERE predicate

指定數(shù)據(jù)的過濾條件。

DELETE ON expr

需配合MERGE導(dǎo)入模式一起使用,僅針對Unique Key模型的表。用于指定導(dǎo)入數(shù)據(jù)中表示Delete Flag的列和計算關(guān)系。

ORDER BY

僅針對使用Unique Key模型的表。用于指定導(dǎo)入數(shù)據(jù)中表示Sequence Col的列。主要用于導(dǎo)入時保證數(shù)據(jù)順序。

PROPERTIES ("key1"="value1", ...)

指定導(dǎo)入的format的一些參數(shù)。如導(dǎo)入的文件是JSON格式,則可以在這里指定json_root、jsonpaths、fuzzy_parse等參數(shù)。

load_properties參數(shù)說明

參數(shù)名稱

參數(shù)說明

timeout

導(dǎo)入超時時間,單位為秒,默認(rèn)為14400,即4小時。

max_filter_ratio

最大容忍可過濾比率(數(shù)據(jù)不規(guī)范等原因)。默認(rèn)0,即零容忍。取值范圍為0~1。

exec_mem_limit

導(dǎo)入內(nèi)存限制,單位為字節(jié),默認(rèn)為2147483648,即2 GB。

strict_mode

設(shè)置導(dǎo)入任務(wù)是否開啟嚴(yán)格模式,默認(rèn)為false。

timezone

指定某些受時區(qū)影響的函數(shù)的時區(qū),默認(rèn)為Asia/Shanghai時區(qū)。支持strftime、alignment_timestamp和from_unixtime等。

load_parallelism

導(dǎo)入并發(fā)度,默認(rèn)為1。調(diào)大導(dǎo)入并發(fā)度會啟動多個執(zhí)行計劃同時執(zhí)行導(dǎo)入任務(wù),加快導(dǎo)入速度。

send_batch_parallelism

用于設(shè)置發(fā)送批處理數(shù)據(jù)的并行度,如果并行度的值超過計算集群BE配置中的max_send_batch_parallelism_per_job,那么計算集群將使用max_send_batch_parallelism_per_job的值。

load_to_single_tablet

是否只導(dǎo)入數(shù)據(jù)到對應(yīng)分區(qū)的一個tablet,默認(rèn)值為false。該參數(shù)只允許在對帶有random分桶的Duplicate表導(dǎo)入數(shù)據(jù)的時候設(shè)置。

使用示例

  1. 創(chuàng)建待導(dǎo)入的SelectDB數(shù)據(jù)表,示例如下。

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    UNIQUE KEY(`id`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
    
    CREATE TABLE test_table2
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50)
    )
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. 創(chuàng)建待導(dǎo)入的文件。

    • 文件file1.txt,文件內(nèi)容如下。

      1,tomori,32,shanghai
      2,anon,22,beijing
      3,taki,23,shenzhen
      4,rana,45,hangzhou
      5,soyo,14,shanghai
      6,saki,25,hangzhou
      7,mutsumi,45,shanghai
      8,uika,26,shanghai
      9,umiri,27,shenzhen
      10,nyamu,37,shanghai
    • 文件file2.csv,文件內(nèi)容如下。

      1,saki,25,hangzhou
      2,mutsumi,45,shanghai
      3,uika,26,shanghai
      4,umiri,27,shenzhen
      5,nyamu,37,shanghai
  3. 將文件數(shù)據(jù)導(dǎo)入到表中。

    • 從HDFS導(dǎo)入文本文件file1.txt,示例如下。

      LOAD LABEL example_db.label1
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `my_table`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      導(dǎo)入文件file1.txt,按逗號分隔,導(dǎo)入到表test_table。當(dāng)從HDFS導(dǎo)入數(shù)據(jù)時,broker_properties中需要指定fs.defaultFS屬性,以確保可以正確的連接到HDFS集群并找到相應(yīng)的數(shù)據(jù)文件。

    • 從HDFS導(dǎo)入數(shù)據(jù),同時導(dǎo)入兩個文件到兩個表中,示例如下。

      LOAD LABEL test_db.test_02
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file2.csv")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,temp_age,address)    
          SET (
              age = temp_age + 1
          ),
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );

      導(dǎo)入兩個文件file1.txtfile2.csv,分別導(dǎo)入到test_tabletest_table2兩張表中,并且將file2.csv中age的值加1后導(dǎo)入。

    • 從HA模式部署的HDFS集群中,導(dǎo)入一批數(shù)據(jù),示例如下。

      LOAD LABEL test_db.test_03
      (
          DATA INFILE("hdfs://hdfs_host:hdfs_port/example/*")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY "\\x01"
      )
      WITH HDFS
      (
          "hadoop.username" = "hive",
          "fs.defaultFS" = "hdfs://my_ha",
          "dfs.nameservices" = "my_ha",
          "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2",
          "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port",
          "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port",
          "dfs.client.failover.proxy.provider.my_ha" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      );

      指定分隔符為Hive的默認(rèn)分隔符\\x01,并使用通配符*指定data目錄下所有目錄的所有文件。

    • 對導(dǎo)入數(shù)據(jù)file1.txt進(jìn)行過濾處理,符合條件的數(shù)據(jù)才可導(dǎo)入,示例如下。

      LOAD LABEL test_db.test_04
      (
          DATA INFILE("hdfs://host:port/example/file1.txt")
          INTO TABLE `test_table2`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          WHERE age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      );
      

      只有原始數(shù)據(jù)中age < 20的行才會被導(dǎo)入。

    • 從HDFS導(dǎo)入一批數(shù)據(jù)file1.txt,指定超時時間和過濾比例,并且將原有數(shù)據(jù)中與導(dǎo)入數(shù)據(jù)中age<20的列相匹配的行刪除,其他行正常導(dǎo)入,示例如下。

      LOAD LABEL test_db.test_05
      (
          MERGE DATA INFILE("hdfs://hdfs_host:hdfs_port/example/file1.txt")
          INTO TABLE `test_table`
          COLUMNS TERMINATED BY ","
          (id,name,age,address)   
          DELETE ON age < 20
      )
      WITH HDFS
      (
          "fs.defaultFS" = "hdfs://hdfs_host:hdfs_port"
      )
      PROPERTIES
      (
          "timeout" = "3600",
          "max_filter_ratio" = "0.1"
      );

      使用MERGE方式導(dǎo)入。test_table必須是一張Unique Key的表。當(dāng)導(dǎo)入數(shù)據(jù)中的age列的值小于20時,該行會被認(rèn)為是一個刪除行。導(dǎo)入任務(wù)的超時時間是3600秒,并且允許錯誤率在10%以內(nèi)。

取消導(dǎo)入

當(dāng)Broker Load作業(yè)狀態(tài)不為CANCELLED或FINISHED時,可以手動取消導(dǎo)入。取消時需要指定待取消導(dǎo)入任務(wù)的Label。導(dǎo)入任務(wù)取消后,已寫入的數(shù)據(jù)也會回滾,不會生效。

語法

CANCEL LOAD
[FROM db_name]
WHERE [LABEL = "load_label" | LABEL like "label_pattern"];

參數(shù)說明

參數(shù)名稱

參數(shù)說明

db_name

數(shù)據(jù)庫名稱。不指定的時使用當(dāng)前默認(rèn)數(shù)據(jù)庫。

load_label

導(dǎo)入任務(wù)的Label名稱,精確匹配。如果使用LABEL LIKE,則會匹配導(dǎo)入任務(wù)的Label包含label_pattern的導(dǎo)入任務(wù)。

使用示例

  • 撤銷數(shù)據(jù)庫example_db上,Label為example_db_test_load_label的導(dǎo)入作業(yè)。

    CANCEL LOAD
    FROM example_db
    WHERE LABEL = "example_db_test_load_label";
  • 撤銷數(shù)據(jù)庫example_db上,所有包含example_的導(dǎo)入作業(yè)。

    CANCEL LOAD
    FROM example_db
    WHERE LABEL like "example_";

查看導(dǎo)入

Broker Load是一個異步導(dǎo)入過程,語句執(zhí)行成功僅代表導(dǎo)入任務(wù)提交成功,并不代表數(shù)據(jù)導(dǎo)入成功。導(dǎo)入狀態(tài)需要通過SHOW LOAD命令查看。

語法

SHOW LOAD
[FROM db_name]
[
   WHERE
   [LABEL [ = "your_label" | LIKE "label_matcher"]]
   [STATE = ["PENDING"|"ETL"|"LOADING"|"FINISHED"|"CANCELLED"|]]
]
[ORDER BY ...]
[LIMIT limit][OFFSET offset];

參數(shù)說明

參數(shù)名稱

參數(shù)說明

db_name

數(shù)據(jù)庫名稱。不指定的場合使用當(dāng)前默認(rèn)數(shù)據(jù)庫。

your_label

導(dǎo)入任務(wù)的Label名稱,精確匹配。如果使用LABEL LIKE,則會匹配導(dǎo)入任務(wù)的Label包含label_matcher的導(dǎo)入任務(wù)。

STATE

導(dǎo)入狀態(tài)。只查看指定狀態(tài)的導(dǎo)入任務(wù)。

ORDER BY

指定排序依據(jù)。

LIMIT

顯示Limit條匹配記錄。不指定的場合全部顯示。

OFFSET

從偏移量offset開始顯示查詢結(jié)果。默認(rèn)情況下偏移量為0。

使用示例

  • 展示數(shù)據(jù)庫example_db的導(dǎo)入任務(wù),Label中包含字符串2014_01_02,展示保存時間最久的10個。

    SHOW LOAD FROM example_db WHERE LABEL LIKE "2014_01_02" LIMIT 10;
  • 展示數(shù)據(jù)庫example_db的導(dǎo)入任務(wù),指定Label為load_example_db_20140102并按LoadStartTime降序排序。

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" ORDER BY LoadStartTime DESC;
  • 展示數(shù)據(jù)庫example_db的導(dǎo)入任務(wù),指定Label為load_example_db_20140102,state為loading

    SHOW LOAD FROM example_db WHERE LABEL = "load_example_db_20140102" AND STATE = "loading";
  • 展示數(shù)據(jù)庫example_db的導(dǎo)入任務(wù),按LoadStartTime降序排序,并從偏移量5開始顯示10條查詢結(jié)果。

    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 5,10;
    SHOW LOAD FROM example_db ORDER BY LoadStartTime DESC limit 10 offset 5;

最佳實(shí)踐

  • 查看導(dǎo)入任務(wù)狀態(tài)

    Broker Load是一個異步導(dǎo)入過程,語句執(zhí)行成功僅代表導(dǎo)入任務(wù)提交成功,并不代表數(shù)據(jù)導(dǎo)入成功。導(dǎo)入狀態(tài)需要通過SHOW LOAD命令查看。

  • 取消導(dǎo)入任務(wù)

    已提交切尚未結(jié)束的導(dǎo)入任務(wù)可以通過CANCEL LOAD命令取消。取消后,已寫入的數(shù)據(jù)也會回滾,不會生效。

  • Label、導(dǎo)入事務(wù)、多表原子性

    SelectDB中所有導(dǎo)入任務(wù)都是原子生效的。并且在同一個導(dǎo)入任務(wù)中對多張表的導(dǎo)入也能夠保證原子性。同時,SelectDB還可以通過Label的機(jī)制來保證數(shù)據(jù)導(dǎo)入的不丟失和不重復(fù)。

  • 列映射、衍生列和過濾

    SelectDB可以在導(dǎo)入語句中支持非常豐富的列轉(zhuǎn)換和過濾操作。支持絕大多數(shù)內(nèi)置函數(shù)和UDF。詳情請參見數(shù)據(jù)轉(zhuǎn)換文檔。

  • 錯誤數(shù)據(jù)過濾

    SelectDB的導(dǎo)入任務(wù)可以容忍一部分格式錯誤的數(shù)據(jù)。容忍了通過max_filter_ratio設(shè)置。默認(rèn)為0,即表示當(dāng)有一條錯誤數(shù)據(jù)時,整個導(dǎo)入任務(wù)將會失敗。如果您希望忽略部分有問題的數(shù)據(jù)行,可以將次參數(shù)設(shè)置為0~1之間的數(shù)值,SelectDB會自動跳過哪些數(shù)據(jù)格式不正確的行。

    關(guān)于容忍率的一些計算方式,詳情請參見數(shù)據(jù)轉(zhuǎn)換文檔。

  • 嚴(yán)格模式

    strict_mode屬性用于設(shè)置導(dǎo)入任務(wù)是否運(yùn)行在嚴(yán)格模式下。該格式會對列映射、轉(zhuǎn)換和過濾的結(jié)果產(chǎn)生影響。

  • 超時時間

    Broker Load的默認(rèn)超時時間為4小時,從任務(wù)提交開始算起。如果超時未完成,則任務(wù)會失敗。

  • 數(shù)據(jù)量和任務(wù)數(shù)限制

    建議通過Broker Load單次導(dǎo)入100 GB以內(nèi)的數(shù)據(jù)。雖然理論上在一個導(dǎo)入任務(wù)中導(dǎo)入的數(shù)據(jù)量沒有上限,但是提交過大的導(dǎo)入會導(dǎo)致運(yùn)行時間較長,并且失敗后重試的代價也會較大。

    同時受限于集群規(guī)模,我們限制了導(dǎo)入的最大數(shù)據(jù)量為節(jié)點(diǎn)數(shù)乘3 GB。以保證系統(tǒng)資源的合理利用。如果有大數(shù)據(jù)量需要導(dǎo)入,建議分成多個導(dǎo)入任務(wù)提交。

    SelectDB會限制集群內(nèi)同時運(yùn)行的導(dǎo)入任務(wù)數(shù)量,通常在3~10個之間,之后提交的導(dǎo)入作業(yè)會排隊(duì)等待。隊(duì)列最大長度為100,之后的提交會直接拒絕。

    說明

    排隊(duì)時間也會被計算到作業(yè)總時間中。如果超時,則作業(yè)會被取消。所以建議通過監(jiān)控作業(yè)運(yùn)行狀態(tài)來合理控制作業(yè)提交頻率。