本文為您介紹MySQL連接器在常見使用場景下的最佳實踐。
設置Server ID,避免Binlog消費沖突
每個同步數據庫數據的客戶端,都會有一個唯一ID,即Server ID。如果不同作業使用了相同的Server ID,會因為沖突導致作業報錯。建議為每個MySQL CDC數據源配置不同的Server ID。
Server ID配置方式
Server ID可以在Flink建表語句中指定,也可以通過動態Hints配置。
建議通過動態Hints來配置Server ID,而不是在建表的WITH參數中配置Server ID。動態Hints詳情請參見動態Hints。
不同場景下Server ID的配置
未開啟增量快照框架或并行度為1
當未開啟增量快照框架或并行度為1時,可以指定一個特定的Server ID。
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
開啟增量快照框架且并行度大于1
當開啟增量快照框架且并行度大于1時,需要指定Server ID范圍,確保范圍內可用的Server ID數量不小于并行度。假設并行度為3,可以如下配置:
SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
結合CTAS進行數據同步
當結合CTAS進行數據同步時,如果CDC數據源配置相同,會自動對數據源進行合并復用,此時可以為多個CDC數據源配置相同的Server ID。詳情請參見代碼示例四:多CTAS語句。
同一作業包含多個MySQL CDC源表(非CTAS)
當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,如果沒有開啟Source復用(詳情請參見開啟Source復用,減少Binlog數據連接),需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且并行度大于1,需要指定Server ID范圍。
select * from source_table1 /*+ OPTIONS('server-id'='123456-123457') */ left join source_table2 /*+ OPTIONS('server-id'='123458-123459') */ on source_table1.id=source_table2.id;
設置分片參數,優化內存空間
MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分片(chunk),記錄下此時的Binlog位點。并使用增量快照算法通過Select語句,逐個讀取每個分片的數據。作業會周期性執行Checkpoint,記錄下已經完成的分片。當發生Failover時,只需要繼續讀取未完成的分片。當分片全部讀取完后,會從之前獲取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。
更詳細的增量快照算法,請參見MySQL CDC Connector。
對于只有一個字段的主鍵表,默認使用該字段進行分片。對于有聯合主鍵的MySQL物理表,默認使用主鍵里的第一個字段進行分片。Flink計算引擎VVR 6.0.7及以上版本支持讀取無主鍵源表,需要設置scan.incremental.snapshot.chunk.key-column指定一個非空類型的字段進行分片。
分片參數優化
分片數據和分片信息會保存在內存中,在一些情況下,可能會出現OOM的問題。可以根據出現OOM的組件進行參數調整:
JobManager
JobManager保存所有分片的信息,如果分片數量過多會出現OOM,需要通過增加scan.incremental.snapshot.chunk.size值來減少分片數。也可以在運行參數配置中設置jobmanager.memory.heap.size以增大JobManager堆內存,參見Flink參數配置。
TaskManager
TaskManager讀取每個分片的數據,如果分片里數據條數過多會出現OOM,需要通過減少scan.incremental.snapshot.chunk.size值來減少分片里的數據條數。也可以在運行參數配置中調整Task Manager Memory為更大值以增加TaskManager堆內存。
在VVR 8.0.8及之前版本,最后一個分片需要讀取的數據量可能比較大,導致TaskManager出現OOM,建議升級到VVR 8.0.9及以上以避免該問題。
對于有聯合主鍵的MySQL CDC源表,會使用主鍵里的第一個字段進行分片,如果存在大量的數據在該字段中為相同字段值的情況,對應分片的數據會更多,可能會導致TaskManager出現OOM,可以設置scan.incremental.snapshot.chunk.key-column指定主鍵中的其他字段進行分片劃分。
開啟Source復用,減少Binlog數據連接
在作業中包含了多張MySQL源表時,開啟Source復用能夠復用Binlog連接,從而減少數據庫的壓力。該功能僅在實時計算Flink版本提供,社區版MySQL CDC連接器不支持。
您可以在SQL作業中使用SET命令開啟Source復用功能:
SET 'table.optimizer.source-merge.enabled' = 'true';
建議在新創建的作業中就開啟Source復用功能。對已有作業啟用Source復用后,需要無狀態啟動。原因是Source復用會導致作業拓撲改變,從原有作業狀態可能無法啟動或者丟失數據。
開啟Source復用后,具有相同配置參數的MySQL源表會進行合并。如果您的作業中所有源表的配置都相同,作業的Binlog連接數可以按照如下方式計算:
全量讀取階段,Binlog連接數等于Source并發度。
增量讀取階段,Binlog連接數等于1。
VVR 8.0.8及8.0.9版本,在開啟CDC Source復用時,還需要額外設置
SET 'sql-gateway.exec-plan.enabled' = 'false';
。在開啟CDC Source復用后,不建議將作業配置項
pipeline.operator-chaining
設為false,因為將算子鏈斷開后,Source發送給下游算子的數據會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。在實時計算引擎VVR 8.0.7版本,將
pipeline.operator-chaining
設為false時會出現序列化的問題。
開啟Binlog解析參數,加速增量數據讀取
MySQL連接器作為源表或數據攝入數據源使用時,在增量階段會解析Binlog文件生成各種變更消息,Binlog文件使用二進制記錄著所有表的變更,可以通過以下方式加速Binlog文件解析。
開啟并行解析和解析過濾配置(該功能僅在實時計算Flink版本提供,社區版MySQL CDC連接器不支持)
開啟配置項
scan.only.deserialize.captured.tables.changelog.enabled
:僅對指定表的變更事件進行解析。開啟配置項
scan.parallel-deserialize-changelog.enabled
:采用多線程對Binlog文件進行解析,并按順序投放到消費隊列。開啟該配置時通常需要增加Task Manager CPU進行配合。
優化Debezium參數
debezium.max.queue.size: 162580 debezium.max.batch.size: 40960 debezium.poll.interval.ms: 50
debezium.max.queue.size
:阻塞隊列可以容納的記錄的最大數量。當Debezium從數據庫讀取事件流時,它會在將事件寫入下游之前將它們放入阻塞隊列。默認值為8192。debezium.max.batch.size
:該連接器每次迭代處理的事件條數最大值。默認值為2048。debezium.poll.interval.ms
:連接器應該在請求新的變更事件前等待多少毫秒。默認值為1000毫秒,即1秒。
使用示例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
-- Debezium配置
'debezium.max.queue.size' = '162580',
'debezium.max.batch.size' = '40960',
'debezium.poll.interval.ms' = '50',
-- 開啟并行解析和解析過濾
'scan.only.deserialize.captured.tables.changelog.enabled' = 'true', -- 僅對指定表的變更事件進行解析。
'scan.parallel-deserialize-changelog.enabled' = 'true' -- 使用多線程對Binlog進行解析。
...
)
分析數據延遲,優化作業吞吐
在增量階段出現數據延遲時,可以按照以下步驟進行分析:
參見概覽中的currentFetchEventTimeLag和currentEmitEventTimeLag兩個指標,currentFetchEventTimeLag代表從Binlog讀取到數據的延遲,currentEmitEventTimeLag代表從Binlog讀取到作業相關的表的數據的延遲。
場景
詳情
currentFetchEventTimeLag延遲較小而currentEmitEventTimeLag延遲較大,并且currentEmitEventTimeLag幾乎不更新。
currentFetchEventTimeLag延遲較小說明從數據庫拉取Binlog的延遲較低,但是Binlog中屬于作業需要讀取的表的數據較少,因此currentEmitEventTimeLag幾乎不更新,屬于正常現象。
currentFetchEventTimeLag延遲和currentEmitEventTimeLag延遲都比較大。
說明Source表拉取能力較弱,可以參見本小節的后續步驟進行調優。
反壓的存在會導致Source端數據發送至下游算子的速率下降,您可能會觀察到sourceIdleTime周期性上升,currentFetchEventTimeLag和currentEmitEventTimeLag不斷增長。可以通過增大反壓源頭所在節點的并發度來避免該情況。
參見CPU中的TM CPU Usage指標和JVM中的TM GC Time指標,確認是否出現CPU或者內存資源不足的情況,可以適當增加作業資源以優化讀取性能,還可以開啟mini-batch參數以提升吞吐量,參見高性能Flink SQL優化技巧。
在作業中存在SinkUpsertMaterializer算子并且存在大狀態時,會影響讀取性能,請考慮增加作業并發度或者避免使用SinkUpsertMaterializer算子,詳情請參見避免使用SinkUpsertMaterializer。對已有作業配置去掉SinkUpsertMaterializer算子時,需要無狀態啟動。原因是作業拓撲發生改變,從原有作業狀態可能無法啟動或者丟失數據。
開啟讀取 RDS Binlog,避免Binlog過期
使用阿里云RDS MySQL實例作為Source數據源時,支持讀取保存在OSS的日志備份。當指定的時間戳或者Binlog位點對應的文件保存在OSS時,會自動拉取OSS日志文件到Flink集群本地進行讀取,當指定的時間戳或者Binlog位點對應的文件保存在數據庫本地時,會自動切換到使用數據庫連接進行讀取。該功能僅在實時計算Flink版本提供,社區版MySQL CDC連接器不支持。
開啟讀取OSS日志備份功能需要配置RDS的連接參數,使用示例:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'rds.region-id' = 'cn-beijing',
'rds.access-key-id' = 'your_access_key_id',
'rds.access-key-secret' = 'your_access_key_secret',
'rds.db-instance-id' = 'rm-xxxxxxxx', // 數據庫實例id。
'rds.main-db-id' = '12345678', // 主庫編號。
'rds.endpoint' = 'rds.aliyuncs.com'
...
)
使用數據攝入進行整庫同步,表結構變更同步
對于只包含數據同步邏輯的作業,建議使用數據攝入運行,數據攝入作業基于數據集成場景進行了深度優化,使用方式參見數據攝入YAML作業快速入門以及數據攝入YAML作業開發(公測中)。
如下代碼提供了將MySQL的app_db整庫同步到Hologres的示例,對于上游app_db庫中的表結構變更,數據攝入作業會將該變更同步到下游數據庫:
source:
type: mysql
hostname: <hostname>
port: 3306
username: ${secret_values.mysqlusername}
password: ${secret_values.mysqlpassword}
tables: app_db.\.*
server-id: 5400-5404
sink:
type: hologres
name: Hologres Sink
endpoint: <endpoint>
dbname: <database-name>
username: ${secret_values.holousername}
password: ${secret_values.holopassword}
pipeline:
name: Sync MySQL Database to Hologres