日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

流式數據湖倉Paimon

流式數據湖倉Paimon連接器推薦配合Paimon Catalog使用,本文為您介紹如何使用流式數據湖倉Paimon連接器。

背景信息

Apache Paimon是一種流批統一的湖存儲格式,支持高吞吐的寫入和低延遲的查詢。目前阿里云開源大數據平臺E-MapReduce常見的計算引擎(例如Flink、Spark、Hive或Trino)都與Paimon有著較為完善的集成度。您可以借助Apache Paimon快速地在HDFS或者云端OSS上構建自己的數據湖存儲服務,并接入上述計算引擎實現數據湖的分析,詳情請參見Apache Paimon

類別

詳情

支持類型

源表、維表和結果表,數據攝入目標端

運行模式

流模式和批模式

數據格式

暫不支持

特有監控指標

暫無

API種類

SQL,數據攝入YAML作業

是否支持更新或刪除結果表數據

特色功能

目前Apache Paimon提供以下核心能力:

  • 基于HDFS或者對象存儲構建低成本的輕量級數據湖存儲服務。

  • 支持在流模式與批模式下讀寫大規模數據集。

  • 支持分鐘級到秒級數據新鮮度的批查詢和OLAP查詢。

  • 支持消費與產生增量數據,可作為傳統的離線數倉和新型的流式數倉的各級存儲。

  • 支持預聚合數據,降低存儲成本與下游計算壓力。

  • 支持回溯歷史版本的數據。

  • 支持高效的數據過濾。

  • 支持表結構變更。

使用限制

  • 僅Flink計算引擎VVR 6.0.6及以上版本支持Paimon連接器。

  • Paimon與VVR版本對應關系詳情如下表所示。

    Paimon社區版本

    實時計算Flink版引擎版本(VVR )

    0.9

    8.0.7、8.0.8、8.0.9、8.0.10

    0.8

    8.0.6

    0.7

    8.0.5

    0.6

    8.0.4

    0.6

    8.0.3

SQL

Paimon連接器可以在SQL作業中使用,作為源表或者結果表。

語法結構

  • 如果您在Paimon Catalog中創建Paimon表,則無需指定connector參數,此時創建Paimon表的語法結構如下。

    CREATE TABLE `<YOUR-PAIMON-CATALOG>`.`<YOUR-DB>`.paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      ...
    );
    說明

    如果您已在Paimon Catalog中創建了Paimon表,后續無需再次創建表即可直接使用。

  • 如果您在其他Catalog中創建Paimon臨時表,則需要指定connector參數與Paimon表的存儲路徑path,此時創建Paimon表的語法結構如下。

    CREATE TEMPORARY TABLE paimon_table (
      id BIGINT,
      data STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'paimon',
      'path' = '<path-to-paimon-table-files>',
      'auto-create' = 'true', -- 如果指定路徑不存在Paimon表數據文件,則會自動創建文件。
      ...
    );

WITH參數

參數

說明

數據類型

是否必填

默認值

備注

connector

表類型。

String

  • 如果在Paimon Catalog中創建Paimon表,則無需填寫。

  • 如果在其他Catalog中創建Paimon臨時表,則固定值為paimon

path

表存儲路徑。

String

  • 如果在Paimon Catalog中創建Paimon表,則無需填寫。

  • 如果在其他Catalog中創建Paimon臨時表,則為表在HDFS或OSS中的存儲目錄。

auto-create

創建Paimon臨時表時,若指定路徑不存在Paimon表文件,是否自動創建文件。

Boolean

false

參數取值如下:

  • false(默認):如果指定路徑不存在Paimon表文件,則報錯。

  • true:如果指定路徑不存在,則Flink系統自動創建Paimon表文件。

bucket

每個分區的分桶數。

Integer

1

寫入Paimon表的數據將按bucket-key打散至每個bucket中。

說明

建議每個Bucket的數據量在5 GB以下。

bucket-key

分桶關鍵列。

String

指定將寫入Paimon表的數據按哪些列的值打散至不同的Bucket中。

列名之間用英文逗號(,)分隔,例如'bucket-key' = 'order_id,cust_id'會將數據按order_id列和cust_id列的值進行打散。

說明
  • 如果該參數未填寫,則按primary key進行打散。

  • 如果Paimon表未指定primary key,則按所有列的值進行打散。

changelog-producer

增量數據產生機制。

String

none

Paimon可以為任意輸入數據流產生完整的增量數據(所有的update_after數據都有對應的update_before數據),方便下游消費者。增量數據產生機制的可選值如下:

  • none(默認值):不額外產生增量數據。下游仍然可以流讀Paimon表,但讀到的增量數據是不完整的(只有update_after數據,沒有對應的update_before數據)。

  • input:將輸入數據流雙寫至增量數據文件中,作為增量數據。

  • full-compaction:每次Full Compaction產生完整的增量數據。

  • lookup:每次commit snapshot前產生完整的增量數據。

關于增量數據產生機制的選擇,詳情請參見增量數據產生機制

full-compaction.delta-commits

Full Compaction最大間隔。

Integer

該參數指定了每commit snapshot多少次之后,一定會進行一次Full Compaction。

lookup.cache-max-memory-size

Paimon維表的內存緩存大小。

String

256 MB

該參數值會同時影響維表緩存大小和lookup changelog-producer的緩存大小,兩個機制的緩存大小都由該參數配置。

merge-engine

相同primary key數據的合并機制。

String

deduplicate

參數取值如下:

  • deduplicate:僅保留最新一條。

  • partial-update:用最新數據中非null的列更新相同primary key的現有數據,其它列保持不變。

  • aggregation:通過指定聚合函數進行預聚合。

關于數據合并機制的具體分析,詳情請參見數據合并機制

partial-update.ignore-delete

是否忽略delete(-D)類型的消息。

Boolean

false

參數取值如下:

  • true:忽略delete消息。

  • false:不忽略delete消息。您需要通過sequence.field等配置項來設定Sink對于delete數據的處理策略,否則可能會拋出IllegalStateException或IllegalArgumentException報錯。

說明
  • 在實時計算引擎VVR 8.0.6及以下版本,該參數只在partial update場景下,merge-engine = partial-update時生效。

  • 在實時計算引擎VVR 8.0.7及以上版本,該參數兼容適配非partial update場景,與ignore-delete參數功能一致,推薦替換成ignore-delete

  • 請您根據實際業務場景,判斷出現的delete類型數據是否符合預期,從而決定是否啟用該參數。如果delete類型數據所代表的作業語義不符合預期,則拋出錯誤是更合適的選擇。

ignore-delete

是否忽略delete(-D)類型的消息。

Boolean

false

參數取值同partial-update.ignore-delete

說明
  • 僅實時計算引擎VVR 8.0.7及以上版本支持該參數。

  • 與partial-update.ignore-delete參數功能一致,推薦使用ignore-delete參數,并避免同時配置這兩參數。

partition.default-name

分區默認名稱。

String

__DEFAULT_PARTITION__

如果分區列的值為null或空字符串,將會采用該默認名稱作為分區名。

partition.expiration-check-interval

多久檢查一次分區過期。

String

1h

詳情請參見如何設置分區自動過期?

partition.expiration-time

分區的過期時長。

String

當一個分區的存活時長超過該值時,該分區將會過期,默認永不過期。

一個分區的存活時長由該分區的分區值計算而來,詳情請參見如何設置分區自動過期?

partition.timestamp-formatter

將時間字符串轉換為時間戳的格式串。

String

設置從分區值提取分區存活時長的格式,詳情請參見如何設置分區自動過期?

partition.timestamp-pattern

將分區值轉換為時間字符串的格式串。

String

設置從分區值提取分區存活時長的格式,詳情請參見如何設置分區自動過期?

scan.bounded.watermark

當Paimon源表產生的數據的watermark超過該值時,Paimon源表將會結束產生數據。

Long

無。

scan.mode

指定Paimon源表的消費位點。

String

default

詳情請參見如何設置Paimon源表的消費位點?

scan.snapshot-id

指定Paimon源表從哪個snapshot開始消費。

Integer

詳情請參見如何設置Paimon源表的消費位點?

scan.timestamp-millis

指定Paimon源表從哪個時間點開始消費。

Integer

詳情請參見如何設置Paimon源表的消費位點?

snapshot.num-retained.max

至多保留幾個最新Snapshot不過期。

Integer

2147483647

只要滿足該配置或snapshot.time-retained其中之一,并同時滿足snapshot.num-retained.min,就會觸發Snapshot過期。

snapshot.num-retained.min

至少保留幾個最新Snapshot不過期。

Integer

10

無。

snapshot.time-retained

Snapshot產生多久以后會過期。

String

1h

只要滿足該配置或snapshot.num-retained.max其中之一,并同時滿足snapshot.num-retained.min,就會觸發snapshot過期。

write-mode

Paimon表的寫入模式。

String

change-log

參數取值如下:

  • change-log:Paimon表支持根據primary key進行數據的插入、刪除和更新。

  • append-only:Paimon表只接受數據的插入,且不支持primary key。該模式比change-log模式更加高效。

關于寫入模式的具體介紹,詳情請參見寫入模式

scan.infer-parallelism

是否自動推斷Paimon源表的并發度。

Boolean

false

參數取值如下:

  • true:將會根據分桶數自動推斷Paimon源表的并發度。

  • false:按VVP配置的默認并發。如果是專家模式就按用戶配置的并發。

scan.parallelism

Paimon源表的并發度。

Integer

說明

在作業部署詳情 > 資源配置頁簽中,資源模式為專家模式時,該參數不生效。

sink.parallelism

Paimon結果表的并發度。

Integer

說明

在作業部署詳情 > 資源配置頁簽中,資源模式為專家模式時,該參數不生效。

sink.clustering.by-columns

指定寫入Paimon結果表的聚類列。

String

對于Paimon Append Only表(非主鍵表),在批作業中配置該參數可以啟用聚類寫入功能,使數據在特定列上按大小范圍聚集分布,從而提升該表的查詢速度。

多個列名請使用英文逗號(,)進行分隔,例如'col1,col2'

聚類詳情請參見Apache Paimon官方文檔

sink.delete-strategy?

設定校驗策略,確保系統能正確處理回撤(-D/-U)類型消息。

??

Enum

NONE

校驗策略取值及Sink算子應當正確處理回撤消息的行為如下:?

  • ?NONE(默認值):不做校驗。?

  • IGNORE_DELETE:Sink算子應當忽略-U和-D類型的消息,不發生回撤。

  • NON_PK_FIELD_TO_NULL:Sink算子應當忽略-U類型的消息,但是在收到-D類型的消息時,保持主鍵值不變、回撤Schema中其他非主鍵值。

    主要用在多個Sink同時寫入同一張表時部分更新的場景。?

  • DELETE_ROW_ON_PK:Sink算子應當忽略-U類型的消息,但是在收到-D類型的消息時刪除主鍵對應的行。?

  • CHANGELOG_STANDARD:Sink算子應當在收到-U和-D類型的數據時均會刪除主鍵對應的行。?

說明
  • 僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

  • Paimon Sink處理回撤消息的行為實際由ignore-delete、merge-engine等其他配置項的值決定。本配置項不直接影響這部分行為,而是會校驗這部分行為是否符合預期策略。在不符合預期策略的情況下,相關校驗步驟將終止,并在作業報錯中提示您如何修改ignore-delete、merge-engine等其他配置項以符合預期。

說明

更多配置項詳情請參見Apache Paimon官方文檔

特色功能詳解

數據新鮮度與一致性保證

Paimon結果表使用兩階段提交協議,在每次Flink作業的checkpoint期間提交寫入的數據,因此數據新鮮度即為Flink作業的checkpoint間隔。每次提交將會產生至多兩個snapshot。

當兩個Flink作業同時寫入一張Paimon表時,如果兩個作業的數據沒有寫入同一個分桶,則能保證serializable級別的一致性。如果兩個作業的數據寫入了同一個分桶,則只能保證snapshot isolation級別的一致性。也就是說,表中的數據可能混合了兩個作業的結果,但不會有數據丟失。

數據合并機制

當Paimon結果表收到多條具有相同primary key的數據時,為了保持primary key的唯一性,Paimon結果表會將這些數據合并成一條數據。通過指定merge-engine參數,您可以指定數據合并的具體行為。數據合并機制詳情如下表所示。

合并機制

詳情

去重(Deduplicate)

去重機制(deduplicate)是默認的數據合并機制。對于多條具有相同primary key的數據,Paimon結果表僅會保留最新一條數據,并丟棄其它具有primary key的數據。

說明

如果最新一條數據是一條delete消息,所有具有該primary key的數據都將被丟棄。

部分更新(Partial Update)

通過指定部分更新機制(partial-update),您可以通過多條消息對數據進行逐步更新,并最終得到完整的數據。具體來說,具有相同primary key的新數據將會覆蓋原來的數據,但值為null的列不會進行覆蓋。

例如,假設Paimon結果表按順序收到了以下三條數據:

  • <1, 23.0, 10, NULL>

  • <1, NULL, NULL, 'This is a book'>

  • <1, 25.2, NULL, NULL>

第一列是primary key,則最終結果為<1, 25.2, 10, 'This is a book'>。

說明
  • 如果需要流讀partial-update的結果,必須將changelog-producer參數設置為lookup或full-compaction。

  • partial-update無法處理delete消息。您可以設置partial-update.ignore-delete參數以忽略delete消息。

預聚合(Aggregation)

部分場景下,可能只關心聚合后的值。預聚合機制(aggregation)將具有相同primary key的數據根據您指定的聚合函數進行聚合。對于不屬于primary key的每一列,都需要通過fields.<field-name>.aggregate-function指定一個聚合函數,否則該列將默認使用last_non_null_value聚合函數。例如,考慮以下Paimon表的定義。

CREATE TABLE MyTable (
  product_id BIGINT,
  price DOUBLE,
  sales BIGINT,
  PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
  'merge-engine' = 'aggregation',
  'fields.price.aggregate-function' = 'max',
  'fields.sales.aggregate-function' = 'sum'
);

price列將會根據max函數進行聚合,而sales列將會根據sum函數進行聚合。給定兩條輸入數據 <1, 23.0, 15>和 <1, 30.2, 20>,最終結果為<1, 30.2, 35>。當前支持的聚合函數與對應的數據類型如下:

  • sum:支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT和DOUBLE。

  • min和max:支持DECIMAL、TINYINT、SMALLINT、INTEGER、BIGINT、FLOAT、DOUBLE、DATE、TIME、TIMESTAMP和TIMESTAMP_LTZ。

  • last_value和last_non_null_value:支持所有數據類型。

  • listagg:支持STRING。

  • bool_and和bool_or:支持BOOLEAN。

說明
  • 只有sum函數支持回撤與刪除數據,其它聚合函數不支持回撤與刪除。如果您需要某些列忽略回撤與刪除消息,可以設置'fields.${field_name}.ignore-retract'='true'

  • 如果需要流讀aggregation的結果,必須將changelog-producer參數設置為lookup或full-compaction。

增量數據產生機制

通過changelog-producer參數設置相應的增量數據產生機制,Paimon可以為任意輸入數據流產生完整的增量數據(所有的update_after數據都有對應的update_before數據)。以下列舉了所有的增量數據產生機制,更加詳細的介紹請參見Apache Paimon官方文檔

機制

詳情

None

設置changelog-producer為none(默認值)后,此時,對于同一個primary key,下游的Paimon源表只能看到數據的最新情況。但這些最新情況無法讓下游消費者方便地了解完整的增量數據,從而進行正確的計算。因為它只能確定對應數據是否被刪除了,或最新數據是什么,無法得知更改之前的數據是什么。

例如,假設下游消費者需要計算某一列的總和,如果消費者只看到了最新數據5,它無法斷定該如何更新總和。因為如果之前的數據是4,它應該將總和增加1;如果之前的數據是6,它應該將總和減去1。此類消費者對update_before較為敏感,建議不要將增量數據產生機制配置為None,但是其他增量數據產生機制會帶來性能損耗。

說明

如果您的下游是數據庫之類的對update_before數據不敏感的消費者,則可以將增量數據產生機制配置為None。因此,建議您根據實際需要配置增量數據產生機制。

Input

設置changelog-producer為input后,Paimon結果表會將輸入數據流雙寫至增量數據文件中,作為增量數據。

因此,只有當輸入數據流本身是完整的增量數據時(例如CDC數據),才能使用這一增量數據產生機制。

Lookup

設置changelog-producer為lookup后,Paimon結果表會通過一種類似于維表的點查機制,在每次commit snapshot之前產生本次snapshot對應的完整增量數據。無論輸入數據是否為完整的增量數據,這一增量數據產生機制均能產生完整的增量數據。

與下文的Full Compaction機制相比,Lookup機制產生增量數據的時效性更好,但總體來看耗費的資源更多。

推薦在對增量數據的新鮮度有較高要求(例如分鐘級)的情況下使用。

Full Compaction

設置changelog-producer為full-compaction后,Paimon結果表會在每一次full compaction時產生完整的增量數據。無論輸入數據是否為完整的增量數據,這一增量數據產生機制均能產生完整的增量數據。Full compaction的時間間隔由full-compaction.delta-commits參數指定。

與上文的Lookup機制相比,Full Compaction機制產生增量數據的時效性更差,但它利用了數據的full compaction過程,不產生額外計算,因此總體來看耗費的資源更少。

推薦在對增量數據的新鮮度要求不高(例如小時級)的情況下使用。

寫入模式

Paimon表目前支持的寫入模式如下。

模式

詳情

Change-log

change-log寫入模式是Paimon表的默認寫入模式。該寫入模式支持根據primary key對數據進行插入、刪除與更新,您也可以在該寫入模式下使用上文提到的數據合并機制與增量數據產生機制。

Append-only

append-only寫入模式僅支持數據的插入,且不支持primary key。該模式比change-log模式更加高效,可在對數據新鮮度要求一般的場景下(例如分鐘級新鮮度)作為消息隊列的替代品。

關于append-only寫入模式的詳細介紹,請參見Apache Paimon官方文檔。在使用append-only寫入模式時,需要注意以下兩點:

  • 建議您根據實際需求設置bucket-key參數,否則Paimon表將根據所有列的值進行分桶,計算效率較低。

  • append-only寫入模式可在一定程度上保證數據的產出順序,具體的產出順序為:

    1. 如果兩條數據來自不同的分區,若設置了scan.plan-sort-partition參數,則分區值較小的數據將首先產出。否則來自較早創建的分區的數據將首先產出。

    2. 如果兩條數據來自同一分區的同一分桶,則較早寫入的數據將首先產出。

    3. 如果兩條數據來自同一分區的不同分桶,由于不同分桶由不同的并發進行處理,因此不保證兩條數據的產出順序。

作為CTAS和CDAS的目標端

Paimon表支持實時同步單表或整庫級別的數據,在同步過程之中如果上游的表結構發生了變更也會實時同步到Paimon表中。詳見管理Paimon表管理Paimon Catalog

數據攝入

Paimon連接器可以用于數據攝入YAML作業開發,作為目標端寫入。

語法結構

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: /path/warehouse

配置項

參數

說明

是否必填

數據類型

默認值

備注

type

連接器類型。

STRING

固定值為paimon

name

目標端名稱。

STRING

Sink的名稱。

catalog.properties.metastore

Paimon Catalog的類型。

STRING

filesystem

取值如下:

  • filesystem(默認值)

  • dlf-paimon(僅支持DLF 2.0版本)

catalog.properties.*

創建Paimon Catalog的參數。

STRING

詳情請參見管理Paimon Catalog

table.properties.*

創建Paimon table的參數。

STRING

詳情請參見Paimon table options

catalog.properties.warehouse

文件存儲的根目錄。

STRING

僅在catalog.properties.metastore設置為 filesystem時生效。

commit.user

提交數據文件時的用戶名。

STRING

說明

建議為不同的作業設置不同的用戶名,方便在出現提交沖突時定位沖突的作業。

partition.key

每個分區表的分區字段。

STRING

不同的表使用;分割,不同的字段使用,分割,表與字段使用:分割。例如:testdb.table1:id1,id2;testdb.table2:name

使用示例

使用Paimon作為數據攝入目標端時,根據Paimon Catalog的類型不同可以參考下面的示例進行配置。

  • Paimon Catalog為filesystem,寫入阿里云OSS的配置示例:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: filesystem
  catalog.properties.warehouse: oss://default/test
  catalog.properties.fs.oss.endpoint: oss-cn-beijing-internal.aliyuncs.com
  catalog.properties.fs.oss.accessKeyId: xxxxxxxx
  catalog.properties.fs.oss.accessKeySecret: xxxxxxxx

其中,catalog.properties前綴的參數含義請參見創建Paimon Filesystem Catalog

  • Paimon Catalog為dlf-paimon,寫入阿里云DLF2.0的配置示例:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: ${mysql.source.table}
  server-id: 8601-8604

sink:
  type: paimon
  name: Paimon Sink
sink:
  type: paimon
  name: Paimon Sink
  catalog.properties.metastore: dlf-paimon
  catalog.properties.dlf.endpoint: dlfnext-vpc.cn-hangzhou.aliyuncs.com
  catalog.properties.dlf.region: cn-hangzhou
  catalog.properties.dlf.catalog.instance.id: clg-paimon-927606********35b48a444ee

其中,catalog.properties前綴的參數含義請參見創建Paimon DLF Catalog

常見問題