流式數據湖倉Paimon連接器推薦配合Paimon Catalog使用,本文為您介紹如何使用流式數據湖倉Paimon連接器。
背景信息
Apache Paimon是一種流批統一的湖存儲格式,支持高吞吐的寫入和低延遲的查詢。目前阿里云開源大數據平臺E-MapReduce常見的計算引擎(例如Flink、Spark、Hive或Trino)都與Paimon有著較為完善的集成度。您可以借助Apache Paimon快速地在HDFS或者云端OSS上構建自己的數據湖存儲服務,并接入上述計算引擎實現數據湖的分析,詳情請參見Apache Paimon。
類別 | 詳情 |
支持類型 | 源表、維表和結果表,數據攝入目標端 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不支持 |
特有監控指標 | 暫無 |
API種類 | SQL,數據攝入YAML作業 |
是否支持更新或刪除結果表數據 | 是 |
特色功能
目前Apache Paimon提供以下核心能力:
基于HDFS或者對象存儲構建低成本的輕量級數據湖存儲服務。
支持在流模式與批模式下讀寫大規模數據集。
支持分鐘級到秒級數據新鮮度的批查詢和OLAP查詢。
支持消費與產生增量數據,可作為傳統的離線數倉和新型的流式數倉的各級存儲。
支持預聚合數據,降低存儲成本與下游計算壓力。
支持回溯歷史版本的數據。
支持高效的數據過濾。
支持表結構變更。
使用限制
僅Flink計算引擎VVR 6.0.6及以上版本支持Paimon連接器。
Paimon與VVR版本對應關系詳情如下表所示。
Paimon社區版本
實時計算Flink版引擎版本(VVR )
0.9
8.0.7、8.0.8、8.0.9、8.0.10
0.8
8.0.6
0.7
8.0.5
0.6
8.0.4
0.6
8.0.3
SQL
Paimon連接器可以在SQL作業中使用,作為源表或者結果表。
語法結構
如果您在Paimon Catalog中創建Paimon表,則無需指定
connector
參數,此時創建Paimon表的語法結構如下。CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( ... );
說明如果您已在Paimon Catalog中創建了Paimon表,后續無需再次創建表即可直接使用。
如果您在其他Catalog中創建Paimon臨時表,則需要指定connector參數與Paimon表的存儲路徑path,此時創建Paimon表的語法結構如下。
CREATE TEMPORARY TABLE paimon_table ( id BIGINT, data STRING, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'paimon', 'path' = '<path-to-paimon-table-files>', 'auto-create' = 'true', -- 如果指定路徑不存在Paimon表數據文件,則會自動創建文件。 ... );
WITH參數
參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
connector | 表類型。 | String | 否 | 無 |
|
path | 表存儲路徑。 | String | 否 | 無 |
|
auto-create | 創建Paimon臨時表時,若指定路徑不存在Paimon表文件,是否自動創建文件。 | Boolean | 否 | false | 參數取值如下:
|
bucket | 每個分區的分桶數。 | Integer | 否 | 1 | 寫入Paimon表的數據將按 說明 建議每個Bucket的數據量在5 GB以下。 |
bucket-key | 分桶關鍵列。 | String | 否 | 無 | 指定將寫入Paimon表的數據按哪些列的值打散至不同的Bucket中。 列名之間用英文逗號(,)分隔,例如 說明
|
changelog-producer | 增量數據產生機制。 | String | 否 | none | Paimon可以為任意輸入數據流產生完整的增量數據(所有的update_after數據都有對應的update_before數據),方便下游消費者。增量數據產生機制的可選值如下:
關于增量數據產生機制的選擇,詳情請參見增量數據產生機制。 |
full-compaction.delta-commits | Full Compaction最大間隔。 | Integer | 否 | 無 | 該參數指定了每commit snapshot多少次之后,一定會進行一次Full Compaction。 |
lookup.cache-max-memory-size | Paimon維表的內存緩存大小。 | String | 否 | 256 MB | 該參數值會同時影響維表緩存大小和lookup changelog-producer的緩存大小,兩個機制的緩存大小都由該參數配置。 |
merge-engine | 相同primary key數據的合并機制。 | String | 否 | deduplicate | 參數取值如下:
關于數據合并機制的具體分析,詳情請參見數據合并機制。 |
partial-update.ignore-delete | 是否忽略delete(-D)類型的消息。 | Boolean | 否 | false | 參數取值如下:
說明
|
ignore-delete | 是否忽略delete(-D)類型的消息。 | Boolean | 否 | false | 參數取值同partial-update.ignore-delete。 說明
|
partition.default-name | 分區默認名稱。 | String | 否 | __DEFAULT_PARTITION__ | 如果分區列的值為null或空字符串,將會采用該默認名稱作為分區名。 |
partition.expiration-check-interval | 多久檢查一次分區過期。 | String | 否 | 1h | 詳情請參見如何設置分區自動過期? |
partition.expiration-time | 分區的過期時長。 | String | 否 | 無 | 當一個分區的存活時長超過該值時,該分區將會過期,默認永不過期。 一個分區的存活時長由該分區的分區值計算而來,詳情請參見如何設置分區自動過期? |
partition.timestamp-formatter | 將時間字符串轉換為時間戳的格式串。 | String | 否 | 無 | 設置從分區值提取分區存活時長的格式,詳情請參見如何設置分區自動過期? |
partition.timestamp-pattern | 將分區值轉換為時間字符串的格式串。 | String | 否 | 無 | 設置從分區值提取分區存活時長的格式,詳情請參見如何設置分區自動過期? |
scan.bounded.watermark | 當Paimon源表產生的數據的watermark超過該值時,Paimon源表將會結束產生數據。 | Long | 否 | 無 | 無。 |
scan.mode | 指定Paimon源表的消費位點。 | String | 否 | default | 詳情請參見如何設置Paimon源表的消費位點? |
scan.snapshot-id | 指定Paimon源表從哪個snapshot開始消費。 | Integer | 否 | 無 | 詳情請參見如何設置Paimon源表的消費位點? |
scan.timestamp-millis | 指定Paimon源表從哪個時間點開始消費。 | Integer | 否 | 無 | 詳情請參見如何設置Paimon源表的消費位點? |
snapshot.num-retained.max | 至多保留幾個最新Snapshot不過期。 | Integer | 否 | 2147483647 | 只要滿足該配置或snapshot.time-retained其中之一,并同時滿足snapshot.num-retained.min,就會觸發Snapshot過期。 |
snapshot.num-retained.min | 至少保留幾個最新Snapshot不過期。 | Integer | 否 | 10 | 無。 |
snapshot.time-retained | Snapshot產生多久以后會過期。 | String | 否 | 1h | 只要滿足該配置或snapshot.num-retained.max其中之一,并同時滿足snapshot.num-retained.min,就會觸發snapshot過期。 |
write-mode | Paimon表的寫入模式。 | String | 否 | change-log | 參數取值如下:
關于寫入模式的具體介紹,詳情請參見寫入模式。 |
scan.infer-parallelism | 是否自動推斷Paimon源表的并發度。 | Boolean | 否 | false | 參數取值如下:
|
scan.parallelism | Paimon源表的并發度。 | Integer | 否 | 無 | 說明 在作業 頁簽中,資源模式為專家模式時,該參數不生效。 |
sink.parallelism | Paimon結果表的并發度。 | Integer | 否 | 無 | 說明 在作業 頁簽中,資源模式為專家模式時,該參數不生效。 |
sink.clustering.by-columns | 指定寫入Paimon結果表的聚類列。 | String | 否 | 無 | 對于Paimon Append Only表(非主鍵表),在批作業中配置該參數可以啟用聚類寫入功能,使數據在特定列上按大小范圍聚集分布,從而提升該表的查詢速度。 多個列名請使用英文逗號(,)進行分隔,例如 聚類詳情請參見Apache Paimon官方文檔。 |
sink.delete-strategy? | 設定校驗策略,確保系統能正確處理回撤(-D/-U)類型消息。 ?? | Enum | 否 | NONE | 校驗策略取值及Sink算子應當正確處理回撤消息的行為如下:?
說明
|
更多配置項詳情請參見Apache Paimon官方文檔。
特色功能詳解
數據新鮮度與一致性保證
Paimon結果表使用兩階段提交協議,在每次Flink作業的checkpoint期間提交寫入的數據,因此數據新鮮度即為Flink作業的checkpoint間隔。每次提交將會產生至多兩個snapshot。
當兩個Flink作業同時寫入一張Paimon表時,如果兩個作業的數據沒有寫入同一個分桶,則能保證serializable級別的一致性。如果兩個作業的數據寫入了同一個分桶,則只能保證snapshot isolation級別的一致性。也就是說,表中的數據可能混合了兩個作業的結果,但不會有數據丟失。
數據合并機制
當Paimon結果表收到多條具有相同primary key的數據時,為了保持primary key的唯一性,Paimon結果表會將這些數據合并成一條數據。通過指定merge-engine
參數,您可以指定數據合并的具體行為。數據合并機制詳情如下表所示。
合并機制 | 詳情 |
去重(Deduplicate) | 去重機制(deduplicate)是默認的數據合并機制。對于多條具有相同primary key的數據,Paimon結果表僅會保留最新一條數據,并丟棄其它具有primary key的數據。 說明 如果最新一條數據是一條delete消息,所有具有該primary key的數據都將被丟棄。 |
部分更新(Partial Update) | 通過指定部分更新機制(partial-update),您可以通過多條消息對數據進行逐步更新,并最終得到完整的數據。具體來說,具有相同primary key的新數據將會覆蓋原來的數據,但值為null的列不會進行覆蓋。 例如,假設Paimon結果表按順序收到了以下三條數據:
第一列是primary key,則最終結果為<1, 25.2, 10, 'This is a book'>。 說明
|
預聚合(Aggregation) | 部分場景下,可能只關心聚合后的值。預聚合機制(aggregation)將具有相同primary key的數據根據您指定的聚合函數進行聚合。對于不屬于primary key的每一列,都需要通過
price列將會根據max函數進行聚合,而sales列將會根據sum函數進行聚合。給定兩條輸入數據 <1, 23.0, 15>和 <1, 30.2, 20>,最終結果為<1, 30.2, 35>。當前支持的聚合函數與對應的數據類型如下:
說明
|
增量數據產生機制
通過changelog-producer參數設置相應的增量數據產生機制,Paimon可以為任意輸入數據流產生完整的增量數據(所有的update_after數據都有對應的update_before數據)。以下列舉了所有的增量數據產生機制,更加詳細的介紹請參見Apache Paimon官方文檔。
機制 | 詳情 |
None | 設置 例如,假設下游消費者需要計算某一列的總和,如果消費者只看到了最新數據5,它無法斷定該如何更新總和。因為如果之前的數據是4,它應該將總和增加1;如果之前的數據是6,它應該將總和減去1。此類消費者對update_before較為敏感,建議不要將增量數據產生機制配置為None,但是其他增量數據產生機制會帶來性能損耗。 說明 如果您的下游是數據庫之類的對update_before數據不敏感的消費者,則可以將增量數據產生機制配置為None。因此,建議您根據實際需要配置增量數據產生機制。 |
Input | 設置 因此,只有當輸入數據流本身是完整的增量數據時(例如CDC數據),才能使用這一增量數據產生機制。 |
Lookup | 設置 與下文的Full Compaction機制相比,Lookup機制產生增量數據的時效性更好,但總體來看耗費的資源更多。 推薦在對增量數據的新鮮度有較高要求(例如分鐘級)的情況下使用。 |
Full Compaction | 設置 與上文的Lookup機制相比,Full Compaction機制產生增量數據的時效性更差,但它利用了數據的full compaction過程,不產生額外計算,因此總體來看耗費的資源更少。 推薦在對增量數據的新鮮度要求不高(例如小時級)的情況下使用。 |
寫入模式
Paimon表目前支持的寫入模式如下。
模式 | 詳情 |
Change-log | change-log寫入模式是Paimon表的默認寫入模式。該寫入模式支持根據primary key對數據進行插入、刪除與更新,您也可以在該寫入模式下使用上文提到的數據合并機制與增量數據產生機制。 |
Append-only | append-only寫入模式僅支持數據的插入,且不支持primary key。該模式比change-log模式更加高效,可在對數據新鮮度要求一般的場景下(例如分鐘級新鮮度)作為消息隊列的替代品。 關于append-only寫入模式的詳細介紹,請參見Apache Paimon官方文檔。在使用append-only寫入模式時,需要注意以下兩點:
|
作為CTAS和CDAS的目標端
Paimon表支持實時同步單表或整庫級別的數據,在同步過程之中如果上游的表結構發生了變更也會實時同步到Paimon表中。詳見管理Paimon表和管理Paimon Catalog。
數據攝入
Paimon連接器可以用于數據攝入YAML作業開發,作為目標端寫入。
語法結構
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: /path/warehouse
配置項
參數 | 說明 | 是否必填 | 數據類型 | 默認值 | 備注 |
type | 連接器類型。 | 是 | STRING | 無 | 固定值為 |
name | 目標端名稱。 | 否 | STRING | 無 | Sink的名稱。 |
catalog.properties.metastore | Paimon Catalog的類型。 | 否 | STRING | filesystem | 取值如下:
|
catalog.properties.* | 創建Paimon Catalog的參數。 | 否 | STRING | 無 | 詳情請參見管理Paimon Catalog。 |
table.properties.* | 創建Paimon table的參數。 | 否 | STRING | 無 | 詳情請參見Paimon table options。 |
catalog.properties.warehouse | 文件存儲的根目錄。 | 否 | STRING | 無 | 僅在 |
commit.user | 提交數據文件時的用戶名。 | 否 | STRING | 無 | 說明 建議為不同的作業設置不同的用戶名,方便在出現提交沖突時定位沖突的作業。 |
partition.key | 每個分區表的分區字段。 | 否 | STRING | 無 | 不同的表使用 |
使用示例
使用Paimon作為數據攝入目標端時,根據Paimon Catalog的類型不同可以參考下面的示例進行配置。
Paimon Catalog為filesystem,寫入阿里云OSS的配置示例:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: ${mysql.source.table}
server-id: 8601-8604
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: filesystem
catalog.properties.warehouse: oss://default/test
catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
catalog.properties.fs.oss.accessKeyId: xxxxxxxx
catalog.properties.fs.oss.accessKeySecret: xxxxxxxx
其中,catalog.properties前綴的參數含義請參見創建Paimon Filesystem Catalog。
Paimon Catalog為dlf-paimon,寫入阿里云DLF2.0的配置示例:
source:
type: mysql
name: MySQL Source
hostname: ${secret_values.mysql.hostname}
port: ${mysql.port}
username: ${secret_values.mysql.username}
password: ${secret_values.mysql.password}
tables: ${mysql.source.table}
server-id: 8601-8604
sink:
type: paimon
name: Paimon Sink
sink:
type: paimon
name: Paimon Sink
catalog.properties.metastore: dlf-paimon
catalog.properties.dlf.endpoint: dlfnext-vpc.cn-hangzhou.aliyuncs.com
catalog.properties.dlf.region: cn-hangzhou
catalog.properties.dlf.catalog.instance.id: clg-paimon-927606********35b48a444ee
其中,catalog.properties前綴的參數含義請參見創建Paimon DLF Catalog。