日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Postgres CDC(公測中)

更新時(shí)間: 2024-10-09 10:56:08

Postgres CDC可用于依次讀取PostgreSQL數(shù)據(jù)庫全量快照數(shù)據(jù)和變更數(shù)據(jù),保證不多讀一條也不少讀一條數(shù)據(jù)。即使發(fā)生故障,也能采用Exactly Once方式處理。本文為您介紹如何使用Postgres CDC連接器。

背景信息

Postgres CDC連接器支持的信息如下。

類別

詳情

支持類型

源表

說明

您可以使用JDBC作為結(jié)果表和維表連接器。

運(yùn)行模式

僅支持流模式

數(shù)據(jù)格式

暫不適用

特有監(jiān)控指標(biāo)

  • currentFetchEventTimeLag:數(shù)據(jù)產(chǎn)生到拉取到Source Operator的間隔。

  • currentEmitEventTimeLag:數(shù)據(jù)產(chǎn)生到離開Source Operator的間隔。

  • sourceIdleTime:source至今有多久不產(chǎn)生新數(shù)據(jù)。

說明
  • currentFetchEventTimeLag與currentEmitEventTimeLag指標(biāo)僅在增量階段有效,全量階段該值恒為0。

  • 指標(biāo)含義詳情,請參見監(jiān)控指標(biāo)說明

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

不涉及

特色功能

Postgres CDC連接器接入CDC增量快照框架(實(shí)時(shí)計(jì)算引擎VVR 8.0.6及以上版本)。Postgres CDC讀取歷史全量數(shù)據(jù)后,自動切換到WAL變更日志讀取,保證不多讀也不少讀數(shù)據(jù)。即使發(fā)生故障,也能保證Exactly Once語義處理數(shù)據(jù)。Postgres CDC源表提供了并發(fā)讀取全量數(shù)據(jù),無鎖讀取和斷點(diǎn)續(xù)傳的能力。

作為源表,功能與優(yōu)勢詳情如下:

  • 流批一體,支持讀取全量和增量數(shù)據(jù),無需維護(hù)兩套流程。

  • 支持并發(fā)讀取全量數(shù)據(jù),性能水平擴(kuò)展。

  • 全量讀取無縫切換增量讀取,自動縮容,節(jié)省計(jì)算資源。

  • 全量階段讀取支持?jǐn)帱c(diǎn)續(xù)傳,更穩(wěn)定。

  • 無鎖讀取全量數(shù)據(jù),不影響線上業(yè)務(wù)。

前提條件

Postgres CDC連接器通過PostgreSQL數(shù)據(jù)庫的邏輯復(fù)制讀取CDC變更流數(shù)據(jù),支持阿里云RDS PostgreSQL、Amazon RDS PostgreSQL自建PostgreSQL。

重要

阿里云RDS PostgreSQLAmazon RDS PostgreSQL或者自建PostgreSQL上相應(yīng)的配置可能有差異,請您在使用之前詳細(xì)閱讀配置Postgres文檔進(jìn)行相關(guān)配置

完成配置后確保有下列的條件:

  • wal_level參數(shù)的值需設(shè)置為logical,即在預(yù)寫式日志W(wǎng)AL(Write-ahead logging)中增加支持邏輯編碼所需的信息。

  • 訂閱表的REPLICA IDENTITY為FULL(發(fā)出的插入和更新操作事件包含表中所有列的舊值),以保障該表數(shù)據(jù)同步的一致性。

    說明

    REPLICA IDENTITY是PostgreSQL特有的表級設(shè)置,它決定了邏輯解碼插件在發(fā)生(INSERT)和更新(UPDATE)事件時(shí),是否包含涉及的表列的舊值。REPLICA IDENTITY取值含義詳情請參見REPLICA IDENTITY。

  • 需要確保max_wal_senders和max_replication_slots的參數(shù)值均大于當(dāng)前數(shù)據(jù)庫復(fù)制槽已使用數(shù)與Flink作業(yè)所需要的slot數(shù)量。

  • 確保賬戶系統(tǒng)權(quán)限為SUPERUSER或者同時(shí)擁有LOGIN和REPLICATION權(quán)限,并且具有訂閱表的SELECT權(quán)限用于全量數(shù)據(jù)查詢。

注意事項(xiàng)

  • 僅實(shí)時(shí)計(jì)算引擎8.0.6及以上版本支持Postgres CDC增量快照功能。

  • 請及時(shí)管理Replication Slot,以免出現(xiàn)磁盤空間浪費(fèi)的問題。

    為了防止在Flink作業(yè)重啟過程中由于Checkpoint對應(yīng)的WAL(Write-Ahead Log)段被清除而引發(fā)數(shù)據(jù)丟失,F(xiàn)link作業(yè)不會自動移除Replication Slot。因此,如果確認(rèn)特定的Flink作業(yè)不會再次啟動,應(yīng)當(dāng)手動刪除相關(guān)的Replication Slot,以釋放其占用的資源。另外,如果PostgreSQL的Replication Slot的確認(rèn)位點(diǎn)長時(shí)間不向前推進(jìn),PostgreSQL不會清理該槽位點(diǎn)之后的WAL條目,這可能會導(dǎo)致未使用的WAL積累而占用過多的磁盤空間。

  • 開啟增量快照時(shí),Postgres CDC連接器必須開啟Checkpoint,并且Source表必須聲明主鍵。Source多并發(fā)讀取全量數(shù)據(jù)時(shí)會創(chuàng)建多個(gè)臨時(shí)的Replication Slot。

    不開啟增量快照讀取的PostgreSQL CDC Source僅支持單一并發(fā),因此只需要一個(gè)全局Slot。當(dāng)開啟增量快照時(shí),PostgreSQL CDC Source在全量階段所需的最大Slot數(shù)量為Source數(shù)量 * 并發(fā)數(shù) + 1。進(jìn)入增量階段后,系統(tǒng)自動回收在全量階段創(chuàng)建的Slot,僅保留一個(gè)全局Slot。如果Slot數(shù)量有限,需要控制全量階段的并發(fā)數(shù)量,這樣做的缺點(diǎn)是會降低讀取速度。如果下游算子或存儲支持冪等性,可以啟用scan.incremental.snapshot.backfill.skip = true以跳過全量階段的日志讀取,這樣做的缺點(diǎn)是僅能提供至少一次(At-Least Once)的語義保證。如果SQL要做聚合、關(guān)聯(lián)等操作,不建議跳過全量階段日志的讀取。

  • 不開啟增量快照時(shí),Postgres CDC連接器不支持在全表掃描階段執(zhí)行Checkpoint。

    不開啟增量快照時(shí),如果您的作業(yè)在全表掃描階段觸發(fā)Checkpoint,則可能由于Checkpoint超時(shí)導(dǎo)致作業(yè)Failover。因此,建議您在其他配置中配置如下參數(shù),具體操作請參見如何配置作業(yè)運(yùn)行參數(shù)?。避免在全量同步階段由于Checkpoint超時(shí)導(dǎo)致Failover。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    相關(guān)的參數(shù)說明詳情如下表所示。

    參數(shù)

    說明

    備注

    execution.checkpointing.interval

    Checkpoint的時(shí)間間隔。

    單位是Duration類型,例如10min或30s。

    execution.checkpointing.tolerable-failed-checkpoints

    容忍Checkpoint失敗的次數(shù)。

    該參數(shù)的取值與Checkpoint調(diào)度間隔時(shí)間的乘積就是允許的快照讀取時(shí)間。

    說明

    如果表特別大,建議將該參數(shù)值配置得大一些。

    restart-strategy

    重啟策略。

    參數(shù)取值如下:

    • fixed-delay:固定延遲重啟策略。

    • failure-rate:故障率重啟策略。

    • exponential-delay:指數(shù)延遲重啟策略。

    詳情請參見Restart Strategies。

    restart-strategy.fixed-delay.attempts

    固定延遲重啟策略下,嘗試重啟的最大次數(shù)。

    無。

語法結(jié)構(gòu)

CREATE TABLE postgrescdc_source (
  shipment_id INT,
  order_id INT,
  origin STRING,
  destination STRING,
  is_arrived BOOLEAN
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<yourHostname>',
  'port' = '5432',
  'username' = '<yourUserName>',
  'password' = '<yourPassWord>',
  'database-name' = '<yourDatabaseName>',
  'schema-name' = '<yourSchemaName>',
  'table-name' = '<yourTableName>'
);

WITH參數(shù)

參數(shù)

說明

數(shù)據(jù)類型

是否必填

默認(rèn)值

備注

connector

connector類型。

STRING

固定值為postgres-cdc。

hostname

Postgres數(shù)據(jù)庫的IP地址或者Hostname。

STRING

無。

username

Postgres數(shù)據(jù)庫服務(wù)的用戶名。

STRING

無。

password

Postgres數(shù)據(jù)庫服務(wù)的密碼。

STRING

無。

database-name

數(shù)據(jù)庫名稱。

STRING

數(shù)據(jù)庫名稱。

schema-name

Postgres Schema名稱。

STRING

Schema名稱支持正則表達(dá)式以讀取多個(gè)Schema的數(shù)據(jù)。

table-name

Postgres表名。

STRING

表名支持正則表達(dá)式以讀取多個(gè)表的數(shù)據(jù)。

port

Postgres數(shù)據(jù)庫服務(wù)的端口號。

INTEGER

5432

無。

decoding.plugin.name

Postgres Logical Decoding插件名稱。

STRING

decoderbufs

根據(jù)Postgres服務(wù)上安裝的插件確定。支持的插件列表如下:

  • decoderbufs(默認(rèn)值):在Postgres 9.6及以上版本支持,需要安裝該插件。

  • pgoutput(推薦): Postgres 10及以上版本的官方內(nèi)置插件。

slot.name

邏輯解碼槽的名字。

STRING

8.0.1版本之前為非必填,從8.0.1版本開始為必填

8.0.1版本之前默認(rèn)值為flink,從8.0.1版本開始無默認(rèn)值

建議每個(gè)表都設(shè)置slot.name參數(shù),以避免出現(xiàn)PSQLException: ERROR: replication slot "debezium" is active for PID 974報(bào)錯(cuò)。

debezium.*

Debezium屬性參數(shù)。

STRING

更細(xì)粒度控制Debezium客戶端的行為。例如'debezium.snapshot.mode' = 'never',詳情請參見配置屬性

scan.incremental.snapshot.enabled

是否開啟增量快照。

BOOLEAN

false

參數(shù)取值如下:

  • false(默認(rèn)值):不開啟增量快照。

  • true:開啟增量快照。

說明
  • 此功能為實(shí)驗(yàn)性功能。僅實(shí)時(shí)計(jì)算引擎8.0.6及以上版本支持該參數(shù)。

  • 增量快照的功能優(yōu)勢,前提條件和使用限制詳情請參見特色功能前提條件注意事項(xiàng)。

scan.startup.mode

消費(fèi)數(shù)據(jù)時(shí)的啟動模式。

STRING

initial

參數(shù)取值如下:

  • initial(默認(rèn)):在第一次啟動時(shí),會先掃描歷史全量數(shù)據(jù),然后讀取最新的WAL日志數(shù)據(jù)。

  • latest-offset:在第一次啟動時(shí),不會掃描歷史全量數(shù)據(jù),直接從WAL日志的末尾(最新的WAL日志處)開始讀取,即只讀取該連接器啟動以后的最新變更。

  • snapshot:先掃描歷史全量數(shù)據(jù),再讀取全量階段新產(chǎn)生的WAL日志,最終作業(yè)會停止。

changelog-mode

用于編碼流更改的變更日志(Changelog)模式。

String

all

支持的Changelog模式包括:

  • ALL:支持所有類型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。

  • UPSERT:僅支持Upsert類型,包括INSERT、DELETE和UPDATE_AFTER。

heartbeat.interval.ms

發(fā)送心跳包的時(shí)間間隔。

Duration

30s

單位為毫秒。

Postgres CDC連接器主動向數(shù)據(jù)庫發(fā)送心跳包來保證推進(jìn)Slot的偏移量。當(dāng)表變更不頻繁時(shí),設(shè)置該值可以及時(shí)回收WAL日志。

scan.incremental.snapshot.chunk.key-column

指定某一列作為快照階段切分分片的切分列。

STRING

默認(rèn)從主鍵中選擇第一列。

scan.incremental.close-idle-reader.enabled

是否在快照結(jié)束后關(guān)閉空閑的Reader。

Boolean

false

該配置生效需要設(shè)置execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

scan.incremental.snapshot.backfill.skip

是否跳過全量階段的日志讀取。

Boolean

false

參數(shù)取值如下:

  • true:跳過。

    增量階段從低水位線開始讀取日志。

    如果下游算子或存儲支持冪等性,建議跳過全量階段日志的讀取,這樣做的優(yōu)點(diǎn)是能夠減少WAL Slot數(shù)量,缺點(diǎn)是僅能提供至少一次(At-Least Once)的語義保證

  • false:不跳過。

    全量階段讀取分片時(shí),會讀取低水位線和高水位線之間的日志來保證一致性。

    如果SQL要做聚合、關(guān)聯(lián)等操作,不建議跳過全量階段日志的讀取。

類型映射

PostgreSQL和Flink字段類型對應(yīng)關(guān)系如下。

PostgreSQL字段類型

Flink字段類型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

使用示例

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

相關(guān)文檔

  • 實(shí)時(shí)計(jì)算Flink版支持的連接器列表,請參見支持的連接器。

  • 將數(shù)據(jù)寫入PolarDB PostgreSQL版(Oracle語法兼容1.0)結(jié)果表,請參見PolarDB PostgreSQL版(Oracle語法兼容1.0)。

  • 如果您需要讀寫RDS MySQL、PolarDB for MySQL或者自建MySQL數(shù)據(jù)庫,請使用MySQL連接器。

上一篇: StarRocks 下一篇: 配置Postgres
阿里云首頁 實(shí)時(shí)計(jì)算 Flink版 相關(guān)技術(shù)圈