日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Debezium將MySQL數據同步至云消息隊列 Kafka 版

本文介紹如何創建Debezium MySQL Source Connector,將MySQL的數據同步至云消息隊列 Kafka 版

前提條件

步驟一:創建數據表

  1. 登錄RDS管理控制臺,創建RDS MySQL實例。更多信息,請參見快速創建RDS MySQL實例

    創建實例時,請選擇與前提條件中已購買部署的Kafka實例相同的VPC,并將此VPC網段加入白名單。image..png

  2. 實例創建完成后,在實例列表頁面單擊目標實例,然后在實例詳情頁面的左側導航欄,完成以下操作。

    1. 單擊賬號管理,創建一個新賬號,也可使用已有賬號。更多信息,請參見創建數據庫和賬號

    2. 單擊數據庫管理,創建數據庫,也可使用已有數據庫。更多信息,請參見創建數據庫和賬號

    3. 單擊數據庫連接,記錄內網地址和端口號。image..png

  3. 基本信息頁面,單擊登錄數據庫進入DMS數據管理服務平臺,完成以下操作。

    1. 在左側雙擊數據庫名稱,切換到已創建的數據庫。image..png

    2. SQL Console頁簽,使用SQL語句創建表格。例如,創建一個列參數分別為idnumber的表格,命令如下。更多信息,請參見SQL Commands

      CREATE TABLE sql_table(id INT ,number INT);

步驟二:創建Connector任務

  1. 下載Debezium MySQL Source Connector文件,上傳至提前創建好的OSS bucket,更多信息,請參見控制臺上傳文件

    重要

    下載Debezium MySQL Connector文件時請選擇適配Java 8的版本。

  2. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。

  3. 在左側導航欄,選擇Connector生態集成 > 任務列表

  4. 任務列表頁面,單擊創建任務

  5. 創建任務面板,設置任務名稱,選擇流入類型Apache Kafka Connect

    1. 連接器配置區域,設置以下參數。

      配置項

      參數

      說明

      Kafka Connect插件

      Bucket存儲桶

      選擇OSS Bucket。

      文件

      選擇上傳的.ZIP文件。

      Kafka資源信息

      Kafka實例

      選擇前提條件中創建的實例。

      專有網絡VPC

      默認選擇部署Kafka實例時選擇的VPC ID且不可更改。

      交換機

      默認選擇部署Kafka實例時選擇的vSwitch ID且不可更改。

      安全組

      選擇安全組。

      Kafka Connect配置信息

      解析當前ZIP包下的properties文件

      選擇.ZIP文件中包含的SourceConnector對應的.properties文件。路徑為/etc/xxx.properties。在輸入框中更新下列字段的取值。

      • tasks.max:Task的最大數量。該場景下只支持設置為1。

      • database.hostname:填寫步驟一:創建數據表中獲取的源數據庫內網地址。

      • database.port:填寫步驟一:創建數據表中獲取的端口號。

      • database.user:數據庫登錄賬號。

      • database.password:數據庫登錄密碼。

      • database.server.name:MySQL數據庫服務的邏輯名稱。

        • 僅允許包含英文字母、數字以及下劃線(_)。

        • 該字段用于組成數據庫及表格的目標Topic的名稱,數據庫目標Topic的命名規則為{database.server.name},表格目標Topic的命名規則為{database.server.name}.{databaseName}.{tableName}。請在運行Connector任務前按照命名規則提前創建對應的目標Topic。

      • database.include.list:源數據庫名稱。若有多個數據庫,用英文逗號(,)分隔。

      • table.include.list:源表格名稱,單個表格的格式為 {databaseName}.{tableName};若有多個表格,用英文逗號(,)分隔。

      • database.history.kafka.bootstrap.servers:Kafka實例連接地址。

        • 該實例用于記錄數據庫所有Schema變動記錄。

        • 可使用Connector任務中已配置的目標Kafka實例,也可使用新實例。

        • 實例需要與數據庫實例處于同一個VPC中。

        • 該實例需要開啟自由使用Group能力。更多信息,請參見自由使用Group

      • database.history.kafka.topic:此Topic用于記錄數據庫所有Schema的變動記錄,請在運行Connector任務前提前創建。

      • include.schema.changes:是否監控Schema變動記錄,若取值為true,則會將這些變動記錄寫入名為{database.server.name}的Topic中。

      Connector全量參數,請參見Debezium Connector Properties

    2. 實例配置區域,設置以下參數。

      配置項

      參數

      說明

      Worker規格

      Worker規格

      選擇合適的Worker規格。

      最小Worker數

      設置為1。

      最大Worker數

      設置為1。此數值不得超過Task的最大數量。

      Kafka Connect Worker 配置

      自動創建Kafka Connect Worker依賴資源

      建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:

      • Offset Topic:用于存儲源數據偏移量,命名規則為connect-eb-offset-<任務名稱>

      • Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為connect-eb-config-<任務名稱>

      • Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為connect-eb-status-<任務名稱>

      • Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為connect-eb-cluster-<任務名稱>

      • Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Topic中的數據,命名規則為connector-eb-cluster-<任務名稱>-<connector名稱>

    3. 運行配置區域,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置

      重要

      建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。

    4. 單擊確定

    等待任務狀態變為運行中,此時Connector已經在正常工作中。

步驟三:測試Connector任務

  1. 在DMS數據管理服務平臺,向步驟一:創建數據表中創建的數據表插入一條數據。例如,插入一條id為123,number為20000的數據,命令如下。

    INSERT INTO sql_table(id, number) VALUES(123,20000);
  2. 登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。

  3. 在目標實例頁面,單擊目標Topic,然后單擊消息查詢,查看插入的消息數據,消息Value示例如下。

    • 數據庫目標Topic(命名規則為{database.server.name})示例:

      {
          "source":{
              "version":"1.5.0.Final",
              "connector":"mysql",
              "name":"fulfillment",
              "ts_ms":1686283675404,
              "snapshot":"true",
              "db":"wbdb",
              "sequence":null,
              "table":"sql_table",
              "server_id":0,
              "gtid":null,
              "file":"mysql-bin.000006",
              "pos":188032,
              "row":0,
              "thread":null,
              "query":null
          },
          "databaseName":"wbdb",
          "schemaName":null,
          "ddl":"DROP TABLE IF EXISTS sql_table",
          "tableChanges":[
      
          ]
      }
    • 數據表格目標Topic(命名規則為{database.server.name}.{databaseName}.{tableName})示例:

      {
          "before":null,
          "after":{
              "id":123,
              "number":20000
          },
          "source":{
              "version":"1.5.0.Final",
              "connector":"mysql",
              "name":"fulfillment",
              "ts_ms":1686283675675,
              "snapshot":"last",
              "db":"wbdb",
              "sequence":null,
              "table":"sql_table",
              "server_id":0,
              "gtid":null,
              "file":"mysql-bin.000006",
              "pos":188032,
              "row":0,
              "thread":null,
              "query":null
          },
          "op":"r",
          "ts_ms":1686283675675,
          "transaction":null
      }
    • database.history.kafka.topic字段指定的Topic示例:

      {
          "source":{
              "server":"fulfillment"
          },
          "position":{
              "ts_sec":1686283675,
              "file":"mysql-bin.000006",
              "pos":188032,
              "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****",
              "snapshot":true
          },
          "databaseName":"wbdb",
          "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci",
          "tableChanges":[
      
          ]
      }

    常見報錯

    場景一:所有Tasks運行失敗

    錯誤信息:

    All tasks under connector mongo-source failed, please check the error trace of the task.

    解決方法:在消息流入任務詳情頁面,單擊基礎信息區域的診斷鏈接,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤信息。

    場景二:Kafka Connect退出

    錯誤信息:

    Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

    解決方法:由于狀態獲取可能會有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態,您可以按照以下步驟查看錯誤信息。

    1. 在消息流入任務詳情頁面的Worker信息區域,單擊SAE應用后的實例名稱,跳轉到SAE應用詳情頁面。

    2. 基本信息頁面,單擊實例部署信息頁簽。

    3. 在實例右側操作列,單擊Webshell登錄Kafka Connect運行環境。實例部署信息

      • 執行vi /home/admin/connector-bootstrap.log命令,查看Connector啟動日志,查找其中是否包含錯誤信息。

      • 執行vi /opt/kafka/logs/connect.log命令,查看Connector運行日志,在其中查找ERROR或者WARN字段來查看是否有錯誤信息。

    基于錯誤信息提示進行修復操作后,可以重新啟動對應任務。

    場景三:Connector參數校驗失敗

    錯誤信息:

    Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
    Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
    You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

    解決方法:此時需要根據錯誤信息,找出具體哪個參數出錯,更新對應參數即可。若基于上述錯誤信息無法定位具體的出錯參數,可以參考上文場景二中的步驟登錄Kafka Connect運行環境,執行以下命令,查詢參數是否校驗通過。

    curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

    該指令會返回Connector參數中每個參數是否校驗通過,若不通過,則errors屬性非空,如下所示。

    "value":{
        "name":"snapshot.mode",
        "value":null,
        "recommended_values":[
            "never",
            "initial_only",
            "when_needed",
            "initial",
            "schema_only",
            "schema_only_recovery"
        ],
        "errors":[
            "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
        ],
        "visible":true
    }

    場景四:獲取Topic元數據超時

    錯誤信息:

    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

    解決方法:出現此報錯的原因是您在啟動Connector任務前未按照命名規則提前創建數據庫目標Topic{database.server.name}和表格目標Topic{database.server.name}.{databaseName}.{tableName},建議您停用任務后創建目標Topic再啟用任務。