Paimon僅支持主鍵表和Append Only表。本文為您介紹Paimon主鍵表和Append Only表的基本特性與功能。
Paimon主鍵表
創建Paimon表時指定了主鍵(primary key),則該表即為Paimon主鍵表。
語法結構
例如,創建一張分區鍵為dt,主鍵為dt、shop_id和user_id,分桶數固定為4的Paimon主鍵表。
CREATE TABLE T (
dt STRING,
shop_id BIGINT,
user_id BIGINT,
num_orders INT,
total_amount INT,
PRIMARY KEY (dt, shop_id, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
'bucket' = '4'
);
Paimon主鍵表中每行數據的主鍵值各不相同,如果將多條具有相同主鍵的數據寫入Paimon主鍵表,將根據數據合并機制對數據進行合并。
分桶方式
分桶(Bucket)是Paimon表讀寫操作的最小單元。非分區表的所有數據,以及分區表每個分區的數據,都會被進一步劃分到不同的分桶中,以便同一作業使用多個并發同時讀寫Paimon表,加快讀寫效率。支持的類別詳情如下。
類別 | 定義 | 說明 |
動態分桶(默認) | 創建Paimon主鍵表時,不在WITH參數中指定 |
|
固定分桶 | 創建Paimon主鍵表時,在WITH參數中指定 如果在創建固定分桶的Paimon表之后,需要修改分桶數,詳情請參見調整固定分桶表的分桶數量。 | 對于固定分桶的Paimon主鍵表,分區表的主鍵需要完全包含分區鍵(partition key),以避免主鍵的跨分區更新。 |
動態分桶表更新
類別 | 說明 |
跨分區更新的動態分桶表 | 對于主鍵不完全包含分區鍵的動態分桶表,Paimon無法根據主鍵確定該數據屬于哪個分區的哪個分桶,因此需要使用RocksDB維護主鍵與分區以及分桶編號的映射關系。相比固定分桶而言,數據量較大的表可能會產生明顯的性能損失。另外,因為作業啟動時需要將映射關系全量加載至RocksDB中,作業的啟動速度也會變慢。數據合并機制會對跨分區更新的結果產生影響:
|
非跨分區更新的動態分桶表 | 對于主鍵完全包含分區鍵的動態分桶表,Paimon可以確定該主鍵屬于哪個分區,但無法確定屬于哪個分桶,因此需要使用額外的堆內存創建索引,以維護主鍵與分桶編號的映射關系。 具體來說,每1億條主鍵將額外消耗1 GB的堆內存。只有當前正在寫入的分區才會消耗堆內存,歷史分區中的主鍵不會消耗堆內存。 除堆內存的消耗外,相比固定分桶而言,主鍵完全包含分區鍵的動態分桶表不會有明顯的性能損失。 |
數據分發
類別 | 數據分發 |
動態分桶 | 動態分桶的Paimon表會先將數據寫入已有的分桶中,當分桶的數據量超過限制時,再自動創建新的分桶。以下WITH參數將會影響動態分桶的行為。
|
固定分桶 | 默認情況下,Paimon將根據每條數據主鍵的哈希值,確定該數據屬于哪個分桶。 如果您需要修改數據的分桶方式,可以在創建Paimon表時,在WITH參數中指定 |
調整固定分桶表的分桶數量
由于分桶數限制了實際工作的作業并發數,且單個分桶內數據總量太大可能導致讀寫性能的降低,因此分桶數不宜太小。但是,分桶數過大也會造成小文件數量過多。建議每個分桶的數據大小在2 GB左右,最大不超過5 GB。調整固定分桶表的分桶數量具體的操作步驟如下。
停止所有寫入該Paimon表或消費該Paimon表的作業。
新建查詢腳本,執行以下SQL語句,調整Paimon表的bucket參數。
ALTER TABLE `<catalog-name>`.`<database-name>`.`<table-name>` SET ('bucket' = '<bucket-num>');
整理非分區表中的所有數據,或分區表中仍需寫入的分區中的所有數據。
非分區表:新建空白批作業草稿,在SQL編輯器中編寫以下SQL語句,之后部署并啟動批作業。
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>`;
分區表:新建空白批作業草稿,在SQL編輯器中編寫以下SQL語句,之后部署并啟動批作業。
INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (<partition-spec>) SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE <partition-condition>;
其中,<partition-spec>和<partition-condition>指定了需要整理的分區。例如,整理
dt = 20240312, hh = 08
分區中的數據的SQL語句如下。INSERT OVERWRITE `<catalog-name>`.`<database-name>`.`<table-name>` PARTITION (dt = '20240312', hh = '08') SELECT * FROM `<catalog-name>`.`<database-name>`.`<table-name>` WHERE dt = '20240312' AND hh = '08';
批作業執行完成后,即可恢復Paimon表的寫入作業與消費作業。
變更數據產生機制
Paimon表需要將數據的增刪與更新改寫為完整的變更數據(類似于數據庫的Binlog),才能讓下游進行流式消費。通過在WITH參數中設置changelog-producer
,Paimon將會以不同的方式產生變更數據,其取值如下。
參數取值 | 說明 | 使用場景 |
none(默認) | Paimon主鍵表將不會產出完整的變更數據。 | Paimon表僅能通過批作業進行消費,不能通過流作業進行消費。 |
input | Paimon主鍵表會直接將輸入的消息作為變更數據傳遞給下游消費者。 | 僅在輸入數據流本身是完整的變更數據時(例如數據庫的Binlog)使用。 由于input機制不涉及額外的計算,因此其效率最高。 |
lookup | Paimon主鍵表會通過批量點查的方式,在Flink作業每次創建檢查點(checkpoint)時觸發小文件合并(compaction),并利用小文件合并的結果產生完整的變更數據。 | 無論輸入數據流是否為完整的變更數據,都可以使用該機制。 與full-compaction機制相比,lookup機制的時效性更好,但總體來看耗費的資源更多。推薦在對數據新鮮度有較高要求(分鐘級)的情況下使用。 |
full-compaction | Paimon主鍵表會在每一次執行小文件全量合并(full compaction)時,產生完整的變更數據。 | 無論輸入數據流是否為完整的變更數據,都可以使用該機制。 與lookup機制相比,full-compaction機制的時效性較差,但其利用了小文件合并流程,不產生額外計算,因此總體來看耗費的資源更少。推薦在對數據新鮮度要求不高(小時級)的情況下使用。 為了保證full-compaction機制的時效性,您可以在WITH參數中設置 |
默認情況下,即使更新后的數據與更新之前相同,Paimon仍然會產生變更數據。如果您希望消除此類無效的變更數據,可以在WITH參數中設置'changelog-producer.row-deduplicate' = 'true'
。該參數僅對lookup與full-compaction機制有效。由于設置該參數后,需要引入額外的計算對比更新前后的值,推薦僅在無效變更數據較多的情況下使用該參數。
數據合并機制
參數說明
如果將多條具有相同主鍵的數據寫入Paimon主鍵表,Paimon將會根據WITH參數中設置的merge-engine
參數對數據進行合并。參數取值如下:
deduplicate(默認值)
設置'merge-engine' = 'deduplicate'
后,對于多條具有相同主鍵的數據,Paimon主鍵表僅會保留最新一條數據,并丟棄其它具有相同主鍵的數據。如果最新數據是一條delete消息,所有具有該主鍵的數據都會被丟棄。創建Paimon表的DDL語句。
CREATE TABLE T (
k INT,
v1 DOUBLE,
v2 STRING,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'deduplicate' -- deduplicate 是默認值,可以不設置
);
如果寫入Paimon表的數據依次為
+I(1, 2.0, 'apple')
,+I(1, 4.0, 'banana')
,+I(1, 8.0, 'cherry')
,則SELECT * FROM T WHERE k = 1
將查詢到(1, 8.0, 'cherry')
這條數據。如果寫入Paimon表的數據依次為
+I(1, 2.0, 'apple')
,+I(1, 4.0, 'banana')
,-D(1, 4.0, 'banana')
,則SELECT * FROM T WHERE k = 1
將查不到任何數據。
first-row
設置'merge-engine' = 'first-row'
后,Paimon只會保留相同主鍵數據中的第一條。與deduplicate合并機制相比,first-row只會產生insert類型的變更數據,且變更數據的產出效率更高。
如果下游需要流式消費first-row的結果,需要將
changelog-producer
參數設為 lookup。first-row無法處理delete與update_before消息。您可以設置
'first-row.ignore-delete' = 'true'
以忽略這兩類消息。first-row合并機制不支持指定sequence field。
例如,創建Paimon表的DDL語句。
CREATE TABLE T (
k INT,
v1 DOUBLE,
v2 STRING,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'first-row'
);
如果寫入Paimon表的數據依次為+I(1, 2.0, 'apple')
,+I(1, 4.0, 'banana')
, +I(1, 8.0, 'cherry')
,則SELECT * FROM T WHERE k = 1
將查詢到(1, 2.0, 'apple')
這條數據。
aggregation
對于多條具有相同主鍵的數據,Paimon主鍵表將會根據您指定的聚合函數進行聚合。對于不屬于主鍵的每一列,都需要通過fields.<field-name>.aggregate-function
指定一個聚合函數,否則該列將默認使用last_non_null_value聚合函數。
如果下游需要流式消費aggregation的結果,需要將changelog-producer
參數設為lookup或full-compaction。
例如,price列將會使用max函數進行聚合,而sales列將會使用sum函數進行聚合。
CREATE TABLE T (
product_id BIGINT,
price DOUBLE,
sales BIGINT,
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'merge-engine' = 'aggregation',
'fields.price.aggregate-function' = 'max',
'fields.sales.aggregate-function' = 'sum'
);
如果寫入Paimon表的數據依次為+I(1, 23.0, 15)
和+I(1, 30.2, 20)
,SELECT * FROM T WHERE product_id = 1
將查詢到(1, 30.2, 35)
這條數據。
支持的聚合函數與對應的數據類型如下:
sum(求和):支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。
product(求乘積):支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。
count(統計非null值總數):支持INTEGER和BIGINT。
max(最大值)和min(最小值):CHAR、VARCHAR、DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。
first_value(返回第一次輸入的值)和last_value(返回最新輸入的值):支持所有數據類型,包括null。
first_not_null_value(返回第一次輸入的非null值)和last_non_null_value(返回最新輸入的非 null 值):支持所有數據類型。
listagg(將輸入的字符串依次用英文逗號連接):支持STRING。
bool_and和bool_or:支持BOOLEAN。
上述聚合函數中,只有sum、product和count支持回撤消息(update_before 與 delete 消息)。您可以設置'fields.<field-name>.ignore-retract' = 'true'
使對應列忽略回撤消息。
partial-update
設置'merge-engine' = 'partial-update'
后,您可以通過多條消息對數據進行逐步更新,并最終得到完整的數據。即具有相同主鍵的新數據將會覆蓋原來的數據,但值為null的列不會進行覆蓋。
如果下游需要流式消費partial-update的結果,需要將
changelog-producer
參數設為lookup或full-compaction。partial-update 無法處理 delete 與 update_before 消息。您可以設置
'partial-update.ignore-delete' = 'true'
以忽略這兩類消息。
例如,考慮以下創建 Paimon 表的 DDL 語句。
CREATE TABLE T (
k INT,
v1 DOUBLE,
v2 BIGINT,
v3 STRING,
PRIMARY KEY (k) NOT ENFORCED
) WITH (
'merge-engine' = 'partial-update'
);
如果寫入Paimon表的數據依次為+I(1, 23.0, 10, NULL)
,+I(1, NULL, NULL, 'This is a book')
,+I(1, 25.2, NULL, NULL)
,則SELECT * FROM T WHERE k = 1
將查詢到(1, 25.2, 10, 'This is a book')
這條數據。
在partial-update合并機制中,您還可以設置指定WITH參數指定合并順序或對數據進行打寬與聚合,詳情如下:
通過Sequence Group為不同列分別指定合并順序
除了sequence field之外,您也可以通過sequence group為不同列分別指定合并順序。您可以為來自不同源表的列分別指定合并順序,幫您在打寬場景下處理亂序數據。
例如,a、b 兩列根據 g_1 列的值從小到大進行合并,而 c、d 兩列將根據g_2 列的值從小到大進行合并。
CREATE TABLE T ( k INT, a STRING, b STRING, g_1 INT, c STRING, d STRING, g_2 INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine' = 'partial-update', 'fields.g_1.sequence-group' = 'a,b', 'fields.g_2.sequence-group' = 'c,d' );
同時進行數據的打寬與聚合
您還可以在WITH參數中設置
fields.<field-name>.aggregate-function
,為<field-name>
這一列指定聚合函數,對該列的值進行聚合。<field-name>
這一列需要屬于某個 sequence group。aggregation合并機制支持的聚合函數均可使用。例如,a、b兩列將根據g_1 列的值從小到大進行合并,其中a列將會保留最新的非null值,而 b列將會保留輸入的最大值。c、d兩列將根據g_2列的值從小到大進行合并,其中c列將會保留最新的非null值,而d列將會求出輸入數據的和。
CREATE TABLE T ( k INT, a STRING, b INT, g_1 INT, c STRING, d INT, g_2 INT, PRIMARY KEY (k) NOT ENFORCED ) WITH ( 'merge-engine' = 'partial-update', 'fields.g_1.sequence-group' = 'a,b', 'fields.b.aggregate-function' = 'max', 'fields.g_2.sequence-group' = 'c,d', 'fields.d.aggregate-function' = 'sum' );
更多信息,詳情請參見Merge Engine。
亂序數據處理
默認情況下,Paimon會按照數據的輸入順序確定合并的順序,最后寫入Paimon的數據會被認為是最新數據。如果您的輸入數據流存在亂序數據,可以通過在WITH參數中指定'sequence.field' = '<column-name>
,具有相同主鍵的數據將按<column-name>
這一列的值從小到大進行合并。可以作為sequence field的數據類型有:TINYINT、SMALLINT、INTEGER、BIGINT、TIMESTAMP和TIMESTAMP_LTZ。
如果您使用MySQL的op_t
元數據作為sequence field,會導致一對update_before與update_after消息具有相同的sequence field值,需要在WITH參數中設置'sequence.auto-padding' = 'row-kind-flag'
,以保證Paimon會先處理update_before消息,再處理update_after消息。
Paimon Append Only表(非主鍵表)
如果在創建Paimon表時沒有指定主鍵(Primary Key),則該表就是Paimon Append Only表。您只能以流式方式將完整記錄插入到表中,適合不需要流式更新的場景(例如日志數據同步)。
語法結構
例如,以下SQL語句將創建一張分區鍵為dt的Append Scalable表。
CREATE TABLE T (
dt STRING
order_id BIGINT,
item_id BIGINT,
amount INT,
address STRING
) PARTITIONED BY (dt) WITH (
'bucket' = '-1'
);
表類型
類型 | 定義 | 說明 |
Append Scalable表 | 創建Paimon Append Only表時,在WITH參數中指定 | 作為Hive表的高效替代,在對數據的流式消費順序沒有需求的情況下,應盡量選擇Append Scalable表。它具有寫入無shuffle、數據可排序、并發控制便捷,可直接將直接吸收或者轉換現存的hive表、且可以使用異步合并等優勢,進一步加快寫入過程。 |
Append Queue表 | 創建Paimon Append Only表時,在WITH參數中指定 其中, | 作為消息隊列具有分鐘級延遲的替代。Paimon表的分桶數此時相當于Kafka的Partition數或云消息隊列MQTT版的Shard數。 |
數據的分發
類型 | 說明 |
Append Scalable表 | 由于無視桶的概念,多個并發可以同時寫同一個分區,無考慮順序以及數據是否分到對應桶的問題。在寫入時,直接由上游將數據推往writer節點。其中,不需要對數據進行hash partitioning。由此,在其他條件相同的情況下,通常此類型的表寫入速度更快。當上游并發和writer并發相同時,需要注意是否會產生數據傾斜問題。 |
Append Queue表 | 默認情況下,Paimon將根據每條數據所有列的取值,確定該數據屬于哪個分桶(bucket)。如果您需要修改數據的分桶方式,可以在創建Paimon表時,在WITH參數中指定 例如,設置 說明 建議盡可能設置 |
數據消費順序
類型 | 說明 |
Append Scalable表 | 無法像Append Queue表一樣,在流式消費Paimon表時保證數據的消費順序與數據寫入Paimon表的順序一致。適合對數據的流式消費順序沒有需求場景。 |
Append Queue表 | Append Queue表可以保證流式消費Paimon表時,每個分桶中數據的消費順序與數據寫入Paimon表的順序一致。具體來說:
|
相關文檔
Paimon Catalog和Paimon表的創建操作,詳情請參見管理Paimon Catalog。
如果您需要了解Paimon主鍵表的常用優化,詳情請請見Paimon性能優化。