日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MySQL

更新時間: 2024-12-16 11:28:00

本文為您介紹如何使用MySQL連接器。

背景信息

MySQL連接器支持所有兼容MySQL協議的數據庫,包括RDS MySQL、PolarDB for MySQL、OceanBase(MySQL模式)或者自建MySQL。

重要
  • 建議使用本連接器,而不要采用RDS MySQL連接器,后續我們將下線連接器中的云數據庫RDS MySQL版文檔。

  • 支持使用MySQL連接器讀取OceanBase。使用MySQL連接器讀取OceanBase時,請確保OceanBase Binlog已開啟且被正確設置,詳情請參見Binlog 相關操作。使用MySQL連接器讀取OceanBase Binlog目前處于公測階段,請在使用前充分評估并謹慎使用。

MySQL連接器支持的信息如下。

類別

詳情

支持類型

源表、維表和結果表,數據攝入數據源

運行模式

僅支持流模式

數據格式

暫不適用

特有監控指標

  • 源表

    • currentFetchEventTimeLag:數據產生到拉取到Source Operator的間隔。

      該指標僅在Binlog階段有效,Snapshot階段該值恒為0。

    • currentEmitEventTimeLag:數據產生到離開Source Operator的間隔。

      該指標僅在Binlog階段有效,Snapshot階段該值恒為0。

    • sourceIdleTime:源表至今有多久不產生新數據。

  • 維表和結果表:無。

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream,SQL和數據攝入YAML

是否支持更新或刪除結果表數據

特色功能

MySQL的CDC源表,即MySQL的流式源表,會先讀取數據庫的歷史全量數據,并平滑切換到Binlog讀取上,保證不多讀一條也不少讀一條數據。即使發生故障,也能保證通過Exactly Once語義處理數據。MySQL CDC源表支持并發地讀取全量數據,通過增量快照算法實現了全程無鎖和斷點續傳,詳情可參見關于MySQL CDC源表

作為源表,支持以下功能特性。

  • 流批一體,支持讀取全量和增量數據,無需維護兩套流程。

  • 支持并發讀取全量數據,性能水平擴展。

  • 全量讀取無縫切換增量讀取,自動縮容,節省計算資源。

  • 全量階段讀取支持斷點續傳,更穩定。

  • 無鎖讀取全量數據,不影響在線業務。

  • 支持讀取RDS MySQL的備份日志。

  • 并行解析Binlog文件,讀取延遲更低。

前提條件

使用限制

  • CDC源表和數據攝入數據源

    • 僅VVR 4.0.8及以上引擎版本支持無鎖讀取和并發讀取功能。

    • 請根據MySQL版本選擇合適的引擎版本,MySQL版本支持情況如下表所示。

      您可以通過執行select version()命令來查看MySQL的版本。

      VVR版本

      支持的MySQL版本

      VVR 4.0.8 ~ VVR 4.0.10

      5.7

      8.0.x

      VVR 4.0.11及以上版本

      5.6.x

      5.7.x

      8.0.x

      重要

      為了確保RDS MySQL 5.6.x版本的正常運行,默認已開啟增量快照功能(即scan.incremental.snapshot.enabled=true),且不支持關閉增量快照功能,而RDS MySQL 6.0.8和8.0.1版本的數據庫已解除該限制,即支持關閉增量快照功能。建議您不要關閉增量快照功能,因為關閉增量快照功能會鎖定MySQL數據庫,可能會對線上業務處理性能產生影響。

    • MySQL CDC源表暫不支持定義Watermark。如果您需要進行窗口聚合,您可以采用非窗口聚合的方式,詳情請參見不支持定義Watermark,那如何進行窗口聚合?

    • MySQL的CDC源表需要一個有特定權限(包括SELECT、SHOW DATABASES、REPLICATION SLAVE和REPLICATION CLIENT)的MySQL用戶,才能讀取全量和增量數據。

    • 當結合CTAS和CDAS整庫同步語法使用時,MySQL CDC源表可以同步部分Schema變更,支持的變更類型詳情請參見表結構變更同步策略。在其他使用場景下,MySQL CDC源表無法同步Schema變更操作。

    • MySQL CDC源表無法同步Truncate操作。

    • 對于RDS MySQL,不建議通過備庫或只讀從庫讀取數據。因為RDS MySQL的備庫和只讀從庫Binlog保留時間默認很短,可能由于Binlog過期清理,導致作業無法消費Binlog數據而報錯。

    • MySQL CDC源表不支持讀取PolarDB MySQL版1.0.19及以前版本的多主架構集群(什么是多主集群?)。PolarDB MySQL版1.0.19及以前版本的多主架構集群產生的Binlog可能出現重復Table id,導致CDC源表Schema映射錯誤,從而解析Binlog數據報錯。PolarDB MySQL版在高于1.0.19的版本進行適配,保證Binlog內Table id不會出現重復,從而避免解析報錯。

  • 維表和結果表

    • Flink計算引擎VVR 4.0.11及以上版本支持MySQL連接器。

    • 語義上可以保證At-Least-Once,在結果表有主鍵的情況下,冪等可以保證數據的正確性。

注意事項

  • CDC源表和數據攝入數據源

    • 每個MySQL CDC數據源需顯式配置不同的Server ID。

      • Server ID作用

        每個同步數據庫數據的客戶端,都會有一個唯一ID,即Server ID。MySQL SERVER會根據該ID來維護網絡連接以及Binlog位點。因此如果有大量不同的Server ID的客戶端一起連接MySQL SERVER,可能導致MySQL SERVER的CPU陡增,影響線上業務穩定性。

        此外,如果多個MySQL CDC數據源共享相同的Server ID,且數據源之間無法復用時,會導致Binlog位點錯亂,多讀或少讀數據。還可能出現Server ID沖突的報錯,詳情請參見上下游存儲。因此建議每個MySQL CDC數據源都配置不同的Server ID。

      • Server ID配置方式

        Server ID可以在DDL中指定,也可以通過動態Hints配置。

        建議通過動態Hints來配置Server ID,而不是在DDL參數中配置Server ID。動態Hints詳情請參見動態Hints

      • 不同場景下Server ID的配置

        • 未開啟增量快照框架或并行度為1

          當未開啟增量快照框架或并行度為1時,可以指定一個特定的Server ID。

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
        • 開啟增量快照框架且并行度大于1

          當開啟增量快照框架且并行度大于1時,需要指定Server ID范圍,要保證范圍內可用的Server ID數量不小于并行度。假設并行度為3,可以如下配置:

          SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
        • 結合CTAS進行數據同步

          當結合CTAS進行數據同步時,如果CDC數據源配置相同,會自動對數據源進行復用,此時可以為多個CDC數據源配置相同的Server ID。詳情請參見代碼示例四:多CTAS語句

        • 同一作業包含多個MySQL CDC源表(非CTAS)

          當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,數據源無法進行復用,需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且并行度大于1,需要指定Server ID范圍。

          select * from 
            source_table1 /*+ OPTIONS('server-id'='123456-123457') */
          left join 
            source_table2 /*+ OPTIONS('server-id'='123458-123459') */
          on source_table1.id=source_table2.id;
    • 僅VVR 4.0.8及以上版本支持全量階段的無鎖讀取、并發讀取、斷點續傳等功能。

      如果您使用的是VVR 4.0.8以下版本,需要對MySQL用戶授予RELOAD權限用來獲取全局讀鎖,保證數據讀取的一致性。全局讀鎖會阻塞寫入操作,持鎖時間可能達到秒級,因此可能對線上業務造成影響。

      此外,VVR 4.0.8以下版本在全量讀取階段無法執行Checkpoint,全量階段的作業失敗會導致作業重新讀取全量數據,穩定性不佳。因此建議您將作業升級到VVR 4.0.8及以上版本。

  • 結果表

    • RDS MySQL數據庫支持自增主鍵,因此在結果表的DDL中不聲明該自增字段。例如ID是自增字段,Flink DDL不聲明該自增字段,則數據庫在一行數據寫入過程中會自動填補相關自增字段。

    • 結果表的DDL聲明的字段必須至少存在一個非主鍵的字段,否則產生報錯。

    • 結果表的DDL中NOT ENFORCED表示Flink自身對主鍵不做強制校驗,需要您自行保證主鍵的正確性和完整性。

      Flink并不充分支持強制校驗,Flink將假設列的可為空性與主鍵中的列是對齊的,從而認為主鍵是正確的,詳情請參見Validity Check

  • 維表

    如果做維表時希望使用索引查詢,請按照MySQL最左前綴原則排列JOIN指定的數據列。但這并無法保證使用索引,由于SQL優化,某些條件可能會被優化導致連接器得到的過濾條件無法命中索引。要確定連接器是否真正使用了索引進行查詢,可以在數據庫側查看具體執行的Select語句。

SQL

MySQL連接器可以在SQL作業中使用,作為源表,維表或者結果表。

語法結構

CREATE TABLE mysqlcdc_source (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

說明
  • 連接器寫入結果表原理:寫入結果表時,會將接收到的每條數據拼接成一條SQL去執行。具體執行的SQL情況如下:

    • 對于沒有主鍵的結果表,會拼接執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...);語句。

    • 對于包含主鍵的結果表,會拼接執行INSERT INTO table_name (column1, column2, ...) VALUES (value1, value2, ...) ON DUPLICATE KEY UPDATE column1 = VALUES(column1), column2 = VALUES(column2), ...;語句。請注意:如果物理表存在除主鍵外的唯一索引約束,當插入兩條主鍵不同但唯一索引相同的記錄時,下游數據會因為唯一索引沖突導致數據覆蓋引發數據丟失。

  • 如果在MySQL數據庫定義了自增主鍵,在Flink DDL中不應該聲明該自增字段。數據寫入過程中,數據庫會自動填補該自增字段。連接器僅支持寫入和刪除帶自增字段的數據,不支持更新。

WITH參數

  • 通用

    參數

    說明

    是否必填

    數據類型

    默認值

    備注

    connector

    表類型。

    STRING

    作為源表時,可以填寫為mysql-cdc或者mysql,二者等價。作為維表或結果表時,固定值為mysql

    hostname

    MySQL數據庫的IP地址或者Hostname。

    STRING

    建議填寫專有網絡VPC地址。

    說明

    如果MySQL與Flink全托管不在同一VPC,需要先打通跨VPC的網絡或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?Flink全托管集群如何訪問公網?

    username

    MySQL數據庫服務的用戶名。

    STRING

    無。

    password

    MySQL數據庫服務的密碼。

    STRING

    無。

    database-name

    MySQL數據庫名稱。

    STRING

    • 作為源表時,數據庫名稱支持正則表達式以讀取多個數據庫的數據。

    • 使用正則表達式時,盡量不要使用^$符號匹配開頭和結尾。具體原因詳見table-name備注的說明。

    table-name

    MySQL表名。

    STRING

    • 作為源表時,表名支持正則表達式以讀取多個表的數據。

      在讀取多個MySQL表時,將多個CTAS語句作為一個作業提交,可以避免啟用多個Binlog監聽,提高性能和效率。詳情請參見示例四:多個CTAS語句作為一個作業提交

    • 使用正則表達式時,盡量不要使用^$符號匹配開頭和結尾。具體原因詳見以下說明。

    說明

    MySQL CDC源表在正則匹配表名時,會將您填寫的 database-nametable-name 通過字符串 \\.(VVR 8.0.1前使用字符.)連接成為一個全路徑的正則表達式,然后使用該正則表達式和MySQL數據庫中表的全限定名進行正則匹配。

    例如:當配置'database-name'='db_.*'且'table-name'='tb_.+'時,連接器將會使用正則表達式db_.*\\.tb_.+(8.0.1版本前為db_.*.tb_.+)去匹配表的全限定名來確定需要讀取的表。

    port

    MySQL數據庫服務的端口號。

    INTEGER

    3306

    無。

  • 源表獨有

    參數

    說明

    是否必填

    數據類型

    默認值

    備注

    server-id

    數據庫客戶端的一個數字ID。

    STRING

    默認會隨機生成一個5400~6400的值。

    該ID必須是MySQL集群中全局唯一的。建議針對同一個數據庫的每個作業都設置一個不同的ID。

    該參數也支持ID范圍的格式,例如5400-5408。在開啟增量讀取模式時支持多并發讀取,此時推薦設定為ID范圍,使得每個并發使用不同的ID。

    scan.incremental.snapshot.enabled

    是否開啟增量快照。

    BOOLEAN

    true

    默認開啟增量快照。增量快照是一種讀取全量數據快照的新機制。與舊的快照讀取相比,增量快照有很多優點,包括:

    • 讀取全量數據時,Source可以是并行讀取。

    • 讀取全量數據時,Source支持chunk粒度的檢查點。

    • 讀取全量數據時,Source不需要獲取全局讀鎖(FLUSH TABLES WITH read lock)。

    如果您希望Source支持并發讀取,每個并發的Reader需要有一個唯一的服務器ID,因此server-id必須是5400-6400這樣的范圍,并且范圍必須大于等于并發數。

    scan.incremental.snapshot.chunk.size

    每個chunk的大小(包含的行數)。

    INTEGER

    8096

    當開啟增量快照讀取時,表會被切分成多個chunk讀取。在讀完chunk的數據之前,chunk的數據會先緩存在內存中。

    每個chunk包含的行數越少,則表中的chunk的總數量越大,盡管這會降低故障恢復的粒度,但可能導致內存OOM和整體的吞吐量降低。因此,您需要進行權衡,并設置合理的chunk大小。

    scan.snapshot.fetch.size

    當讀取表的全量數據時,每次最多拉取的記錄數。

    INTEGER

    1024

    無。

    scan.startup.mode

    消費數據時的啟動模式。

    STRING

    initial

    參數取值如下:

    • initial(默認):在第一次啟動時,會先掃描歷史全量數據,然后讀取最新的Binlog數據。

    • latest-offset:在第一次啟動時,不會掃描歷史全量數據,直接從Binlog的末尾(最新的Binlog處)開始讀取,即只讀取該連接器啟動以后的最新變更。

    • earliest-offset:不掃描歷史全量數據,直接從可讀取的最早Binlog開始讀取。

    • specific-offset:不掃描歷史全量數據,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數來指定從特定Binlog文件名和偏移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。

    • timestamp:不掃描歷史全量數據,從指定的時間戳開始讀取Binlog。時間戳通過scan.startup.timestamp-millis指定,單位為毫秒。

    重要
    • earliest-offsetspecific-offsettimestamp啟動模式在Flink計算引擎VVR 6.0.4及以上的版本支持使用。

    • 對于earliest-offsetspecific-offsettimestamp啟動模式,如果啟動時刻和指定的啟動位點時刻的表結構不同,作業會因為表結構不同而報錯。換一句話說,使用這三種啟動模式,需要保證在指定的Binlog消費位置到作業啟動的時間之間,對應表不能發生表結構變更。

    scan.startup.specific-offset.file

    使用指定位點模式啟動時,啟動位點的Binlog文件名。

    STRING

    使用該配置時,scan.startup.mode必須配置為specific-offset。文件名格式例如mysql-bin.000003

    scan.startup.specific-offset.pos

    使用指定位點模式啟動時,啟動位點在指定Binlog文件中的偏移量。

    INTEGER

    使用該配置時,scan.startup.mode必須配置為specific-offset

    scan.startup.specific-offset.gtid-set

    使用指定位點模式啟動時,啟動位點的GTID集合。

    STRING

    使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

    scan.startup.timestamp-millis

    使用指定時間模式啟動時,啟動位點的毫秒時間戳。

    LONG

    使用該配置時,scan.startup.mode必須配置為timestamp。時間戳單位為毫秒。

    重要

    在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog文件的初始事件以確定其時間戳,最終定位至指定時間對應的Binlog文件。請保證指定的時間戳對應的Binlog文件在數據庫上沒有被清理且可以被讀取到。

    server-time-zone

    數據庫在使用的會話時區。

    VVR-6.0.2以下版本必填,其他版本選填

    STRING

    如果您沒有指定該參數,則系統默認使用Flink作業運行時的環境時區作為數據庫服務器時區,即您選擇的可用區所在的時區。

    例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多信息請參見Debezium時間類型

    debezium.min.row.count.to.stream.results

    當表的條數大于該值時,會使用分批讀取模式。

    INTEGER

    1000

    Flink采用以下方式讀取MySQL源表數據:

    • 全量讀取:直接將整個表的數據讀取到內存里。優點是速度快,缺點是會消耗對應大小的內存,如果源表數據量非常大,可能會有OOM風險。

    • 分批讀取:分多次讀取,每次讀取一定數量的行數,直到讀取完所有數據。優點是讀取數據量比較大的表沒有OOM風險,缺點是讀取速度相對較慢。

    connect.timeout

    連接MySQL數據庫服務器超時時,重試連接之前等待超時的最長時間。

    DURATION

    30s

    無。

    connect.max-retries

    連接MySQL數據庫服務時,連接失敗后重試的最大次數。

    INTEGER

    3

    無。

    connection.pool.size

    數據庫連接池大小。

    INTEGER

    20

    數據庫連接池用于復用連接,可以降低數據庫連接數量。

    jdbc.properties.*

    JDBC URL中的自定義連接參數。

    STRING

    您可以傳遞自定義的連接參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'

    支持的連接參數請參見MySQL Configuration Properties

    debezium.*

    Debezium讀取Binlog的自定義參數。

    STRING

    您可以傳遞自定義的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。

    heartbeat.interval

    Source通過心跳事件推動Binlog位點前進的時間間隔。

    DURATION

    30s

    心跳事件用于推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對于更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點過期問題,Binlog位點過期會導致作業失敗無法恢復,只能無狀態重啟。

    scan.incremental.snapshot.chunk.key-column

    可以指定某一列作為快照階段切分分片的切分列。

    見備注列。

    STRING

    • 無主鍵表必填,選擇的列必須是非空類型(NOT NULL)。

    • 有主鍵的表為選填,僅支持從主鍵中選擇一列。

    說明

    僅Flink計算引擎VVR 6.0.7及以上版本支持。

    rds.region-id

    阿里云RDS MySQL實例所在的地域ID。

    使用讀取OSS歸檔日志功能時必填。

    STRING

    • 僅Flink計算引擎VVR 6.0.7及以上版本支持。

    • 地域ID請參見地域和可用區

    rds.access-key-id

    阿里云RDS MySQL賬號Access Key ID。

    使用讀取OSS歸檔日志功能時必填。

    STRING

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要
    • 僅Flink計算引擎VVR 6.0.7及以上版本支持。

    • 為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    rds.access-key-secret

    阿里云RDS MySQL賬號Access Key Secret。

    使用讀取OSS歸檔日志功能時必填。

    STRING

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要
    • 僅Flink計算引擎VVR 6.0.7及以上版本支持。

    • 為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

    rds.db-instance-id

    阿里云RDS MySQL實例ID。

    使用讀取OSS歸檔日志功能時必填。

    STRING

    僅Flink計算引擎VVR 6.0.7及以上版本支持。

    rds.main-db-id

    阿里云RDS MySQL實例主庫編號。

    STRING

    • 獲取主庫編號詳情請參見RDS MySQL日志備份

    • 僅Flink計算引擎VVR 8.0.7及以上版本支持。

    rds.download.timeout

    從OSS下載單個歸檔日志的超時時間。

    DURATION

    60s

    僅Flink計算引擎VVR 6.0.7及以上版本支持。

    rds.endpoint

    獲取OSS Binlog信息的服務接入點。

    STRING

    • 可選值詳情請參見服務接入點

    • 僅Flink計算引擎VVR 8.0.8及以上版本支持。

    scan.incremental.close-idle-reader.enabled

    是否在快照結束后關閉空閑的 Reader。

    BOOLEAN

    false

    • 僅Flink計算引擎VVR 8.0.1及以上版本支持。

    • 該配置生效需要設置execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

    scan.read-changelog-as-append-only.enabled

    是否將changelog數據流轉換為append-only數據流。

    BOOLEAN

    false

    參數取值如下:

    • true:所有類型的消息(包括INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER)都會轉換成INSERT類型的消息。僅在需要保存上游表刪除消息等特殊場景下開啟使用。

    • false(默認):所有類型的消息都保持原樣下發。

    說明

    僅Flink計算引擎VVR 8.0.8及以上版本支持。

    scan.only.deserialize.captured.tables.changelog.enabled

    在增量階段,是否僅對指定表的變更事件進行反序列化。

    BOOLEAN

    false

    參數取值如下:

    • true:僅對目標表的變更數據進行反序列化,加快Binlog讀取速度。

    • false(默認):對所有表的變更數據進行反序列化。

    說明
    • 僅Flink計算引擎VVR 8.0.7及以上版本支持。

    • 在Flink計算引擎VVR 8.0.8及以下版本使用時,參數名需要修改為debezium.scan.only.deserialize.captured.tables.changelog.enable

    scan.parallel-deserialize-changelog.enabled

    在增量階段,是否使用多線程對變更事件進行解析。

    BOOLEAN

    false

    參數取值如下:

    • true:在變更事件的反序列化階段采用多線程處理,同時保證Binlog事件順序不變,從而加快讀取速度。

    • false(默認):在事件的反序列化階段使用單線程處理。

    說明

    僅Flink計算引擎VVR 8.0.7及以上版本支持。

  • 維表獨有

    參數

    說明

    是否必填

    數據類型

    默認值

    備注

    url

    MySQL JDBC URL

    STRING

    URL的格式為:jdbc:mysql://<連接地址>:<端口號>/<數據庫名稱>

    lookup.max-retries

    讀取數據失敗后,重試讀取的最大次數。

    INTEGER

    3

    僅Flink計算引擎VVR 6.0.7及以上版本支持。

    lookup.cache.strategy

    緩存策略。

    STRING

    None

    支持None、LRU和ALL三種緩存策略,取值含義詳情請參見背景信息

    說明

    使用LRU緩存策略時,還必須配置lookup.cache.max-rows參數。

    lookup.cache.max-rows

    最大緩存條數。

    INTEGER

    100000

    • 當選擇LRU緩存策略后,必須設置緩存大小。

    • 當選擇ALL緩存策略后,可以不設置緩存大小。

    lookup.cache.ttl

    緩存超時時間。

    DURATION

    10 s

    lookup.cache.ttl的配置和lookup.cache.strategy有關,詳情如下:

    • 如果lookup.cache.strategy配置為None,則lookup.cache.ttl可以不配置,表示緩存不超時。

    • 如果lookup.cache.strategy配置為LRU,則lookup.cache.ttl為緩存超時時間。默認不過期。

    • 如果lookup.cache.strategy配置為ALL,則lookup.cache.ttl為緩存加載時間。默認不重新加載。

    填寫時請使用時間格式,例如1min或10s。

    lookup.max-join-rows

    主表中每一條數據查詢維表時,匹配后最多返回的結果數。

    INTEGER

    1024

    在Flink計算引擎VVR 6.0.7及以上版本支持。

    lookup.filter-push-down.enabled

    是否開啟維表Filter下推。

    BOOLEAN

    false

    參數取值如下:

    • true:開啟維表Filter下推,在加載MySQL數據庫表的數據時,維表會根據SQL作業中設置的條件提前過濾數據。

    • false(默認):不開啟維表Filter下推,在加載MySQL數據庫表的數據時,維表會加載全量數據。

    說明

    僅Flink計算引擎VVR 8.0.7及以上版本支持。

    重要

    維表下推應該僅在Flink表用作維表時開啟。MySQL源表暫不支持開啟Filter下推,如果一張Flink表同時被作為源表和維表,且維表開啟了Filter下推,則在使用源表處需要通過SQL Hints的方式將該配置項顯式設為false,否則可能導致作業運行異常。

  • 結果表獨有

    參數

    說明

    是否必填

    數據類型

    默認值

    備注

    url

    MySQL JDBC URL

    STRING

    URL的格式為:jdbc:mysql://<連接地址>:<端口號>/<數據庫名稱>

    sink.max-retries

    寫入數據失敗后,重試寫入的最大次數。

    INTEGER

    3

    無。

    sink.buffer-flush.batch-size

    一次批量寫入的條數。

    INTEGER

    4096

    在Flink計算引擎VVR 6.0.7及以上版本支持。

    sink.buffer-flush.max-rows

    內存中緩存的數據條數。

    INTEGER

    • 在Flink計算引擎VVR 6.0.7版本以下,該參數默認值為100。

    • 在Flink計算引擎VVR 6.0.7版本及以上版本,該參數默認值為10000。

    需指定主鍵后,該參數才生效。

    sink.buffer-flush.interval

    清空緩存的時間間隔。表示如果緩存中的數據在等待指定時間后,依然沒有達到輸出條件,系統會自動輸出緩存中的所有數據。

    DURATION

    1s

    無。

    sink.ignore-delete

    是否忽略數據Delete操作。

    BOOLEAN

    false

    • 在Flink計算引擎VVR 6.0.7及以上版本支持。

    • Flink SQL可能會生成數據Delete操作,在多個輸出節點根據主鍵同時更新同一張結果表的不同字段的場景下,可能導致數據結果不正確。

      例如一個任務在刪除了一條數據后,另一個任務又只更新了這條數據的部分字段,其余未被更新的字段由于被刪除,其值會變成null或默認值。通過將sink.ignore-delete設置為true,可以避免數據刪除操作。

    sink.ignore-null-when-update

    更新數據時,如果傳入的數據字段值為null,是更新對應字段為null,還是跳過該字段的更新。

    BOOLEAN

    false

    參數取值如下:

    • true:不更新該字段。但是當Flink表設置主鍵時,才支持配置該參數為true。配置為true時:

      • 如果是8.0.6及以下的版本,結果表寫入數據不支持攢批執行。

      • 如果是8.0.7及以上的版本,結果表寫入數據支持攢批執行。

        攢批寫入雖然可以明顯增強寫入效率和整體吞吐量,但是會帶來數據延遲問題和內存溢出風險。因此請您根據實際業務場景做好權衡。

    • false:更新該字段為null。

    說明

    僅實時計算引擎VVR 8.0.5及以上版本支持該參數。

類型映射

  • CDC源表

    MySQL CDC字段類型

    Flink字段類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    TINYINT UNSIGNED ZEROFILL

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    SMALLINT UNSIGNED ZEROFILL

    BIGINT

    BIGINT

    INT UNSIGNED

    INT UNSIGNED ZEROFILL

    MEDIUMINT UNSIGNED

    MEDIUMINT UNSIGNED ZEROFILL

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    BIGINT UNSIGNED ZEROFILL

    SERIAL

    FLOAT [UNSIGNED] [ZEROFILL]

    FLOAT

    DOUBLE [UNSIGNED] [ZEROFILL]

    DOUBLE

    DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

    REAL [UNSIGNED] [ZEROFILL]

    NUMERIC(p, s) [UNSIGNED] [ZEROFILL]

    DECIMAL(p, s)

    DECIMAL(p, s) [UNSIGNED] [ZEROFILL]

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)]

    TIMESTAMP [(p)] WITH LOCAL TIME ZONE

    CHAR(n)

    STRING

    VARCHAR(n)

    TEXT

    BINARY

    BYTES

    VARBINARY

    BLOB

    重要

    建議MySQL不要使用TINYINT(1)類型存儲0和1以外的數值,當property-version=0時,默認MySQL CDC源表會將TINYINT(1)映射到Flink的BOOLEAN上,造成數據不準確。如果需要使用TINYINT(1)類型存儲0和1以外的數值,請參見MySQL Catalog參數配置

  • 維表和結果表

    MySQL字段類型

    Flink字段類型

    TINYINT

    TINYINT

    SMALLINT

    SMALLINT

    TINYINT UNSIGNED

    INT

    INT

    MEDIUMINT

    SMALLINT UNSIGNED

    BIGINT

    BIGINT

    INT UNSIGNED

    BIGINT UNSIGNED

    DECIMAL(20, 0)

    FLOAT

    FLOAT

    DOUBLE

    DOUBLE

    DOUBLE PRECISION

    NUMERIC(p, s)

    DECIMAL(p, s)

    說明

    其中p <= 38。

    DECIMAL(p, s)

    BOOLEAN

    BOOLEAN

    TINYINT(1)

    DATE

    DATE

    TIME [(p)]

    TIME [(p)] [WITHOUT TIME ZONE]

    DATETIME [(p)]

    TIMESTAMP [(p)] [WITHOUT TIME ZONE]

    TIMESTAMP [(p)]

    CHAR(n)

    CHAR(n)

    VARCHAR(n)

    VARCHAR(n)

    BIT(n)

    BINARY(?n/8?)

    BINARY(n)

    BINARY(n)

    VARBINARY(N)

    VARBINARY(N)

    TINYTEXT

    STRING

    TEXT

    MEDIUMTEXT

    LONGTEXT

    TINYBLOB

    BYTES

    重要

    Flink僅支持小于等于2,147,483,647(2^31 - 1)的MySQL BLOB類型的記錄。

    BLOB

    MEDIUMBLOB

    LONGBLOB

數據攝入

MySQL連接器作為數據源可以在數據攝入YAML作業中使用。

語法結構

source:
   type: mysql
   name: MySQL Source
   hostname: localhost
   port: 3306
   username: <username>
   password: <password>
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404

sink:
  type: xxx

配置項

參數

說明

是否必填

數據類型

默認值

備注

type

數據源類型。

STRING

固定值為mysql。

name

數據源名稱。

STRING

無。

hostname

MySQL數據庫的IP地址或者Hostname。

STRING

建議填寫專有網絡VPC地址。

說明

如果MySQL與Flink全托管不在同一VPC,需要先打通跨VPC的網絡或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?Flink全托管集群如何訪問公網?

username

MySQL數據庫服務的用戶名。

STRING

無。

password

MySQL數據庫服務的密碼。

STRING

無。

tables

需要同步的MySQL數據表。

STRING

  • 表名支持正則表達式以讀取多個表的數據。

  • 可以用逗號分隔多個正則表達式。

說明

點號用于分割數據庫名和表名,如果需要用點號匹配任意字符,需要對點號使用反斜杠進行轉譯。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

tables.exclude

需要在同步的表中排除的表。

STRING

  • 表名支持正則表達式以排除多個表的數據。

  • 可以用逗號分隔多個正則表達式。

說明

點號用于分割數據庫名和表名,如果需要用點號匹配任意字符,需要對點號使用反斜杠進行轉譯。如:db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*。

port

MySQL數據庫服務的端口號。

INTEGER

3306

無。

schema-change.enabled

是否發送Schame變更事件。

BOOLEAN

true

無。

server-id

數據庫客戶端的用于同步的數字ID或范圍。

STRING

默認會隨機生成一個5400~6400的值。

該ID必須是MySQL集群中全局唯一的。建議針對同一個數據庫的每個作業都設置一個不同的ID。

該參數也支持ID范圍的格式,例如5400-5408。在開啟增量讀取模式時支持多并發讀取,此時推薦設定為ID范圍,使得每個并發使用不同的ID。

jdbc.properties.*

JDBC URL中的自定義連接參數。

STRING

您可以傳遞自定義的連接參數,例如不使用SSL協議,則可配置為'jdbc.properties.useSSL' = 'false'

支持的連接參數請參見MySQL Configuration Properties

debezium.*

Debezium讀取Binlog的自定義參數。

STRING

您可以傳遞自定義的Debezium參數,例如使用'debezium.event.deserialization.failure.handling.mode'='ignore'來指定解析錯誤時的處理邏輯。

scan.incremental.snapshot.chunk.size

每個chunk的大小(包含的行數)。

INTEGER

8096

MySQL表會被切分成多個chunk讀取。在讀完chunk的數據之前,chunk的數據會先緩存在內存中。

每個chunk包含的行數越少,則表中的chunk的總數量越大,盡管這會降低故障恢復的粒度,但可能導致內存OOM和整體的吞吐量降低。因此,您需要進行權衡,并設置合理的chunk大小。

scan.snapshot.fetch.size

當讀取表的全量數據時,每次最多拉取的記錄數。

INTEGER

1024

無。

scan.startup.mode

消費數據時的啟動模式。

STRING

initial

參數取值如下:

  • initial(默認):在第一次啟動時,會先掃描歷史全量數據,然后讀取最新的Binlog數據。

  • latest-offset:在第一次啟動時,不會掃描歷史全量數據,直接從Binlog的末尾(最新的Binlog處)開始讀取,即只讀取該連接器啟動以后的最新變更。

  • earliest-offset:不掃描歷史全量數據,直接從可讀取的最早Binlog開始讀取。

  • specific-offset:不掃描歷史全量數據,從您指定的Binlog位點啟動,位點可通過同時配置scan.startup.specific-offset.filescan.startup.specific-offset.pos參數來指定從特定Binlog文件名和偏移量啟動,也可以只配置scan.startup.specific-offset.gtid-set來指定從某個GTID集合啟動。

  • timestamp:不掃描歷史全量數據,從指定的時間戳開始讀取Binlog。時間戳通過scan.startup.timestamp-millis指定,單位為毫秒。

重要

對于earliest-offsetspecific-offsettimestamp啟動模式,如果啟動時刻和指定的啟動位點時刻的表結構不同,作業會因為表結構不同而報錯。換一句話說,使用這三種啟動模式,需要保證在指定的Binlog消費位置到作業啟動的時間之間,對應表不能發生表結構變更。

scan.startup.specific-offset.file

使用指定位點模式啟動時,啟動位點的Binlog文件名。

STRING

使用該配置時,scan.startup.mode必須配置為specific-offset。文件名格式例如mysql-bin.000003

scan.startup.specific-offset.pos

使用指定位點模式啟動時,啟動位點在指定Binlog文件中的偏移量。

INTEGER

使用該配置時,scan.startup.mode必須配置為specific-offset

scan.startup.specific-offset.gtid-set

使用指定位點模式啟動時,啟動位點的GTID集合。

STRING

使用該配置時,scan.startup.mode必須配置為specific-offset。GTID集合格式例如24DA167-0C0C-11E8-8442-00059A3C7B00:1-19

scan.startup.timestamp-millis

使用指定時間模式啟動時,啟動位點的毫秒時間戳。

LONG

使用該配置時,scan.startup.mode必須配置為timestamp。時間戳單位為毫秒。

重要

在使用指定時間時,MySQL CDC會嘗試讀取每個Binlog文件的初始事件以確定其時間戳,最終定位至指定時間對應的Binlog文件。請保證指定的時間戳對應的Binlog文件在數據庫上沒有被清理且可以被讀取到。

server-time-zone

數據庫在使用的會話時區。

STRING

如果您沒有指定該參數,則系統默認使用Flink作業運行時的環境時區作為數據庫服務器時區,即您選擇的可用區所在的時區。

例如Asia/Shanghai,該參數控制了MySQL中的TIMESTAMP類型如何轉成STRING類型。更多信息請參見Debezium時間類型

scan.startup.specific-offset.skip-events

從指定的位點讀取時,跳過多少Binlog事件。

INTEGER

使用該配置時,scan.startup.mode必須配置為specific-offset

scan.startup.specific-offset.skip-rows

從指定的位點讀取時,跳過多少行變更(一個Binlog事件可能對應多行變更)。

INTEGER

使用該配置時,scan.startup.mode必須配置為specific-offset

connect.timeout

連接MySQL數據庫服務器超時時,重試連接之前等待超時的最長時間。

DURATION

30s

無。

connect.max-retries

連接MySQL數據庫服務時,連接失敗后重試的最大次數。

INTEGER

3

無。

connection.pool.size

數據庫連接池大小。

INTEGER

20

數據庫連接池用于復用連接,可以降低數據庫連接數量。

heartbeat.interval

Source通過心跳事件推動Binlog位點前進的時間間隔。

DURATION

30s

心跳事件用于推動Source中的Binlog位點前進,這對MySQL中更新緩慢的表非常有用。對于更新緩慢的表,Binlog位點無法自動前進,通過夠心跳事件可以推到Binlog位點前進,可以避免Binlog位點不前進引起Binlog位點過期問題,Binlog位點過期會導致作業失敗無法恢復,只能無狀態重啟。

scan.incremental.snapshot.chunk.key-column

可以指定某一列作為快照階段切分分片的切分列。

否。

STRING

僅支持從主鍵中選擇一列。

rds.region-id

阿里云RDS MySQL實例所在的地域ID。

使用讀取OSS歸檔日志功能時必填。

STRING

地域ID請參見地域和可用區

rds.access-key-id

阿里云RDS MySQL賬號Access Key ID。

使用讀取OSS歸檔日志功能時必填。

STRING

詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

重要

為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

rds.access-key-secret

阿里云RDS MySQL賬號Access Key Secret。

使用讀取OSS歸檔日志功能時必填。

STRING

詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

重要

為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

rds.db-instance-id

阿里云RDS MySQL實例ID。

使用讀取OSS歸檔日志功能時必填。

STRING

無。

rds.main-db-id

阿里云RDS MySQL實例主庫編號。

STRING

獲取主庫編號詳情請參見RDS MySQL日志備份

rds.download.timeout

從OSS下載單個歸檔日志的超時時間。

DURATION

60s

無。

rds.endpoint

獲取OSS Binlog信息的服務接入點。

STRING

可選值詳情請參見服務接入點

rds.binlog-directory-prefix

保存Binlog文件的目錄前綴。

STRING

rds-binlog-

無。

rds.use-intranet-link

是否使用內網下載Binlog文件。

BOOLEAN

true

無。

rds.binlog-directories-parent-path

保存Binlog文件的父目錄的絕對路徑。

STRING

無。

chunk-meta.group.size

chunk元信息的大小。

INTEGER

1000

如果元信息大于該值,元信息會分為多份傳遞。

chunk-key.even-distribution.factor.lower-bound

是否可以均勻分片的chunk分布因子的下限。

DOUBLE

0.05

分布因子小于該值會使用非均勻分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總數據行數。

chunk-key.even-distribution.factor.upper-bound

是否可以均勻分片的chunk分布因子的上限。

DOUBLE

1000.0

分布因子大于該值會使用非均勻分片。

chunk分布因子 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 總數據行數。

scan.incremental.close-idle-reader.enabled

是否在快照結束后關閉空閑的Reader。

BOOLEAN

false

該配置生效,需要設置execution.checkpointing.checkpoints-after-tasks-finish.enabled為true。

scan.only.deserialize.captured.tables.changelog.enabled

在增量階段,是否僅對指定表的變更事件進行反序列化。

BOOLEAN

false

參數取值如下:

  • true:僅對目標表的變更數據進行反序列化,加快Binlog讀取速度。

  • false(默認):對所有表的變更數據進行反序列化。

scan.parallel-deserialize-changelog.enabled

在增量階段,是否使用多線程對變更事件進行解析。

BOOLEAN

false

參數取值如下:

  • true:在變更事件的反序列化階段采用多線程處理,同時保證Binlog事件順序不變,從而加快讀取速度。

  • false(默認):在事件的反序列化階段使用單線程處理。

scan.parallel-deserialize-changelog.handler.size

多線程對變更事件進行解析時,事件處理器的數量。

INTEGER

2

無。

metadata-column.include-list

需要傳給下游的元數據列。

STRING

可用的元數據包括table_namedatabase_nameop_tsrow_kind,您可以使用英文分號分隔。

scan.newly-added-table.enabled

從Checkpoint重啟時,是否同步上一次啟動時未匹配到的新增表。

BOOLEAN

false

從Checkpoint或Savepoint重啟時生效。

scan.binlog.newly-added-table.enabled

在增量階段,是否發送匹配到的新增表的數據。

BOOLEAN

false

不能與scan.newly-added-table.enabled同時開啟。

類型映射

數據攝入類型映射如下表所示。

MySQL CDC字段類型

CDC字段類型

TINYINT(n)

TINYINT

SMALLINT

SMALLINT

TINYINT UNSIGNED

TINYINT UNSIGNED ZEROFILL

YEAR

INT

INT

MEDIUMINT

MEDIUMINT UNSIGNED

MEDIUMINT UNSIGNED ZEROFILL

SMALLINT UNSIGNED

SMALLINT UNSIGNED ZEROFILL

BIGINT

BIGINT

INT UNSIGNED

INT UNSIGNED ZEROFILL

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT UNSIGNED ZEROFILL

SERIAL

FLOAT [UNSIGNED] [ZEROFILL]

FLOAT

DOUBLE [UNSIGNED] [ZEROFILL]

DOUBLE

DOUBLE PRECISION [UNSIGNED] [ZEROFILL]

REAL [UNSIGNED] [ZEROFILL]

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且p <= 38

DECIMAL(p, s)

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且p <= 38

FIXED(p, s) [UNSIGNED] [ZEROFILL]且p <= 38

BOOLEAN

BOOLEAN

BIT(1)

TINYINT(1)

DATE

DATE

TIME [(p)]

TIME [(p)]

DATETIME [(p)]

TIMESTAMP [(p)]

TIMESTAMP [(p)]

TIMESTAMP_LTZ [(p)]

CHAR(n)

CHAR(n)

VARCHAR(n)

VARCHAR(n)

BIT(n)

BINARY(?(n + 7) / 8?)

BINARY(n)

BINARY(n)

VARBINARY(N)

VARBINARY(N)

NUMERIC(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65

STRING

說明

在MySQL中,十進制數據類型的精度高達 65,但在Flink中,十進制數據類型的精度僅限于38。所以,如果定義精度大于38的十進制列,則應將其映射到字符串以避免精度損失。

DECIMAL(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65

FIXED(p, s) [UNSIGNED] [ZEROFILL]且38 < p <= 65

TINYTEXT

STRING

TEXT

MEDIUMTEXT

LONGTEXT

ENUM

JSON

STRING

說明

JSON數據類型將在Flink中轉換為JSON格式的字符串。

GEOMETRY

STRING

說明

MySQL中的空間數據類型將轉換為具有固定JSON格式的字符串,詳情請參見MySQL空間數據類型映射

POINT

LINESTRING

POLYGON

MULTIPOINT

MULTILINESTRING

MULTIPOLYGON

GEOMETRYCOLLECTION

TINYBLOB

BYTES

說明

對于MySQL中的BLOB數據類型,僅支持長度不大于2147483647(2**31-1)的 blob。

BLOB

MEDIUMBLOB

LONGBLOB

使用示例

  • CDC源表

    CREATE TEMPORARY TABLE mysqlcdc_source (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      order_id INT,
      customer_name STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT order_id, customer_name FROM mysqlcdc_source;
  • 維表

    CREATE TEMPORARY TABLE datagen_source(
      a INT,
      b BIGINT,
      c STRING,
      `proctime` AS PROCTIME()
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_dim (
      a INT,
      b VARCHAR,
      c VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink(
      a INT,
      b STRING
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT T.a, H.b
    FROM datagen_source AS T JOIN mysql_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H ON T.a = H.a;
  • 結果表

    CREATE TEMPORARY TABLE datagen_source (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE mysql_sink (
      `name` VARCHAR,
      `age` INT
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO mysql_sink
    SELECT * FROM datagen_source;
  • 數據攝入數據源

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 7601-7604
    
    sink:
      type: values
      name: Values Sink
      print.enabled: true
      sink.print.logger: true

關于MySQL CDC源表

  • 實現原理

    MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分片(chunk),記錄下此時的Binlog位點。并使用增量快照算法通過select語句,逐個讀取每個分片的數據。作業會周期性執行Checkpoint,記錄下已經完成的分片。當發生Failover時,只需要繼續讀取未完成的分片。當分片全部讀取完后,會從之前獲取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。

    更詳細的增量快照算法,請參見MySQL CDC Connector

  • 元數據

    元數據在分庫分表合并同步場景非常實用,因為分庫分表合并后,一般業務還是希望區分每條數據的庫名和表名來源,而元數據列可以訪問源表的庫名和表名信息。因此通過元數據列可以非常方便地將多張分表合并到一張目的表。

    自vvr-4.0.11-flink-1.13版本開始,MySQL CDC Source支持元數據列語法,您可以通過元數據列訪問以下元數據。

    元數據key

    元數據類型

    描述

    database_name

    STRING NOT NULL

    包含該行記錄的庫名。

    table_name

    STRING NOT NULL

    包含該行記錄的表名。

    op_ts

    TIMESTAMP_LTZ(3) NOT NULL

    該行記錄在數據庫中的變更時間,如果該記錄來自表的存量歷史數據而不是Binlog中獲取,則該值總是0。

    op_type

    STRING NOT NULL

    該行記錄的變更類型。

    • +I:表示INSERT消息

    • -D:表示DELETE消息

    • -U:表示UPDATE_BEFORE消息

    • +U:表示UPDATE_AFTER消息

    說明

    僅實時計算引擎VVR 8.0.7及以上版本支持。

    將MySQL實例中多個分庫下的多張orders表,合并同步到下游Hologres的holo_orders表中,代碼示例如下所示。

    CREATE TABLE mysql_orders (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,  -- 讀取庫名。
      table_name STRING METADATA  FROM 'table_name' VIRTUAL, -- 讀取表名。
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, -- 讀取變更時間。
      op_type STRING METADATA FROM 'op_type' VIRTUAL, -- 讀取變更類型。
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'flinkuser',
      'password' = 'flinkpw',
      'database-name' = 'mydb_.*', -- 正則匹配多個分庫。
      'table-name' = 'orders_.*'   -- 正則匹配多張分表。
    );
    
    INSERT INTO holo_orders SELECT * FROM mysql_orders;

    在上面代碼的基礎上,WITH參數里配置scan.read-changelog-as-append-only.enabled參數為true時,輸出結果根據下游表主鍵設置情況有不同的表現:

    • 下游表主鍵為order_id時,輸出結果僅包含上游表每個主鍵的最后一次變更。即對于某個主鍵最后一次變更為刪除操作的數據,在下游表可以看到一條相同主鍵的、op_type為-D的數據。

    • 下游表主鍵為order_id、operation_ts、op_type時,輸出結果包含上游表每個主鍵的完整變更。

  • 支持正則表達式

    MySQL CDC源表支持在表名或者庫名中使用正則表達式匹配多個表或者多個庫。通過正則表達式指定多張表的代碼示例如下。

    CREATE TABLE products (
      db_name STRING METADATA FROM 'database_name' VIRTUAL,
      table_name STRING METADATA  FROM 'table_name' VIRTUAL,
      operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
      order_id INT,
      order_date TIMESTAMP(0),
      customer_name STRING,
      price DECIMAL(10, 5),
      product_id INT,
      order_status BOOLEAN,
      PRIMARY KEY(order_id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'localhost',
      'port' = '3306',
      'username' = 'root',
      'password' = '123456',
      'database-name' = '(^(test).*|^(tpc).*|txc|.*[p$]|t{2})', -- 正則表達式匹配多個庫。
      'table-name' = '(t[5-8]|tt)' -- 正則表達式匹配多張表。
    );

    上述例子中的正則表達式解釋:

    • ^(test).* 是前綴匹配示例,這個表達式可以匹配以test開頭的庫名,例如test1或test2。

    • .*[p$] 是后綴匹配示例, 這個表達式可以匹配以p結尾的庫名,例如cdcp或edcp。

    • txc是指定匹配, 可以匹配指定名稱的數據庫名,例如txc。

    MySQL CDC在匹配全路徑表名時,會通過庫名和表名來唯一確定一張表,即使用database-name.table-name作為匹配表的模式,例如匹配模式 (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[ 5-8]|tt) 就可以匹配到數據庫中的表txc.tt和test2.test5。

    重要

    在SQL作業的配置中,table-name和database-name不支持使用逗號(,)分隔形式指定多張表或多個庫。

    • 如果需要匹配多個表或使用多個正則表達式,可以用豎線(|)連接并用小括號包圍,例如需要讀取表user和product,table-name可以配置為(user|product)

    • 如果正則表達式包含逗號,需要用豎線(|)運算符進行改寫,例如正則表達式mytable_\d{1, 2)需要改寫成等價的(mytable_\d{1}|mytable_\d{2}),來避免使用逗號。

  • 并發控制

    MySQL連接器支持多并發讀取全量數據,能夠提高數據加載效率。同時配合Flink實時計算控制臺的Autopilot自動調優功能,在多并發讀取完成后增量階段,能夠自動縮容,節約計算資源。

    在Flink全托管控制臺,您可以在資源配置頁面的基礎模式或專家模式中設置作業的并發數。設置并發的區別如下:

    • 基礎模式設置的并發數為整個作業的全局并發數。基礎模式

    • 專家模式支持按需為某個VERTEX設置并發數。vertex并發

    資源配置詳情請參見配置作業部署信息

    重要

    無論是基礎模式還是專家模式,在設置并發時,表中聲明的server-id范圍必須大于等于作業的并發數。例如server-id的范圍為5404-5412,則共有8個唯一的server-id,因此作業最多可以設置8個并發,且不同的作業對于同一個MySQL實例的server-id范圍不能有重疊,即每個作業需顯式配置不同的server-id。

  • Autopilot自動縮容

    全量階段積累了大量歷史數據,為了提高讀取效率,通常采用并發的方式讀取歷史數據。而在Binlog增量階段,因為Binlog數據量少且為了保證全局有序,通常只需要單并發讀取。全量階段和增量階段對資源的不同需求,可以通過自動調優功能自動幫您實現性能和資源的平衡。

    自動調優會監控MySQL CDC Source的每個task的流量。當進入Binlog階段,如果只有一個task在負責Binlog讀取,其他task均空閑時,自動調優便會自動縮小Source的CU數和并發。開啟自動調優只需要在作業運維頁面,將自動調優的模式設置為Active模式。

    說明

    默認調低并發度的最小觸發時間間隔為24小時。更多自動調優的參數和細節,請參見配置自動調優

  • 啟動模式

    使用配置項scan.startup.mode可以指定MySQL CDC源表的啟動模式。可選值包括:

    • initial (默認):在第一次啟動時對數據庫表進行全量讀取,完成后切換至增量模式讀取Binlog。

    • earliest-offset:跳過快照階段,從可讀取的最早Binlog位點開始讀取。

    • latest-offset:跳過快照階段,從Binlog的結尾處開始讀取。該模式下源表只能讀取在作業啟動之后的數據變更。

    • specific-offset:跳過快照階段,從指定的Binlog位點開始讀取。位點可通過Binlog文件名和位置指定,或者使用GTID集合指定。

    • timestamp:跳過快照階段,從指定的時間戳開始讀取Binlog事件。

    使用示例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- 從最早位點啟動。
        'scan.startup.mode' = 'latest-offset', -- 從最晚位點啟動。
        'scan.startup.mode' = 'specific-offset', -- 從特定位點啟動。
        'scan.startup.mode' = 'timestamp', -- 從特定位點啟動。
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位點啟動模式下指定Binlog文件名。
        'scan.startup.specific-offset.pos' = '4', -- 在特定位點啟動模式下指定Binlog位置。
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位點啟動模式下指定GTID集合。
        'scan.startup.timestamp-millis' = '1667232000000' -- 在時間戳啟動模式下指定啟動時間戳。
        ...
    )
    重要
    • MySQL source會在Checkpoint時將當前位點以INFO級別打印到日志中,日志前綴為Binlog offset on checkpoint {checkpoint-id},該日志可以幫助您將作業從某個Checkpoint位點開始啟動作業。

    • 如果讀取的表曾經發生過表結構變化,從最早位點(earliest-offset)、特定位點(specific-offset)或時間戳(timestamp)啟動可能會發生錯誤。因為Debezium讀取器會在內部保存當前的最新表結構,結構不匹配的早期數據無法被正確解析。

  • 關于無主鍵CDC源表

    • 在Flink計算引擎VVR 6.0.7及以上版本支持使用MySQL CDC無主鍵源表,使用無主鍵表要求必須設置scan.incremental.snapshot.chunk.key-column,且只能選擇非空類型的字段。

    • 無主鍵CDC源表的處理語義由scan.incremental.snapshot.chunk.key-column指定的列的行為決定:

      • 如果指定的列不存在更新操作,此時可以保證Exactly once語義。

      • 如果指定的列發生更新操作,此時只能保證At least once語義。但可以結合下游,通過指定下游主鍵,結合冪等性操作來保證數據的正確性。

  • 讀取阿里云RDS MySQL備份日志

    MySQL CDC源表支持讀取阿里云RDS MySQL的備份日志。這在全量階段執行時間較長,本地Binlog文件已經被自動清理,而自動或者手動上傳的備份文件依然存在的場景下非常適用。

    使用示例:

    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'rds.region-id' = 'cn-beijing',
        'rds.access-key-id' = 'xxxxxxxxx', 
        'rds.access-key-secret' = 'xxxxxxxxx', 
        'rds.db-instance-id' = 'rm-xxxxxxxxxxxxxxxxx', 
        'rds.main-db-id' = '12345678',
        'rds.download.timeout' = '60s'
        ...
    )
  • 開啟CDC Source復用

    當同一個作業中有多個MySQL CDC源表時,每個源表都會啟動對應的Binlog Client,如果源表數量較多并且讀取的MySQL表都在同一個實例中時,會對數據庫造成較大壓力,詳情請參見MySQL CDC常見問題

    實時計算引擎VVR 8.0.7及以上版本支持MySQL CDC Source復用,當不同的CDC源表配置項除了數據庫、表名和server-id外的其他配置項均相同時,可以進行合并。開啟Source復用后,實時計算引擎會盡可能將同一個作業中能夠合并的MySQL CDC源表進行合并。

    您可以在SQL作業中使用SET命令開啟source復用功能:

    SET 'table.optimizer.source-merge.enabled' = 'true';

    對已有作業啟用 Source 復用后,需要無狀態啟動。原因是 Source 復用會導致作業拓撲改變,從原有作業狀態可能無法啟動或者丟失數據。

    重要
    • VVR 8.0.8及8.0.9版本,在開啟CDC Source復用時,還需要額外設置SET 'sql-gateway.exec-plan.enabled' = 'false'

    • 在開啟CDC Source復用后,不建議將作業配置項pipeline.operator-chaining設為false,因為將算子鏈斷開后,Source發送給下游算子的數據會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。

    • 在實時計算引擎VVR 8.0.7版本,將pipeline.operator-chaining設為false時會出現序列化的問題。

加速Binlog讀取

MySQL連接器作為源表或數據攝入數據源使用時,在增量階段會解析Binlog文件生成各種變更消息,Binlog文件使用二進制記錄著所有表的變更,可以通過以下方式加速Binlog文件解析。

  • 開啟并行解析和解析過濾配置

    • 使用配置項scan.only.deserialize.captured.tables.changelog.enabled:僅對指定表的變更事件進行解析。

    • 使用配置項scan.parallel-deserialize-changelog.enabled:采用多線程對Binlog文件進行解析、并按順序投放到消費隊列。

  • 優化Debezium參數

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size:阻塞隊列可以容納的記錄的最大數量。當Debezium從數據庫讀取事件流時,它會在將事件寫入下游之前將它們放入阻塞隊列。默認值為8192。

    • debezium.max.batch.size:該連接器每次迭代處理的事件條數最大值。默認值為2048。

    • debezium.poll.interval.ms:連接器應該在請求新的變更事件前等待多少毫秒。默認值為1000毫秒,即1秒。

使用示例:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium配置
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 開啟并行解析和解析過濾
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 僅對指定表的變更事件進行解析。
    'scan.parallel-deserialize-changelog.enabled' = 'true'  -- 使用多線程對Binlog進行解析。
    ...
)
source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604
  # Debezium配置
  debezium.max.queue.size: 162580
  debezium.max.batch.size: 40960
  debezium.poll.interval.ms: 50
  # 開啟并行解析和解析過濾
  scan.only.deserialize.captured.tables.changelog.enabled: true
  scan.parallel-deserialize-changelog.enabled: true

MySQL CDC 企業版本binlog消費能力為85MB/s,約為開源社區的2倍,當Binlog文件產生速度大于 85MB/s 時(即每6s一個512MB大小的文件),Flink 作業的延遲會持續上升,在Binlog文件產生速度降低后處理延遲會逐步下降。在Binlog文件包含大事務時,可能會導致處理延遲短暫上升,讀取完該事務的日志后處理延遲會下降。

MySQL CDC DataStream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器使用方法

創建DataStream API程序并使用MySqlSource。代碼及pom依賴項示例如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
public class MySqlSourceExample {
  public static void main(String[] args) throws Exception {
    MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("yourDatabaseName") // set captured database
        .tableList("yourDatabaseName.yourTableName") // set captured table
        .username("yourUsername")
        .password("yourPassword")
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // enable checkpoint
    env.enableCheckpointing(3000);
    env
      .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
      // set 4 parallel source tasks
      .setParallelism(4)
      .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    env.execute("Print MySQL Snapshot + Binlog");
  }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-mysql</artifactId>
    <version>${vvr.version}</version>
</dependency>

在構建MySqlSource時,代碼中必須指定以下參數:

參數

說明

hostname

MySQL數據庫的IP地址或者Hostname。

port

MySQL數據庫服務的端口號。

databaseList

MySQL數據庫名稱。

說明

數據庫名稱支持正則表達式以讀取多個數據庫的數據,您可以使用.*匹配所有數據庫。

username

MySQL數據庫服務的用戶名。

password

MySQL數據庫服務的密碼。

deserializer

反序列化器,將SourceRecord類型記錄反序列化到指定類型。參數取值如下:

  • RowDataDebeziumDeserializeSchema:將SourceRecord轉成Flink Table或SQL內部數據結構RowData。

  • JsonDebeziumDeserializationSchema:將SourceRecord轉成JSON格式的String。

pom依賴項必須指定以下參數:

${vvr.version}

阿里云實時計算Flink版的引擎版本,例如:vvr-8.0.4-flink-1.17

${flink.version}

Apache Flink版本,例如:1.17.2

重要

請使用阿里云實時計算Flink版的引擎版本對應的Apache Flink版本,避免在作業運行時出現不兼容的問題。版本對應關系詳情,請參見引擎

常見問題

CDC源表使用中可能遇到的問題,詳情請參見CDC問題

Flink CDC技術原理及企業版特性

  • Flink CDC企業版特性

  • Flink CDC技術

上一篇: 日志服務SLS 下一篇: MySQL連接器DataStream本地調試
阿里云首頁 實時計算 Flink版 相關技術圈