日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Paimon主鍵表和Append Only表

Paimon僅支持主鍵表和Append Only表。本文為您介紹Paimon主鍵表和Append Only表的基本特性與功能。

Paimon主鍵表

創建Paimon表時指定了主鍵(primary key),則該表即為Paimon主鍵表。

語法結構

例如,創建一張分區鍵為dt,主鍵為dt、shop_id和user_id,分桶數固定為4的Paimon主鍵表。

CREATE TABLE T (
  dt STRING,
  shop_id BIGINT,
  user_id BIGINT,
  num_orders INT,
  total_amount INT,
  PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);

Paimon主鍵表中每行數據的主鍵值各不相同,如果將多條具有相同主鍵的數據寫入Paimon主鍵表,將根據數據合并機制對數據進行合并。

分桶方式

分桶(Bucket)是Paimon表讀寫操作的最小單元。非分區表的所有數據,以及分區表每個分區的數據,都會被進一步劃分到不同的分桶中,以便同一作業使用多個并發同時讀寫Paimon表,加快讀寫效率。支持的類別詳情如下。

類別

定義

說明

動態分桶(默認)

創建Paimon主鍵表時,不在WITH參數中指定bucket或指定'bucket' = '-1',將會創建動態分桶的Paimon表。

  • 動態分桶表不支持多個作業同時寫入。

  • 對于動態分桶的Paimon主鍵表,可以支持跨分區更新主鍵。

固定分桶

創建Paimon主鍵表時,在WITH參數中指定'bucket' = '<num>',即可指定非分區表的分桶數為<num>,或者分區表單個分區的分桶數為<num><num>是一個大于0的整數。

如果在創建固定分桶的Paimon表之后,需要修改分桶數,詳情請參見調整固定分桶表的分桶數量

對于固定分桶的Paimon主鍵表,分區表的主鍵需要完全包含分區鍵(partition key),以避免主鍵的跨分區更新。

動態分桶表更新

類別

說明

跨分區更新的動態分桶表

對于主鍵不完全包含分區鍵的動態分桶表,Paimon無法根據主鍵確定該數據屬于哪個分區的哪個分桶,因此需要使用RocksDB維護主鍵與分區以及分桶編號的映射關系。相比固定分桶而言,數據量較大的表可能會產生明顯的性能損失。另外,因為作業啟動時需要將映射關系全量加載至RocksDB中,作業的啟動速度也會變慢。數據合并機制會對跨分區更新的結果產生影響:

  • deduplicate:數據將會從老分區刪除,并插入新分區。

  • aggregation與partial-update:數據將會直接在老分區中更新,無視新數據的分區鍵。

  • first-row:如果相同主鍵的數據已經存在,則新數據將被直接丟棄。

非跨分區更新的動態分桶表

對于主鍵完全包含分區鍵的動態分桶表,Paimon可以確定該主鍵屬于哪個分區,但無法確定屬于哪個分桶,因此需要使用額外的堆內存創建索引,以維護主鍵與分桶編號的映射關系。

具體來說,每1億條主鍵將額外消耗1 GB的堆內存。只有當前正在寫入的分區才會消耗堆內存,歷史分區中的主鍵不會消耗堆內存。

除堆內存的消耗外,相比固定分桶而言,主鍵完全包含分區鍵的動態分桶表不會有明顯的性能損失。

數據分發

類別

數據分發

動態分桶

動態分桶的Paimon表會先將數據寫入已有的分桶中,當分桶的數據量超過限制時,再自動創建新的分桶。以下WITH參數將會影響動態分桶的行為。

  • dynamic-bucket.target-row-num:每個分桶最多存儲幾條數據。默認值為2000000。

  • dynamic-bucket.initial-buckets:初始的分桶數。如果不設置,初始將會創建等同于writer算子并發數的分桶。

固定分桶

默認情況下,Paimon將根據每條數據主鍵的哈希值,確定該數據屬于哪個分桶。

如果您需要修改數據的分桶方式,可以在創建Paimon表時,在WITH參數中指定bucket-key參數,不同列的名稱用英文逗號分隔,主鍵必須完整包含bucket-key。例如,如果設置了'bucket-key' = 'c1,c2',則Paimon將根據每條數據c1c2兩列的值,確定該數據屬于哪個分桶。

調整固定分桶表的分桶數量

由于分桶數限制了實際工作的作業并發數,且單個分桶內數據總量太大可能導致讀寫性能的降低,因此分桶數不宜太小。但是,分桶數過大也會造成小文件數量過多。建議每個分桶的數據大小在2 GB左右,最大不超過5 GB。調整固定分桶表的分桶數量具體的操作步驟如下。

  1. 停止所有寫入該Paimon表或消費該Paimon表的作業。

  2. 新建查詢腳本,執行以下SQL語句,調整Paimon表的bucket參數。

    ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
  3. 整理非分區表中的所有數據,或分區表中仍需寫入的分區中的所有數據。

    • 非分區表:新建空白批作業草稿,在SQL編輯器中編寫以下SQL語句,之后部署啟動批作業。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
    • 分區表:新建空白批作業草稿,在SQL編輯器中編寫以下SQL語句,之后部署啟動批作業。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (<partition-spec>)
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE <partition-condition>;

      其中,<partition-spec>和<partition-condition>指定了需要整理的分區。例如,整理dt = 20240312, hh = 08分區中的數據的SQL語句如下。

      INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>`
      PARTITION (dt = '20240312', hh = '08')
      SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`
      WHERE dt = '20240312' AND hh = '08';
  4. 批作業執行完成后,即可恢復Paimon表的寫入作業與消費作業。

變更數據產生機制

Paimon表需要將數據的增刪與更新改寫為完整的變更數據(類似于數據庫的Binlog),才能讓下游進行流式消費。通過在WITH參數中設置changelog-producer,Paimon將會以不同的方式產生變更數據,其取值如下。

參數取值

說明

使用場景

none(默認)

Paimon主鍵表將不會產出完整的變更數據。

Paimon表僅能通過批作業進行消費,不能通過流作業進行消費。

input

Paimon主鍵表會直接將輸入的消息作為變更數據傳遞給下游消費者。

僅在輸入數據流本身是完整的變更數據時(例如數據庫的Binlog)使用。

由于input機制不涉及額外的計算,因此其效率最高。

lookup

Paimon主鍵表會通過批量點查的方式,在Flink作業每次創建檢查點(checkpoint)時觸發小文件合并(compaction),并利用小文件合并的結果產生完整的變更數據。

無論輸入數據流是否為完整的變更數據,都可以使用該機制。

與full-compaction機制相比,lookup機制的時效性更好,但總體來看耗費的資源更多。推薦在對數據新鮮度有較高要求(分鐘級)的情況下使用。

full-compaction

Paimon主鍵表會在每一次執行小文件全量合并(full compaction)時,產生完整的變更數據。

無論輸入數據流是否為完整的變更數據,都可以使用該機制。

與lookup機制相比,full-compaction機制的時效性較差,但其利用了小文件合并流程,不產生額外計算,因此總體來看耗費的資源更少。推薦在對數據新鮮度要求不高(小時級)的情況下使用。

為了保證full-compaction機制的時效性,您可以在WITH參數中設置'full-compaction.delta-commits' = '<num>',要求Paimon在每<num>個Flink作業的檢查點執行小文件全量合并。然而,由于小文件全量合并會消耗較多計算資源,因此頻率不宜過高,建議每30分鐘至1小時強制執行一次。

說明

默認情況下,即使更新后的數據與更新之前相同,Paimon仍然會產生變更數據。如果您希望消除此類無效的變更數據,可以在WITH參數中設置'changelog-producer.row-deduplicate' = 'true'。該參數僅對lookup與full-compaction機制有效。由于設置該參數后,需要引入額外的計算對比更新前后的值,推薦僅在無效變更數據較多的情況下使用該參數。

數據合并機制

參數說明

如果將多條具有相同主鍵的數據寫入Paimon主鍵表,Paimon將會根據WITH參數中設置的merge-engine參數對數據進行合并。參數取值如下:

deduplicate(默認值)

設置'merge-engine' = 'deduplicate' 后,對于多條具有相同主鍵的數據,Paimon主鍵表僅會保留最新一條數據,并丟棄其它具有相同主鍵的數據。如果最新數據是一條delete消息,所有具有該主鍵的數據都會被丟棄。創建Paimon表的DDL語句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'deduplicate' -- deduplicate 是默認值,可以不設置
);
  • 如果寫入Paimon表的數據依次為+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')+I(1, 8.0, 'cherry'),則SELECT * FROM T WHERE k = 1將查詢到(1, 8.0, 'cherry')這條數據。

  • 如果寫入Paimon表的數據依次為+I(1, 2.0, 'apple')+I(1, 4.0, 'banana')-D(1, 4.0, 'banana'),則SELECT * FROM T WHERE k = 1將查不到任何數據。

first-row

設置'merge-engine' = 'first-row'后,Paimon只會保留相同主鍵數據中的第一條。與deduplicate合并機制相比,first-row只會產生insert類型的變更數據,且變更數據的產出效率更高。

說明
  • 如果下游需要流式消費first-row的結果,需要將changelog-producer參數設為 lookup。

  • first-row無法處理delete與update_before消息。您可以設置'first-row.ignore-delete' = 'true'以忽略這兩類消息。

  • first-row合并機制不支持指定sequence field。

例如,創建Paimon表的DDL語句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'first-row'
);

如果寫入Paimon表的數據依次為+I(1, 2.0, 'apple')+I(1, 4.0, 'banana'), +I(1, 8.0, 'cherry'),則SELECT * FROM T WHERE k = 1將查詢到(1, 2.0, 'apple')這條數據。

aggregation

對于多條具有相同主鍵的數據,Paimon主鍵表將會根據您指定的聚合函數進行聚合。對于不屬于主鍵的每一列,都需要通過fields.<field-name>.aggregate-function指定一個聚合函數,否則該列將默認使用last_non_null_value聚合函數。

說明

如果下游需要流式消費aggregation的結果,需要將changelog-producer參數設為lookup或full-compaction。

例如,price列將會使用max函數進行聚合,而sales列將會使用sum函數進行聚合。

CREATE TABLE T (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

如果寫入Paimon表的數據依次為+I(1, 23.0, 15)+I(1, 30.2, 20)SELECT * FROM T WHERE product_id = 1將查詢到(1, 30.2, 35)這條數據。

支持的聚合函數與對應的數據類型如下:

  • sum(求和):支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • product(求乘積):支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • count(統計非null值總數):支持INTEGER和BIGINT。

  • max(最大值)和min(最小值):CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。

  • first_value(返回第一次輸入的值)和last_value(返回最新輸入的值):支持所有數據類型,包括null。

  • first_not_null_value(返回第一次輸入的非null值)和last_non_null_value(返回最新輸入的非 null 值):支持所有數據類型。

  • listagg(將輸入的字符串依次用英文逗號連接):支持STRING。

  • bool_and和bool_or:支持BOOLEAN。

說明

上述聚合函數中,只有sum、product和count支持回撤消息(update_before 與 delete 消息)。您可以設置'fields.<field-name>.ignore-retract' = 'true'使對應列忽略回撤消息。

partial-update

設置'merge-engine' = 'partial-update'后,您可以通過多條消息對數據進行逐步更新,并最終得到完整的數據。即具有相同主鍵的新數據將會覆蓋原來的數據,但值為null的列不會進行覆蓋。

說明
  • 如果下游需要流式消費partial-update的結果,需要將changelog-producer參數設為lookup或full-compaction。

  • partial-update 無法處理 delete 與 update_before 消息。您可以設置'partial-update.ignore-delete' = 'true' 以忽略這兩類消息。

例如,考慮以下創建 Paimon 表的 DDL 語句。

CREATE TABLE T (
  k INT,
  v1 DOUBLE,
  v2 BIGINT,
  v3 STRING,
  PRIMARY KEY (k) NOT ENFORCED
) WITH (
  'merge-engine' = 'partial-update'
);

如果寫入Paimon表的數據依次為+I(1, 23.0, 10, NULL)+I(1, NULL, NULL, 'This is a book')+I(1, 25.2, NULL, NULL),則SELECT * FROM T WHERE k = 1將查詢到(1, 25.2, 10, 'This is a book')這條數據。

在partial-update合并機制中,您還可以設置指定WITH參數指定合并順序或對數據進行打寬與聚合,詳情如下:

  • 通過Sequence Group為不同列分別指定合并順序

    除了sequence field之外,您也可以通過sequence group為不同列分別指定合并順序。您可以為來自不同源表的列分別指定合并順序,幫您在打寬場景下處理亂序數據。

    例如,a、b 兩列根據 g_1 列的值從小到大進行合并,而 c、d 兩列將根據g_2 列的值從小到大進行合并。

    CREATE TABLE T (
      k INT,
      a STRING,
      b STRING,
      g_1 INT,
      c STRING,
      d STRING,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.g_2.sequence-group' = 'c,d'
    );
  • 同時進行數據的打寬與聚合

    您還可以在WITH參數中設置fields.<field-name>.aggregate-function,為<field-name>這一列指定聚合函數,對該列的值進行聚合。<field-name>這一列需要屬于某個 sequence group。aggregation合并機制支持的聚合函數均可使用。

    例如,a、b兩列將根據g_1 列的值從小到大進行合并,其中a列將會保留最新的非null值,而 b列將會保留輸入的最大值。c、d兩列將根據g_2列的值從小到大進行合并,其中c列將會保留最新的非null值,而d列將會求出輸入數據的和。

    CREATE TABLE T (
      k INT,
      a STRING,
      b INT,
      g_1 INT,
      c STRING,
      d INT,
      g_2 INT,
      PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
      'merge-engine' = 'partial-update',
      'fields.g_1.sequence-group' = 'a,b',
      'fields.b.aggregate-function' = 'max',
      'fields.g_2.sequence-group' = 'c,d',
      'fields.d.aggregate-function' = 'sum'
    );

更多信息,詳情請參見Merge Engine

亂序數據處理

默認情況下,Paimon會按照數據的輸入順序確定合并的順序,最后寫入Paimon的數據會被認為是最新數據。如果您的輸入數據流存在亂序數據,可以通過在WITH參數中指定'sequence.field' = '<column-name>,具有相同主鍵的數據將按<column-name>這一列的值從小到大進行合并。可以作為sequence field的數據類型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP和TIMESTAMP_LTZ。

說明

如果您使用MySQLop_t元數據作為sequence field,會導致一對update_before與update_after消息具有相同的sequence field值,需要在WITH參數中設置'sequence.auto-padding' = 'row-kind-flag',以保證Paimon會先處理update_before消息,再處理update_after消息。

Paimon Append Only表(非主鍵表)

如果在創建Paimon表時沒有指定主鍵(Primary Key),則該表就是Paimon Append Only表。您只能以流式方式將完整記錄插入到表中,適合不需要流式更新的場景(例如日志數據同步)。

語法結構

例如,以下SQL語句將創建一張分區鍵為dt的Append Scalable表。

CREATE TABLE T (
  dt STRING
  order_id BIGINT,
  item_id BIGINT,
  amount INT,
  address STRING
) PARTITIONED BY (dt) WITH (
  'bucket' = '-1'
);

表類型

類型

定義

說明

Append Scalable表

創建Paimon Append Only表時,在WITH參數中指定'bucket' = '-1',將會創建Append Scalable表。

作為Hive表的高效替代,在對數據的流式消費順序沒有需求的情況下,應盡量選擇Append Scalable表。它具有寫入無shuffle、數據可排序、并發控制便捷,可直接將直接吸收或者轉換現存的hive表、且可以使用異步合并等優勢,進一步加快寫入過程。

Append Queue表

創建Paimon Append Only表時,在WITH參數中指定'bucket' = '<num>',將會創建Append Queue表。

其中,<num>是一個大于0的整數,指定了非分區表的分桶數,或分區表單個分區的分桶數。

作為消息隊列具有分鐘級延遲的替代。Paimon表的分桶數此時相當于Kafka的Partition數或云消息隊列MQTT版的Shard數。

數據的分發

類型

說明

Append Scalable表

由于無視桶的概念,多個并發可以同時寫同一個分區,無考慮順序以及數據是否分到對應桶的問題。在寫入時,直接由上游將數據推往writer節點。其中,不需要對數據進行hash partitioning。由此,在其他條件相同的情況下,通常此類型的表寫入速度更快。當上游并發和writer并發相同時,需要注意是否會產生數據傾斜問題。

Append Queue表

默認情況下,Paimon將根據每條數據所有列的取值,確定該數據屬于哪個分桶(bucket)。如果您需要修改數據的分桶方式,可以在創建Paimon表時,在WITH參數中指定bucket-key參數,不同列的名稱用英文逗號分隔。

例如,設置'bucket-key' = 'c1,c2',則Paimon將根據每條數據c1c2兩列的值,確定該數據屬于哪個分桶。

說明

建議盡可能設置bucket-key,以減少分桶過程中參與計算的列的數量,提高Paimon表的寫入效率。

數據消費順序

類型

說明

Append Scalable表

無法像Append Queue表一樣,在流式消費Paimon表時保證數據的消費順序與數據寫入Paimon表的順序一致。適合對數據的流式消費順序沒有需求場景。

Append Queue表

Append Queue表可以保證流式消費Paimon表時,每個分桶中數據的消費順序與數據寫入Paimon表的順序一致。具體來說:

  • 對于兩條來自不同分區(partition)的數據

    • 如果表參數中設置了'scan.plan-sort-partition' = 'true',則分區值更小的數據會首先產出。

    • 如果表參數中未設置'scan.plan-sort-partition' = 'true',則分區創建時間更早的數據會首先產出。

  • 對于兩條來自相同分區的相同分桶的數據,先寫入Paimon表的數據會首先產出。

  • 對于兩條來自相同分區但不同分桶的數據,由于不同分桶可能被不同的Flink作業并發處理,因此不保證兩條數據的消費順序。

相關文檔