日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Debezium將MySQL數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版

更新時(shí)間:

本文介紹如何創(chuàng)建Debezium MySQL Source Connector,將MySQL的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版。

前提條件

步驟一:創(chuàng)建數(shù)據(jù)表

  1. 登錄RDS管理控制臺(tái),創(chuàng)建RDS MySQL實(shí)例。更多信息,請(qǐng)參見快速創(chuàng)建RDS MySQL實(shí)例。

    創(chuàng)建實(shí)例時(shí),請(qǐng)選擇與前提條件中已購買部署的Kafka實(shí)例相同的VPC,并將此VPC網(wǎng)段加入白名單。image..png

  2. 實(shí)例創(chuàng)建完成后,在實(shí)例列表頁面單擊目標(biāo)實(shí)例,然后在實(shí)例詳情頁面的左側(cè)導(dǎo)航欄,完成以下操作。

    1. 單擊賬號(hào)管理,創(chuàng)建一個(gè)新賬號(hào),也可使用已有賬號(hào)。更多信息,請(qǐng)參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)

    2. 單擊數(shù)據(jù)庫管理,創(chuàng)建數(shù)據(jù)庫,也可使用已有數(shù)據(jù)庫。更多信息,請(qǐng)參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)。

    3. 單擊數(shù)據(jù)庫連接,記錄內(nèi)網(wǎng)地址和端口號(hào)。image..png

  3. 基本信息頁面,單擊登錄數(shù)據(jù)庫進(jìn)入DMS數(shù)據(jù)管理服務(wù)平臺(tái),完成以下操作。

    1. 在左側(cè)雙擊數(shù)據(jù)庫名稱,切換到已創(chuàng)建的數(shù)據(jù)庫。image..png

    2. SQL Console頁簽,使用SQL語句創(chuàng)建表格。例如,創(chuàng)建一個(gè)列參數(shù)分別為idnumber的表格,命令如下。更多信息,請(qǐng)參見SQL Commands。

      CREATE TABLE sql_table(id INT ,number INT);

步驟二:創(chuàng)建Connector任務(wù)

  1. 下載Debezium MySQL Source Connector文件,上傳至提前創(chuàng)建好的OSS bucket,更多信息,請(qǐng)參見控制臺(tái)上傳文件。

    重要

    下載Debezium MySQL Connector文件時(shí)請(qǐng)選擇適配Java 8的版本。

  2. 登錄云消息隊(duì)列 Kafka 版控制臺(tái),在概覽頁面的資源分布區(qū)域,選擇地域。

  3. 在左側(cè)導(dǎo)航欄,選擇Connector生態(tài)集成 > 消息流入(Source)。

  4. 消息流入(Source)頁面,單擊創(chuàng)建任務(wù)

  5. 消息流入創(chuàng)建面板,設(shè)置任務(wù)名稱,選擇流入類型Apache Kafka Connect。

    1. 連接器配置區(qū)域,設(shè)置以下參數(shù)。

      配置項(xiàng)

      參數(shù)

      說明

      Kafka Connect插件

      Bucket存儲(chǔ)桶

      選擇OSS Bucket。

      文件

      選擇上傳的.ZIP文件。

      Kafka資源信息

      Kafka實(shí)例

      選擇前提條件中創(chuàng)建的實(shí)例。

      專有網(wǎng)絡(luò)VPC

      默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的VPC ID且不可更改。

      交換機(jī)

      默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的vSwitch ID且不可更改。

      安全組

      選擇安全組。

      Kafka Connect配置信息

      解析當(dāng)前ZIP包下的properties文件

      選擇.ZIP文件中包含的SourceConnector對(duì)應(yīng)的.properties文件。路徑為/etc/xxx.properties。在輸入框中更新下列字段的取值。

      • tasks.max:Task的最大數(shù)量。該場(chǎng)景下只支持設(shè)置為1。

      • database.hostname:填寫步驟一:創(chuàng)建數(shù)據(jù)表中獲取的源數(shù)據(jù)庫內(nèi)網(wǎng)地址。

      • database.port:填寫步驟一:創(chuàng)建數(shù)據(jù)表中獲取的端口號(hào)。

      • database.user:數(shù)據(jù)庫登錄賬號(hào)。

      • database.password:數(shù)據(jù)庫登錄密碼。

      • database.server.name:MySQL數(shù)據(jù)庫服務(wù)的邏輯名稱。

        • 僅允許包含英文字母、數(shù)字以及下劃線(_)。

        • 該字段用于組成數(shù)據(jù)庫及表格的目標(biāo)Topic的名稱,數(shù)據(jù)庫目標(biāo)Topic的命名規(guī)則為{database.server.name},表格目標(biāo)Topic的命名規(guī)則為{database.server.name}.{databaseName}.{tableName}。請(qǐng)?jiān)谶\(yùn)行Connector任務(wù)前按照命名規(guī)則提前創(chuàng)建對(duì)應(yīng)的目標(biāo)Topic。

      • database.include.list:源數(shù)據(jù)庫名稱。若有多個(gè)數(shù)據(jù)庫,用英文逗號(hào)(,)分隔。

      • table.include.list:源表格名稱,單個(gè)表格的格式為 {databaseName}.{tableName};若有多個(gè)表格,用英文逗號(hào)(,)分隔。

      • database.history.kafka.bootstrap.servers:Kafka實(shí)例連接地址。

        • 該實(shí)例用于記錄數(shù)據(jù)庫所有Schema變動(dòng)記錄。

        • 可使用Connector任務(wù)中已配置的目標(biāo)Kafka實(shí)例,也可使用新實(shí)例。

        • 實(shí)例需要與數(shù)據(jù)庫實(shí)例處于同一個(gè)VPC中。

        • 該實(shí)例需要開啟自由使用Group能力。更多信息,請(qǐng)參見自由使用Group

      • database.history.kafka.topic:此Topic用于記錄數(shù)據(jù)庫所有Schema的變動(dòng)記錄,請(qǐng)?jiān)谶\(yùn)行Connector任務(wù)前提前創(chuàng)建。

      • include.schema.changes:是否監(jiān)控Schema變動(dòng)記錄,若取值為true,則會(huì)將這些變動(dòng)記錄寫入名為{database.server.name}的Topic中。

      Connector全量參數(shù),請(qǐng)參見Debezium Connector Properties。

    2. 實(shí)例配置區(qū)域,設(shè)置以下參數(shù)。

      配置項(xiàng)

      參數(shù)

      說明

      Worker規(guī)格

      Worker規(guī)格

      選擇合適的Worker規(guī)格。

      最小Worker數(shù)

      設(shè)置為1。

      最大Worker數(shù)

      設(shè)置為1。此數(shù)值不得超過Task的最大數(shù)量。

      Kafka Connect Worker 配置

      自動(dòng)創(chuàng)建Kafka Connect Worker依賴資源

      建議勾選此項(xiàng),此時(shí)會(huì)在選擇的Kafka實(shí)例中自動(dòng)創(chuàng)建Kafka Connect運(yùn)行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動(dòng)填入配置框中,包括以下配置項(xiàng):

      • Offset Topic:用于存儲(chǔ)源數(shù)據(jù)偏移量,命名規(guī)則為connect-eb-offset-<任務(wù)名稱>。

      • Config Topic:用于存儲(chǔ)Connectors以及Tasks的配置信息,命名規(guī)則為connect-eb-config-<任務(wù)名稱>。

      • Status Topic:用于存儲(chǔ)Connectors以及Tasks狀態(tài)信息,命名規(guī)則為connect-eb-status-<任務(wù)名稱>

      • Kafka Connect Consumer Group:Kafka Connect Worker用于消費(fèi)Internal Topics的消費(fèi)組,命名規(guī)則為connect-eb-cluster-<任務(wù)名稱>。

      • Kafka Source Connector Consumer Group:只針對(duì)Sink Connector有效,用于消費(fèi)源Topic中的數(shù)據(jù),命名規(guī)則為connector-eb-cluster-<任務(wù)名稱>-<connector名稱>。

    3. 運(yùn)行配置區(qū)域,將日志投遞方式設(shè)置為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置。

      重要

      建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會(huì)導(dǎo)致任務(wù)運(yùn)行失敗。

    4. 單擊確定。

    等待任務(wù)狀態(tài)變?yōu)?b data-tag="uicontrol" id="88548e1005asr" class="uicontrol">運(yùn)行中,此時(shí)Connector已經(jīng)在正常工作中。

步驟三:測(cè)試Connector任務(wù)

  1. 在DMS數(shù)據(jù)管理服務(wù)平臺(tái),向步驟一:創(chuàng)建數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表插入一條數(shù)據(jù)。例如,插入一條id為123,number為20000的數(shù)據(jù),命令如下。

    INSERT INTO sql_table(id, number) VALUES(123,20000);
  2. 登錄云消息隊(duì)列 Kafka 版控制臺(tái),在實(shí)例列表頁面,單擊目標(biāo)實(shí)例。

  3. 在目標(biāo)實(shí)例頁面,單擊目標(biāo)Topic,然后單擊消息查詢,查看插入的消息數(shù)據(jù),消息Value示例如下。

    • 數(shù)據(jù)庫目標(biāo)Topic(命名規(guī)則為{database.server.name})示例:

      {
          "source":{
              "version":"1.5.0.Final",
              "connector":"mysql",
              "name":"fulfillment",
              "ts_ms":1686283675404,
              "snapshot":"true",
              "db":"wbdb",
              "sequence":null,
              "table":"sql_table",
              "server_id":0,
              "gtid":null,
              "file":"mysql-bin.000006",
              "pos":188032,
              "row":0,
              "thread":null,
              "query":null
          },
          "databaseName":"wbdb",
          "schemaName":null,
          "ddl":"DROP TABLE IF EXISTS sql_table",
          "tableChanges":[
      
          ]
      }
    • 數(shù)據(jù)表格目標(biāo)Topic(命名規(guī)則為{database.server.name}.{databaseName}.{tableName})示例:

      {
          "before":null,
          "after":{
              "id":123,
              "number":20000
          },
          "source":{
              "version":"1.5.0.Final",
              "connector":"mysql",
              "name":"fulfillment",
              "ts_ms":1686283675675,
              "snapshot":"last",
              "db":"wbdb",
              "sequence":null,
              "table":"sql_table",
              "server_id":0,
              "gtid":null,
              "file":"mysql-bin.000006",
              "pos":188032,
              "row":0,
              "thread":null,
              "query":null
          },
          "op":"r",
          "ts_ms":1686283675675,
          "transaction":null
      }
    • database.history.kafka.topic字段指定的Topic示例:

      {
          "source":{
              "server":"fulfillment"
          },
          "position":{
              "ts_sec":1686283675,
              "file":"mysql-bin.000006",
              "pos":188032,
              "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****",
              "snapshot":true
          },
          "databaseName":"wbdb",
          "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci",
          "tableChanges":[
      
          ]
      }

    常見報(bào)錯(cuò)

    場(chǎng)景一:所有Tasks運(yùn)行失敗

    錯(cuò)誤信息:

    All tasks under connector mongo-source failed, please check the error trace of the task.

    解決方法:在消息流入任務(wù)詳情頁面,單擊基礎(chǔ)信息區(qū)域的診斷鏈接,即可跳轉(zhuǎn)到Connector監(jiān)控頁面,可以看到Tasks運(yùn)行失敗的詳細(xì)錯(cuò)誤信息。

    場(chǎng)景二:Kafka Connect退出

    錯(cuò)誤信息:

    Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

    解決方法:由于狀態(tài)獲取可能會(huì)有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態(tài),您可以按照以下步驟查看錯(cuò)誤信息。

    1. 在消息流入任務(wù)詳情頁面的Worker信息區(qū)域,單擊SAE應(yīng)用后的實(shí)例名稱,跳轉(zhuǎn)到SAE應(yīng)用詳情頁面。

    2. 基本信息頁面,單擊實(shí)例部署信息頁簽。

    3. 在實(shí)例右側(cè)操作列,單擊Webshell登錄Kafka Connect運(yùn)行環(huán)境。實(shí)例部署信息

      • 執(zhí)行vi /home/admin/connector-bootstrap.log命令,查看Connector啟動(dòng)日志,查找其中是否包含錯(cuò)誤信息。

      • 執(zhí)行vi /opt/kafka/logs/connect.log命令,查看Connector運(yùn)行日志,在其中查找ERROR或者WARN字段來查看是否有錯(cuò)誤信息。

    基于錯(cuò)誤信息提示進(jìn)行修復(fù)操作后,可以重新啟動(dòng)對(duì)應(yīng)任務(wù)。

    場(chǎng)景三:Connector參數(shù)校驗(yàn)失敗

    錯(cuò)誤信息:

    Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
    Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
    You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

    解決方法:此時(shí)需要根據(jù)錯(cuò)誤信息,找出具體哪個(gè)參數(shù)出錯(cuò),更新對(duì)應(yīng)參數(shù)即可。若基于上述錯(cuò)誤信息無法定位具體的出錯(cuò)參數(shù),可以參考上文場(chǎng)景二中的步驟登錄Kafka Connect運(yùn)行環(huán)境,執(zhí)行以下命令,查詢參數(shù)是否校驗(yàn)通過。

    curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

    該指令會(huì)返回Connector參數(shù)中每個(gè)參數(shù)是否校驗(yàn)通過,若不通過,則errors屬性非空,如下所示。

    "value":{
        "name":"snapshot.mode",
        "value":null,
        "recommended_values":[
            "never",
            "initial_only",
            "when_needed",
            "initial",
            "schema_only",
            "schema_only_recovery"
        ],
        "errors":[
            "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
        ],
        "visible":true
    }

    場(chǎng)景四:獲取Topic元數(shù)據(jù)超時(shí)

    錯(cuò)誤信息:

    org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

    解決方法:出現(xiàn)此報(bào)錯(cuò)的原因是您在啟動(dòng)Connector任務(wù)前未按照命名規(guī)則提前創(chuàng)建數(shù)據(jù)庫目標(biāo)Topic{database.server.name}和表格目標(biāo)Topic{database.server.name}.{databaseName}.{tableName},建議您停用任務(wù)后創(chuàng)建目標(biāo)Topic再啟用任務(wù)。