日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MySQL連接器最佳實踐

本文為您介紹MySQL連接器在常見使用場景下的最佳實踐。

設置Server ID,避免Binlog消費沖突

每個同步數據庫數據的客戶端,都會有一個唯一ID,即Server ID。如果不同作業使用了相同的Server ID,會因為沖突導致作業報錯。建議為每個MySQL CDC數據源配置不同的Server ID。

  • Server ID配置方式

    Server ID可以在Flink建表語句中指定,也可以通過動態Hints配置。

    建議通過動態Hints來配置Server ID,而不是在建表的WITH參數中配置Server ID。動態Hints詳情請參見動態Hints

  • 不同場景下Server ID的配置

    • 未開啟增量快照框架或并行度為1

      當未開啟增量快照框架或并行度為1時,可以指定一個特定的Server ID。

      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456') */ ;
    • 開啟增量快照框架且并行度大于1

      當開啟增量快照框架且并行度大于1時,需要指定Server ID范圍,確保范圍內可用的Server ID數量不小于并行度。假設并行度為3,可以如下配置:

      SELECT * FROM source_table /*+ OPTIONS('server-id'='123456-123458') */ ;
    • 結合CTAS進行數據同步

      當結合CTAS進行數據同步時,如果CDC數據源配置相同,會自動對數據源進行合并復用,此時可以為多個CDC數據源配置相同的Server ID。詳情請參見代碼示例四:多CTAS語句

    • 同一作業包含多個MySQL CDC源表(非CTAS)

      當作業中包含多個MySQL CDC源表,且不是使用CTAS語句同步時,如果沒有開啟Source復用(詳情請參見開啟Source復用,減少Binlog數據連接),需要為每一個CDC源表提供不同的Server ID。同理,如果開啟增量快照框架且并行度大于1,需要指定Server ID范圍。

      select * from 
        source_table1 /*+ OPTIONS('server-id'='123456-123457') */
      left join 
        source_table2 /*+ OPTIONS('server-id'='123458-123459') */
      on source_table1.id=source_table2.id;

設置分片參數,優化內存空間

MySQL CDC源表在啟動時掃描全表,將表按照主鍵分成多個分片(chunk),記錄下此時的Binlog位點。并使用增量快照算法通過Select語句,逐個讀取每個分片的數據。作業會周期性執行Checkpoint,記錄下已經完成的分片。當發生Failover時,只需要繼續讀取未完成的分片。當分片全部讀取完后,會從之前獲取的Binlog位點讀取增量的變更記錄。Flink作業會繼續周期性執行Checkpoint,記錄下Binlog位點,當作業發生Failover,便會從之前記錄的Binlog位點繼續處理,從而實現Exactly Once語義。

更詳細的增量快照算法,請參見MySQL CDC Connector

對于只有一個字段的主鍵表,默認使用該字段進行分片。對于有聯合主鍵的MySQL物理表,默認使用主鍵里的第一個字段進行分片。Flink計算引擎VVR 6.0.7及以上版本支持讀取無主鍵源表,需要設置scan.incremental.snapshot.chunk.key-column指定一個非空類型的字段進行分片。

分片參數優化

分片數據和分片信息會保存在內存中,在一些情況下,可能會出現OOM的問題。可以根據出現OOM的組件進行參數調整:

  • JobManager

    JobManager保存所有分片的信息,如果分片數量過多會出現OOM,需要通過增加scan.incremental.snapshot.chunk.size值來減少分片數。也可以在運行參數配置中設置jobmanager.memory.heap.size以增大JobManager堆內存,參見Flink參數配置

  • TaskManager

    • TaskManager讀取每個分片的數據,如果分片里數據條數過多會出現OOM,需要通過減少scan.incremental.snapshot.chunk.size值來減少分片里的數據條數。也可以在運行參數配置中調整Task Manager Memory為更大值以增加TaskManager堆內存。

    • 在VVR 8.0.8及之前版本,最后一個分片需要讀取的數據量可能比較大,導致TaskManager出現OOM,建議升級到VVR 8.0.9及以上以避免該問題。

    • 對于有聯合主鍵的MySQL CDC源表,會使用主鍵里的第一個字段進行分片,如果存在大量的數據在該字段中為相同字段值的情況,對應分片的數據會更多,可能會導致TaskManager出現OOM,可以設置scan.incremental.snapshot.chunk.key-column指定主鍵中的其他字段進行分片劃分。

開啟Source復用,減少Binlog數據連接

在作業中包含了多張MySQL源表時,開啟Source復用能夠復用Binlog連接,從而減少數據庫的壓力。該功能僅在實時計算Flink版本提供,社區版MySQL CDC連接器不支持。

您可以在SQL作業中使用SET命令開啟Source復用功能:

SET 'table.optimizer.source-merge.enabled' = 'true';

建議在新創建的作業中就開啟Source復用功能。對已有作業啟用Source復用后,需要無狀態啟動。原因是Source復用會導致作業拓撲改變,從原有作業狀態可能無法啟動或者丟失數據。

開啟Source復用后,具有相同配置參數的MySQL源表會進行合并。如果您的作業中所有源表的配置都相同,作業的Binlog連接數可以按照如下方式計算:

  • 全量讀取階段,Binlog連接數等于Source并發度。

  • 增量讀取階段,Binlog連接數等于1。

重要
  • VVR 8.0.8及8.0.9版本,在開啟CDC Source復用時,還需要額外設置SET 'sql-gateway.exec-plan.enabled' = 'false';

  • 在開啟CDC Source復用后,不建議將作業配置項pipeline.operator-chaining設為false,因為將算子鏈斷開后,Source發送給下游算子的數據會增加序列化和反序列的開銷,當合并的Source越多時,開銷會越大。

  • 在實時計算引擎VVR 8.0.7版本,將pipeline.operator-chaining設為false時會出現序列化的問題。

開啟Binlog解析參數,加速增量數據讀取

MySQL連接器作為源表或數據攝入數據源使用時,在增量階段會解析Binlog文件生成各種變更消息,Binlog文件使用二進制記錄著所有表的變更,可以通過以下方式加速Binlog文件解析。

  • 開啟并行解析和解析過濾配置(該功能僅在實時計算Flink版本提供,社區版MySQL CDC連接器不支持)

    • 開啟配置項scan.only.deserialize.captured.tables.changelog.enabled:僅對指定表的變更事件進行解析。

    • 開啟配置項scan.parallel-deserialize-changelog.enabled:采用多線程對Binlog文件進行解析,并按順序投放到消費隊列。開啟該配置時通常需要增加Task Manager CPU進行配合。

  • 優化Debezium參數

    debezium.max.queue.size: 162580
    debezium.max.batch.size: 40960
    debezium.poll.interval.ms: 50
    • debezium.max.queue.size:阻塞隊列可以容納的記錄的最大數量。當Debezium從數據庫讀取事件流時,它會在將事件寫入下游之前將它們放入阻塞隊列。默認值為8192。

    • debezium.max.batch.size:該連接器每次迭代處理的事件條數最大值。默認值為2048。

    • debezium.poll.interval.ms:連接器應該在請求新的變更事件前等待多少毫秒。默認值為1000毫秒,即1秒。

使用示例:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    -- Debezium配置
    'debezium.max.queue.size' = '162580',
    'debezium.max.batch.size' = '40960',
    'debezium.poll.interval.ms' = '50',
    -- 開啟并行解析和解析過濾
    'scan.only.deserialize.captured.tables.changelog.enabled' = 'true',  -- 僅對指定表的變更事件進行解析。
    'scan.parallel-deserialize-changelog.enabled' = 'true'  -- 使用多線程對Binlog進行解析。
    ...
)

分析數據延遲,優化作業吞吐

在增量階段出現數據延遲時,可以按照以下步驟進行分析:

  1. 參見概覽中的currentFetchEventTimeLag和currentEmitEventTimeLag兩個指標,currentFetchEventTimeLag代表從Binlog讀取到數據的延遲,currentEmitEventTimeLag代表從Binlog讀取到作業相關的表的數據的延遲。

    場景

    詳情

    currentFetchEventTimeLag延遲較小而currentEmitEventTimeLag延遲較大,并且currentEmitEventTimeLag幾乎不更新。

    currentFetchEventTimeLag延遲較小說明從數據庫拉取Binlog的延遲較低,但是Binlog中屬于作業需要讀取的表的數據較少,因此currentEmitEventTimeLag幾乎不更新,屬于正常現象。

    currentFetchEventTimeLag延遲和currentEmitEventTimeLag延遲都比較大。

    說明Source表拉取能力較弱,可以參見本小節的后續步驟進行調優。

  2. 反壓的存在會導致Source端數據發送至下游算子的速率下降,您可能會觀察到sourceIdleTime周期性上升,currentFetchEventTimeLag和currentEmitEventTimeLag不斷增長。可以通過增大反壓源頭所在節點的并發度來避免該情況。

  3. 參見CPU中的TM CPU Usage指標和JVM中的TM GC Time指標,確認是否出現CPU或者內存資源不足的情況,可以適當增加作業資源以優化讀取性能,還可以開啟mini-batch參數以提升吞吐量,參見高性能Flink SQL優化技巧

  4. 在作業中存在SinkUpsertMaterializer算子并且存在大狀態時,會影響讀取性能,請考慮增加作業并發度或者避免使用SinkUpsertMaterializer算子,詳情請參見避免使用SinkUpsertMaterializer。對已有作業配置去掉SinkUpsertMaterializer算子時,需要無狀態啟動。原因是作業拓撲發生改變,從原有作業狀態可能無法啟動或者丟失數據。

開啟讀取 RDS Binlog,避免Binlog過期

使用阿里云RDS MySQL實例作為Source數據源時,支持讀取保存在OSS的日志備份。當指定的時間戳或者Binlog位點對應的文件保存在OSS時,會自動拉取OSS日志文件到Flink集群本地進行讀取,當指定的時間戳或者Binlog位點對應的文件保存在數據庫本地時,會自動切換到使用數據庫連接進行讀取。該功能僅在實時計算Flink版本提供,社區版MySQL CDC連接器不支持。

開啟讀取OSS日志備份功能需要配置RDS的連接參數,使用示例:

CREATE TABLE mysql_source (...) WITH (
    'connector' = 'mysql-cdc',
    'rds.region-id' = 'cn-beijing',
    'rds.access-key-id' = 'your_access_key_id',
    'rds.access-key-secret' = 'your_access_key_secret',
    'rds.db-instance-id' = 'rm-xxxxxxxx',  // 數據庫實例id。
    'rds.main-db-id' = '12345678', // 主庫編號。
    'rds.endpoint' = 'rds.aliyuncs.com'
    ...
)

使用數據攝入進行整庫同步,表結構變更同步

對于只包含數據同步邏輯的作業,建議使用數據攝入運行,數據攝入作業基于數據集成場景進行了深度優化,使用方式參見數據攝入YAML作業快速入門以及數據攝入YAML作業開發(公測中)

如下代碼提供了將MySQL的app_db整庫同步到Hologres的示例,對于上游app_db庫中的表結構變更,數據攝入作業會將該變更同步到下游數據庫:

source:
  type: mysql
  hostname: <hostname>
  port: 3306
  username: ${secret_values.mysqlusername}
  password: ${secret_values.mysqlpassword}
  tables: app_db.\.*
  server-id: 5400-5404

sink:
  type: hologres
  name: Hologres Sink
  endpoint: <endpoint>
  dbname: <database-name>
  username: ${secret_values.holousername}
  password: ${secret_values.holopassword}

pipeline:
  name: Sync MySQL Database to Hologres