Postgres CDC(公測中)
Postgres CDC可用于依次讀取PostgreSQL數(shù)據(jù)庫全量快照數(shù)據(jù)和變更數(shù)據(jù),保證不多讀一條也不少讀一條數(shù)據(jù)。即使發(fā)生故障,也能采用Exactly Once方式處理。本文為您介紹如何使用Postgres CDC連接器。
背景信息
Postgres CDC連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表 說明 您可以使用JDBC作為結(jié)果表和維表連接器。 |
運(yùn)行模式 | 僅支持流模式 |
數(shù)據(jù)格式 | 暫不適用 |
特有監(jiān)控指標(biāo) |
說明
|
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 不涉及 |
特色功能
Postgres CDC連接器接入CDC增量快照框架(實(shí)時(shí)計(jì)算引擎VVR 8.0.6及以上版本)。Postgres CDC讀取歷史全量數(shù)據(jù)后,自動切換到WAL變更日志讀取,保證不多讀也不少讀數(shù)據(jù)。即使發(fā)生故障,也能保證Exactly Once語義處理數(shù)據(jù)。Postgres CDC源表提供了并發(fā)讀取全量數(shù)據(jù),無鎖讀取和斷點(diǎn)續(xù)傳的能力。
作為源表,功能與優(yōu)勢詳情如下:
流批一體,支持讀取全量和增量數(shù)據(jù),無需維護(hù)兩套流程。
支持并發(fā)讀取全量數(shù)據(jù),性能水平擴(kuò)展。
全量讀取無縫切換增量讀取,自動縮容,節(jié)省計(jì)算資源。
全量階段讀取支持?jǐn)帱c(diǎn)續(xù)傳,更穩(wěn)定。
無鎖讀取全量數(shù)據(jù),不影響線上業(yè)務(wù)。
前提條件
Postgres CDC連接器通過PostgreSQL數(shù)據(jù)庫的邏輯復(fù)制讀取CDC變更流數(shù)據(jù),支持阿里云RDS PostgreSQL、Amazon RDS PostgreSQL和自建PostgreSQL。
阿里云RDS PostgreSQL、Amazon RDS PostgreSQL或者自建PostgreSQL上相應(yīng)的配置可能有差異,請您在使用之前詳細(xì)閱讀配置Postgres文檔進(jìn)行相關(guān)配置。
完成配置后確保有下列的條件:
wal_level參數(shù)的值需設(shè)置為logical,即在預(yù)寫式日志W(wǎng)AL(Write-ahead logging)中增加支持邏輯編碼所需的信息。
訂閱表的REPLICA IDENTITY為FULL(發(fā)出的插入和更新操作事件包含表中所有列的舊值),以保障該表數(shù)據(jù)同步的一致性。
說明REPLICA IDENTITY是PostgreSQL特有的表級設(shè)置,它決定了邏輯解碼插件在發(fā)生(INSERT)和更新(UPDATE)事件時(shí),是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情請參見REPLICA IDENTITY。
需要確保max_wal_senders和max_replication_slots的參數(shù)值均大于當(dāng)前數(shù)據(jù)庫復(fù)制槽已使用數(shù)與Flink作業(yè)所需要的slot數(shù)量。
確保賬戶系統(tǒng)權(quán)限為SUPERUSER或者同時(shí)擁有LOGIN和REPLICATION權(quán)限,并且具有訂閱表的SELECT權(quán)限用于全量數(shù)據(jù)查詢。
注意事項(xiàng)
僅實(shí)時(shí)計(jì)算引擎8.0.6及以上版本支持Postgres CDC增量快照功能。
請及時(shí)管理Replication Slot,以免出現(xiàn)磁盤空間浪費(fèi)的問題。
為了防止在Flink作業(yè)重啟過程中由于Checkpoint對應(yīng)的WAL(Write-Ahead Log)段被清除而引發(fā)數(shù)據(jù)丟失,F(xiàn)link作業(yè)不會自動移除Replication Slot。因此,如果確認(rèn)特定的Flink作業(yè)不會再次啟動,應(yīng)當(dāng)手動刪除相關(guān)的Replication Slot,以釋放其占用的資源。另外,如果PostgreSQL的Replication Slot的確認(rèn)位點(diǎn)長時(shí)間不向前推進(jìn),PostgreSQL不會清理該槽位點(diǎn)之后的WAL條目,這可能會導(dǎo)致未使用的WAL積累而占用過多的磁盤空間。
開啟增量快照時(shí),Postgres CDC連接器必須開啟Checkpoint,并且Source表必須聲明主鍵。Source多并發(fā)讀取全量數(shù)據(jù)時(shí)會創(chuàng)建多個(gè)臨時(shí)的Replication Slot。
不開啟增量快照讀取的PostgreSQL CDC Source僅支持單一并發(fā),因此只需要一個(gè)全局Slot。當(dāng)開啟增量快照時(shí),PostgreSQL CDC Source在全量階段所需的最大Slot數(shù)量為
Source數(shù)量 * 并發(fā)數(shù) + 1
。進(jìn)入增量階段后,系統(tǒng)自動回收在全量階段創(chuàng)建的Slot,僅保留一個(gè)全局Slot。如果Slot數(shù)量有限,需要控制全量階段的并發(fā)數(shù)量,這樣做的缺點(diǎn)是會降低讀取速度。如果下游算子或存儲支持冪等性,可以啟用scan.incremental.snapshot.backfill.skip = true
以跳過全量階段的日志讀取,這樣做的缺點(diǎn)是僅能提供至少一次(At-Least Once)的語義保證。如果SQL要做聚合、關(guān)聯(lián)等操作,不建議跳過全量階段日志的讀取。
不開啟增量快照時(shí),Postgres CDC連接器不支持在全表掃描階段執(zhí)行Checkpoint。
不開啟增量快照時(shí),如果您的作業(yè)在全表掃描階段觸發(fā)Checkpoint,則可能由于Checkpoint超時(shí)導(dǎo)致作業(yè)Failover。因此,建議您在其他配置中配置如下參數(shù),具體操作請參見如何配置作業(yè)運(yùn)行參數(shù)?。避免在全量同步階段由于Checkpoint超時(shí)導(dǎo)致Failover。
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647
相關(guān)的參數(shù)說明詳情如下表所示。
參數(shù)
說明
備注
execution.checkpointing.interval
Checkpoint的時(shí)間間隔。
單位是Duration類型,例如10min或30s。
execution.checkpointing.tolerable-failed-checkpoints
容忍Checkpoint失敗的次數(shù)。
該參數(shù)的取值與Checkpoint調(diào)度間隔時(shí)間的乘積就是允許的快照讀取時(shí)間。
說明如果表特別大,建議將該參數(shù)值配置得大一些。
restart-strategy
重啟策略。
參數(shù)取值如下:
fixed-delay:固定延遲重啟策略。
failure-rate:故障率重啟策略。
exponential-delay:指數(shù)延遲重啟策略。
詳情請參見Restart Strategies。
restart-strategy.fixed-delay.attempts
固定延遲重啟策略下,嘗試重啟的最大次數(shù)。
無。
語法結(jié)構(gòu)
CREATE TABLE postgrescdc_source (
shipment_id INT,
order_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<yourHostname>',
'port' = '5432',
'username' = '<yourUserName>',
'password' = '<yourPassWord>',
'database-name' = '<yourDatabaseName>',
'schema-name' = '<yourSchemaName>',
'table-name' = '<yourTableName>'
);
WITH參數(shù)
參數(shù) | 說明 | 數(shù)據(jù)類型 | 是否必填 | 默認(rèn)值 | 備注 |
connector | connector類型。 | STRING | 是 | 無 | 固定值為 |
hostname | Postgres數(shù)據(jù)庫的IP地址或者Hostname。 | STRING | 是 | 無 | 無。 |
username | Postgres數(shù)據(jù)庫服務(wù)的用戶名。 | STRING | 是 | 無 | 無。 |
password | Postgres數(shù)據(jù)庫服務(wù)的密碼。 | STRING | 是 | 無 | 無。 |
database-name | 數(shù)據(jù)庫名稱。 | STRING | 是 | 無 | 數(shù)據(jù)庫名稱。 |
schema-name | Postgres Schema名稱。 | STRING | 是 | 無 | Schema名稱支持正則表達(dá)式以讀取多個(gè)Schema的數(shù)據(jù)。 |
table-name | Postgres表名。 | STRING | 是 | 無 | 表名支持正則表達(dá)式以讀取多個(gè)表的數(shù)據(jù)。 |
port | Postgres數(shù)據(jù)庫服務(wù)的端口號。 | INTEGER | 否 | 5432 | 無。 |
decoding.plugin.name | Postgres Logical Decoding插件名稱。 | STRING | 否 | decoderbufs | 根據(jù)Postgres服務(wù)上安裝的插件確定。支持的插件列表如下:
|
slot.name | 邏輯解碼槽的名字。 | STRING | 8.0.1版本之前為非必填,從8.0.1版本開始為必填 | 8.0.1版本之前默認(rèn)值為flink,從8.0.1版本開始無默認(rèn)值 | 建議每個(gè)表都設(shè)置 |
debezium.* | Debezium屬性參數(shù)。 | STRING | 否 | 無 | 更細(xì)粒度控制Debezium客戶端的行為。例如 |
scan.incremental.snapshot.enabled | 是否開啟增量快照。 | BOOLEAN | 否 | false | 參數(shù)取值如下:
說明
|
scan.startup.mode | 消費(fèi)數(shù)據(jù)時(shí)的啟動模式。 | STRING | 否 | initial | 參數(shù)取值如下:
|
changelog-mode | 用于編碼流更改的變更日志(Changelog)模式。 | String | 否 | all | 支持的Changelog模式包括:
|
heartbeat.interval.ms | 發(fā)送心跳包的時(shí)間間隔。 | Duration | 否 | 30s | 單位為毫秒。 Postgres CDC連接器主動向數(shù)據(jù)庫發(fā)送心跳包來保證推進(jìn)Slot的偏移量。當(dāng)表變更不頻繁時(shí),設(shè)置該值可以及時(shí)回收WAL日志。 |
scan.incremental.snapshot.chunk.key-column | 指定某一列作為快照階段切分分片的切分列。 | STRING | 否 | 無 | 默認(rèn)從主鍵中選擇第一列。 |
scan.incremental.close-idle-reader.enabled | 是否在快照結(jié)束后關(guān)閉空閑的Reader。 | Boolean | 否 | false | 該配置生效需要設(shè)置 |
scan.incremental.snapshot.backfill.skip | 是否跳過全量階段的日志讀取。 | Boolean | 否 | false | 參數(shù)取值如下:
|
類型映射
PostgreSQL和Flink字段類型對應(yīng)關(guān)系如下。
PostgreSQL字段類型 | Flink字段類型 |
SMALLINT | SMALLINT |
INT2 | |
SMALLSERIAL | |
SERIAL2 | |
INTEGER | INT |
SERIAL | |
BIGINT | BIGINT |
BIGSERIAL | |
REAL | FLOAT |
FLOAT4 | |
FLOAT8 | DOUBLE |
DOUBLE PRECISION | |
NUMERIC(p, s) | DECIMAL(p, s) |
DECIMAL(p, s) | |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIME [(p)] [WITHOUT TIMEZONE] | TIME [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] | TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) | STRING |
CHARACTER(n) | |
VARCHAR(n) | |
CHARACTER VARYING(n) | |
TEXT | |
BYTEA | BYTES |
使用示例
CREATE TABLE source (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'postgres-cdc',
'hostname' = '<host name>',
'port' = '<port>',
'username' = '<user name>',
'password' = '<password>',
'database-name' = '<database name>',
'schema-name' = '<schema name>',
'table-name' = '<table name>'
);
SELECT * FROM source;
相關(guān)文檔
實(shí)時(shí)計(jì)算Flink版支持的連接器列表,請參見支持的連接器。
將數(shù)據(jù)寫入PolarDB PostgreSQL版(Oracle語法兼容1.0)結(jié)果表,請參見PolarDB PostgreSQL版(Oracle語法兼容1.0)。
如果您需要讀寫RDS MySQL、PolarDB for MySQL或者自建MySQL數(shù)據(jù)庫,請使用MySQL連接器。