Paimon表數(shù)據(jù)寫入和消費(fèi)
本文為您介紹如何在實(shí)時計算開發(fā)控制臺向Paimon表中插入、更新、覆寫或刪除數(shù)據(jù),以及從Paimon表消費(fèi)數(shù)據(jù),并指定消費(fèi)位點(diǎn)。
前提條件
已創(chuàng)建Paimon Catalog和Paimon表,詳情請參見管理Paimon Catalog。
使用限制
僅實(shí)時計算引擎VVR 8.0.5及以上版本支持Paimon表。
向Paimon表寫入數(shù)據(jù)
通過CTAS/CDAS語句同步數(shù)據(jù)及表結(jié)構(gòu)變更
詳情請參見管理Paimon Catalog。
通過INSERT INTO語句插入或更新數(shù)據(jù)
您可以通過INSERT INTO語句,直接向Paimon表插入或更新數(shù)據(jù)。
Paimon主鍵表可以接受所有類型(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)的消息,相同主鍵的數(shù)據(jù)在寫入后會根據(jù)數(shù)據(jù)合并機(jī)制進(jìn)行合并。
Paimon Append Only表(非主鍵表)只能接受INSERT類型的消息。
通過INSERT OVERWRITE語句覆寫數(shù)據(jù)
覆寫是指清空并重新寫入數(shù)據(jù)。您可以通過INSERT OVERWRITE語句覆寫整張Paimon表或覆寫特定分區(qū),SQL語句示例如下。
僅批作業(yè)支持INSERT OVERWRITE語句。
默認(rèn)情況下,INSERT OVERWRITE操作不會產(chǎn)生變更數(shù)據(jù),刪除與導(dǎo)入的數(shù)據(jù)無法被下游流式消費(fèi)。如果您需要消費(fèi)此類數(shù)據(jù),請參見流式消費(fèi)INSERT OVERWRITE語句的結(jié)果。
my_table表是非分區(qū)表,覆寫整張my_table表。
INSERT OVERWRITE my_table SELECT ...;
my_table表是分區(qū)表,覆寫my_table表中的
dt=20240108,hh=06
分區(qū)。INSERT OVERWRITE my_table PARTITION (`dt` = '20240108', `hh` = '06') SELECT ...;
my_table表是分區(qū)表,動態(tài)覆寫my_table表中的分區(qū),即SELECT語句結(jié)果中出現(xiàn)的分區(qū)都會被覆寫,其它分區(qū)保持不變。
INSERT OVERWRITE my_table SELECT ...;
my_table表是分區(qū)表,覆寫整張my_table表。
INSERT OVERWRITE my_table /*+ OPTIONS('dynamic-partition-overwrite' = 'false') */ SELECT ...;
通過DELETE語句刪除數(shù)據(jù)
您可以通過DELETE語句從Paimon主鍵表中刪除數(shù)據(jù)。DELETE語句只能在查詢腳本中執(zhí)行。
--從my_table表中刪除所有currency = 'UNKNOWN'的數(shù)據(jù)。
DELETE FROM my_table WHERE currency = 'UNKNOWN';
過濾刪除消息
使用Paimon主鍵表時,默認(rèn)情況下,類型為DELETE的消息會將Paimon表中對應(yīng)主鍵的數(shù)據(jù)刪除。如果您不希望Paimon表處理此類消息,可以通過SQL hint將以下參數(shù)設(shè)置為true,過濾刪除消息。
參數(shù) | 說明 | 數(shù)據(jù)類型 | 默認(rèn)值 |
ignore-delete | 是否過濾刪除消息。 | Boolean | false |
調(diào)整結(jié)果表的并發(fā)數(shù)
您可以通過SQL hint設(shè)置以下參數(shù),手動調(diào)整結(jié)果表算子的并發(fā)數(shù)。
參數(shù) | 說明 | 數(shù)據(jù)類型 | 默認(rèn)值 |
sink.parallelism | 手動設(shè)定Paimon結(jié)果表算子的并發(fā)數(shù)。 | Integer | 無 |
例如,以下SQL將手動設(shè)置Paimon結(jié)果表算子并發(fā)數(shù)為10。
INSERT INTO t /*+ OPTIONS('sink.parallelism' = '10') */ SELECT * FROM s;
從Paimon表消費(fèi)數(shù)據(jù)
通過流作業(yè)消費(fèi)Paimon表
通過流作業(yè)消費(fèi)的Paimon主鍵表需要設(shè)置變更數(shù)據(jù)產(chǎn)生機(jī)制。
默認(rèn)情況下,流作業(yè)中的Paimon源表算子將首先產(chǎn)出作業(yè)啟動時刻Paimon表中的全量數(shù)據(jù),之后產(chǎn)出從作業(yè)啟動時刻開始Paimon表中的增量數(shù)據(jù)。
從指定位點(diǎn)消費(fèi)Paimon表
您可以通過以下方式從指定位點(diǎn)消費(fèi)Paimon表:
如果您不需要消費(fèi)作業(yè)啟動時刻Paimon表中的全量數(shù)據(jù),只需要消費(fèi)后續(xù)的增量數(shù)據(jù),可通過SQL Hint設(shè)置
'scan.mode' = 'latest'
。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'latest') */;
如果您不想要消費(fèi)全量數(shù)據(jù),只想消費(fèi)從指定時間點(diǎn)開始的增量數(shù)據(jù),可通過SQL Hint設(shè)置
scan.timestamp-millis
參數(shù)。參數(shù)值表示從Unix Epoch(1970-01-01 00:00:00 UTC)開始到指定時間點(diǎn)經(jīng)過的毫秒數(shù)。SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
如果您想要消費(fèi)從指定時間點(diǎn)之后寫入的全量數(shù)據(jù),并持續(xù)消費(fèi)后續(xù)的增量數(shù)據(jù),可以從以下兩種操作中選擇一種。
說明此類消費(fèi)方式將讀取在指定時間點(diǎn)之后修改的數(shù)據(jù)文件。由于小文件合并,數(shù)據(jù)文件中可能包含少量在指定時間點(diǎn)之前寫入的數(shù)據(jù)。您可以根據(jù)業(yè)務(wù)需求,在SQL作業(yè)中添加WHERE 過濾條件對數(shù)據(jù)進(jìn)行過濾。
不設(shè)置任何SQL Hint,在啟動作業(yè)時,選擇指定源表開始時間并指定具體的時間信息。
通過SQL Hint設(shè)置
scan.file-creation-time-millis
參數(shù)。SELECT * FROM t /*+ OPTIONS('scan.file-creation-time-millis' = '1678883047356') */;
如果您不想要消費(fèi)全量數(shù)據(jù),只想消費(fèi)從特定快照文件開始的增量數(shù)據(jù),可通過SQL Hint設(shè)置
scan.snapshot-id
參數(shù),參數(shù)值是指定快照文件的編號。SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
如果您想要消費(fèi)特定快照文件的全量數(shù)據(jù),并持續(xù)消費(fèi)后續(xù)的增量數(shù)據(jù),可通過SQL hint設(shè)置
'scan.mode' = 'from-snapshot-full'
和scan.snapshot-id
參數(shù),scan.snapshot-id
參數(shù)值是指定快照文件的編號。SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot-full', 'scan.snapshot-id' = '1') */;
指定Consumer ID
Consumer ID可以保存Paimon表的消費(fèi)進(jìn)度,主要用于以下場景:
如果您修改了SQL作業(yè)的計算邏輯,可能會導(dǎo)致作業(yè)拓?fù)浒l(fā)生變化,無法從Flink狀態(tài)中恢復(fù)消費(fèi)進(jìn)度。設(shè)置Consumer ID可以將此ID對應(yīng)的消費(fèi)進(jìn)度保存在Paimon表的元數(shù)據(jù)文件中,即使后續(xù)無狀態(tài)啟動作業(yè),也能從中斷的位點(diǎn)繼續(xù)消費(fèi)Paimon表。
設(shè)置Consumer ID后,未被消費(fèi)過的快照文件不會因過期而被刪除,可以防止因消費(fèi)速度跟不上快照過期速度導(dǎo)致的報錯。
通過設(shè)置consumer-id
參數(shù),您可以給流作業(yè)中的Paimon源表算子賦予一個Consumer ID,其值可以是任意的字符串。Consumer ID第一次創(chuàng)建時,它的起始消費(fèi)位點(diǎn)根據(jù)從指定位點(diǎn)消費(fèi)Paimon表中的規(guī)則確定。后續(xù)只要繼續(xù)使用相同的Consumer ID,即可恢復(fù)Paimon表的消費(fèi)進(jìn)度。
例如,為Paimon源表算子設(shè)置名為test-id的Consumer ID的SQL語句示例如下。如果您想要重置某個Consumer ID對應(yīng)的消費(fèi)位點(diǎn),可以額外設(shè)置'consumer.ignore-progress' = 'true'
。
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'test-id') */;
由于未被Consumer ID消費(fèi)過的快照文件不會因過期而被刪除,如果不及時清理廢棄的Consumer ID,快照文件及其對應(yīng)的歷史數(shù)據(jù)文件將永遠(yuǎn)不會被刪除,會占用存儲空間。您可以設(shè)置consumer.expiration-time
表參數(shù),將超過規(guī)定時間不使用的Cconsumer ID清理掉。例如,'consumer.expiration-time' = '3d'
表示將3天未使用的Consumer ID清理掉。
流式消費(fèi)INSERT OVERWRITE語句的結(jié)果
默認(rèn)情況下,INSERT OVERWRITE操作不會產(chǎn)生變更數(shù)據(jù),刪除與導(dǎo)入的數(shù)據(jù)無法被下游流式消費(fèi)。如果您需要消費(fèi)此類數(shù)據(jù),可以在流式消費(fèi)作業(yè)中通過SQL Hint配置'streaming-read-overwrite' = 'true'
。
SELECT * FROM t /*+ OPTIONS('streaming-read-overwrite' = 'true') */;
通過批作業(yè)消費(fèi)Paimon表
默認(rèn)情況下,批作業(yè)中的Paimon源表算子將讀取最新的快照文件,輸出Paimon表的最新狀態(tài)數(shù)據(jù)。
Batch Time Travel
通過SQL Hint設(shè)置scan.timestamp-millis
參數(shù),即可查詢Paimon表在該時間點(diǎn)的狀態(tài)。參數(shù)值表示從Unix Epoch(1970-01-01 00:00:00 UTC)開始到指定時間點(diǎn)經(jīng)過的毫秒數(shù)。
SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */;
通過SQL Hint設(shè)置scan.snapshot-id
參數(shù),即可查詢Paimon表在該快照文件產(chǎn)生時的狀態(tài)。參數(shù)值為指定快照文件的編號。
SELECT * FROM t /*+ OPTIONS('scan.snapshot-id' = '3') */;
查詢兩次快照之間的數(shù)據(jù)變化
如果您想要查詢兩次快照間Paimon表中數(shù)據(jù)發(fā)生的變化,可以通過SQL Hint設(shè)置incremental-between
參數(shù)。例如,查看20號快照文件和12號快照文件間發(fā)生變化的所有數(shù)據(jù),SQL語句示例如下。
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;
由于批作業(yè)不支持消費(fèi)Delete類型的消息,默認(rèn)情況下此類消息將會被丟棄。如果您想要在批作業(yè)中消費(fèi)Delete類型的消息,請查詢Audit Log系統(tǒng)表。例如SELECT * FROM `t$audit_log ` /*+ OPTIONS('incremental-between' = '12,20') */;
。
調(diào)整源表的并發(fā)數(shù)
默認(rèn)情況下,Paimon根據(jù)分區(qū)數(shù)以及分桶數(shù)等信息自動推斷源表算子的并發(fā)數(shù)。您可以通過SQL Hint設(shè)置以下參數(shù),手動調(diào)整源表算子的并發(fā)數(shù)。
參數(shù) | 數(shù)據(jù)類型 | 默認(rèn)值 | 備注 |
scan.parallelism | Integer | 無 | 手動設(shè)定Paimon源表算子的并發(fā)數(shù)。 |
scan.infer-parallelism | Boolean | true | 是否自動推斷Paimon源表算子的并發(fā)數(shù)。 |
scan.infer-parallelism.max | Integer | 1024 | Paimon源表算子自動推斷出的并發(fā)數(shù)上限。 |
手動設(shè)置Paimon源表算子并發(fā)數(shù)為10的SQL語句示例如下。
SELECT * FROM t /*+ OPTIONS('scan.parallelism' = '10') */;
使用Paimon維表
Paimon也可以作為維表使用。關(guān)于維表JOIN的語法,詳情請參見維表JOIN語句。
相關(guān)文檔
Paimon表數(shù)據(jù)寫入和消費(fèi)時,支持使用SQL Hint臨時修改表參數(shù),詳情請參見作為CTAS的目標(biāo)端Catalog。
Paimon主鍵表和Append表的基本特性與功能,詳情請參見Paimon主鍵表和Append Only表。
不同場景下Paimon主鍵表和Append Scalable表的常用優(yōu)化,詳情請參見Paimon性能優(yōu)化。
Paimon表的消費(fèi)依賴快照文件,快照過期時間太短或消費(fèi)作業(yè)效率低會導(dǎo)致正在消費(fèi)的快照文件因過期被刪除,消費(fèi)作業(yè)出現(xiàn)
File xxx not found, Possible causes
報錯,解決方法請參見讀Paimon作業(yè)出現(xiàn)File xxx not found, Possible causes。