日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用JDBC Connector同步MySQL數據

本文介紹如何創建JDBC Connector,將MySQL和云消息隊列 Kafka 版的數據進行互相同步。

前提條件

步驟一:創建數據表

  1. 登錄RDS管理控制臺,創建RDS MySQL實例。更多信息,請參見創建RDS MySQL實例

    創建實例時,請選擇與前提條件中相同的VPC,并將此VPC網段加入白名單。加入白名單

  2. 實例創建完成后,在實例列表頁面單擊目標實例,然后在實例詳情頁面的左側導航欄,完成以下操作。

    1. 單擊創建賬號,也可使用已有賬號。更多信息,請參見創建賬號和數據庫

    2. 單擊創建數據庫,也可使用已有數據庫。更多信息,請參見創建賬號和數據庫

    3. 單擊數據庫連接,記錄內網地址和端口號。

      內網地址

  3. 在實例詳情頁面,單擊登錄數據庫進入DMS數據管理服務平臺,單擊目標數據庫,使用SQL語句創建表格。例如,創建一個列參數分別為idnumber的表格,命令如下。更多信息,請參見SQL Commands

    CREATE TABLE sql_table(id INT ,number INT);
    重要

    創建表格時,請將任一列參數設置為主鍵并且設為遞增。更多信息,請參見查詢與變更表結構

步驟二:創建Connector

Source Connector

  1. 下載JDBC Connector文件,上傳至提前創建好的OSS Bucket。更多信息,請參見控制臺上傳文件

    重要

    下載JDBC Connector文件時請選擇適配Java 8的版本。

  2. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。

  3. 在左側導航欄,選擇Connector生態集成 > 任務列表

  4. 任務列表頁面,單擊創建任務

  5. 創建任務面板。設置任務名稱,配置以下配置項。

    • 任務創建

      1. Source(源)配置向導,選擇數據提供方Apache Kafka Connect,單擊下一步

      2. 連接器配置配置向導,設置以下配置項,然后單擊下一步

        配置項

        參數

        說明

        Kafka Connect插件

        Bucket存儲桶

        選擇OSS Bucket。

        文件

        選擇上傳的.ZIP文件。

        Kafka資源信息

        Kafka參數配置

        選擇Source Connect。

        Kafka實例

        選擇前提條件中創建的實例。

        專有網絡VPC

        選擇VPC ID。

        交換機

        選擇vSwitch ID。

        安全組

        選擇安全組。

        Kafka Connect配置信息

        解析當前ZIP包下的properties文件

        選擇新建properties文件。選擇.ZIP文件中包含的SourceConnector對應的.properties文件。路徑為/etc/source-xxx.properties。

        在輸入框中更新相關字段的取值。展開查看字段描述

        字段名

        描述

        connector.class

        運行的Connector包名稱,無需修改。

        tasks.max

        Task的最大數量,不能超過步驟一:創建數據表中創建的表格的數量。

        connection.url

        數據庫連接地址,不同數據庫有不同的地址格式。本文以MySQL為例,請將步驟一:創建數據表中獲取的內網地址和端口號分別填入host和port字段。

        • MySQL:jdbc:mysql://<host>:<port>/<database>

        • PostgreSQL:jdbc:postgresql://<host>:<port>/<database>

        • IBM DB2:jdbc:db2://<host>:<port>/<database>

        • IBM Informix:jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>

        • MS SQL:jdbc:sqlserver://<host>[:<port>];databaseName=<database>

        • Oracle:jdbc:oracle:thin://<host>:<port>/<service> 或者jdbc:oracle:thin:<host>:<port>:<SID>

        incrementing.column.name

        填入步驟一:創建數據表中設置的遞增列名稱。

        topic.prefix

        目標Topic前綴。目標Topic名稱為<topic.prefix><tableName>,在投遞數據前,請按照命名規范提前創建好目標Topic。

        connection.user

        數據庫登錄用戶名。

        connection.password

        數據庫登錄密碼。

        table.whitelist

        數據庫表格列表,不同數據庫之間以英文逗號(,)分隔。

        展開查看示例

        name=test-source-mysql
        connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
        
        # 用戶可通過該參數指定最大任務數,這些任務數將均分分布在SAE不同實例上。
        # tasks.max取值不超過源表格數量。
        tasks.max=1
        
        # 數據庫連接地址,不同數據庫驅動有不同的地址格式。
        # MySQL: jdbc:mysql://<host>:<port>/<database>
        # PostgreSQL: jdbc:postgresql://<host>:<port>/<database>
        # IBM DB2: jdbc:db2://<host>:<port>/<database>
        # IBM Informix: jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>
        # MS SQL: jdbc:sqlserver://<host>[:<port>];databaseName=<database>
        # Oracle: jdbc:oracle:thin://<host>:<port>/<service> 或者jdbc:oracle:thin:<host>:<port>:<SID>
        connection.url=jdbc:mysql://rm-******.mysql.rds.aliyuncs.com:3306/test_database
        
        # 更新表格的模式,incrementing表示在每個表上使用嚴格遞增的列以檢測新行。
        # 請注意,這不會檢測現有行的修改或刪除。
        mode=incrementing
        
        # 遞增列名稱。
        incrementing.column.name=id
        # 目標 topic 名稱為 ${topic.prefix}${tableName}
        # 在本例中,需要提前創建名為mysql-topic-source_table的Topic,否則運行時會報錯。
        topic.prefix=mysql-topic-
        
        # 數據庫賬號。
        connection.user=root
        # 數據庫密碼。
        connection.password=123456
        
        # 指定源表格。
        table.whitelist=source_table
        
        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=true
        
        key.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=true

        Connector全量參數配置,請參見JDBC Source Connector Configuration Properties

      3. 實例配置配置向導,設置以下參數,然后單擊下一步

        配置項

        參數

        說明

        Worker規格

        Worker規格

        選擇合適的Worker規格。

        最小Worker數

        設置為1。

        最大Worker數

        設置為1。

        Kafka Connect Worker 配置

        自動創建Kafka Connect Worker依賴資源

        建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:

        • Offset Topic:用于存儲源數據偏移量,命名規則為connect-eb-offset-<任務名稱>

        • Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為connect-eb-config-<任務名稱>

        • Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為connect-eb-status-<任務名稱>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為connect-eb-cluster-<任務名稱>

        • Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Topic中的數據,命名規則為connector-eb-cluster-<任務名稱>-<connector名稱>

      4. 運行配置配置向導,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置,然后單擊保存

        重要

        建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。

    • 任務屬性

      設置此任務的重試策略及死信隊列。更多信息,請參見重試和死信

    等待任務狀態變為運行中,此時Connector已經在正常工作中。

Sink Connector

  1. 下載JDBC Connector文件,上傳至提前創建好的OSS Bucket。更多信息,請參見控制臺上傳文件

    重要

    下載JDBC Connector文件時請選擇適配Java 8的版本。

  2. 登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。

  3. 在左側導航欄,選擇Connector生態集成 > 任務列表

  4. 任務列表頁面,單擊創建任務

  5. 創建任務面板。設置任務名稱,配置以下配置項。

    • 任務創建

      1. Source(源)配置向導,選擇數據提供方Apache Kafka Connect,單擊下一步

      2. 連接器配置配置向導,設置以下配置項,然后單擊下一步

        配置項

        參數

        說明

        Kafka Connect插件

        Bucket存儲桶

        選擇OSS Bucket。

        文件

        選擇上傳的.ZIP文件。

        Kafka資源信息

        Kafka參數配置

        選擇Sink Connect。

        Kafka實例

        選擇前提條件中創建的實例。

        專有網絡VPC

        選擇VPC ID。

        交換機

        選擇vSwitch ID。

        安全組

        選擇安全組。

        Kafka Connect配置信息

        解析當前ZIP包下的properties文件

        選擇新建properties文件。選擇.ZIP文件中包含的SinkConnector對應的.properties文件。路徑為/etc/sink-xxx.properties。

        在輸入框中更新相關字段的取值。展開查看字段描述

        字段名

        描述

        connector.class

        運行Connector的包名稱,無需修改。

        tasks.max

        Task的最大數量。

        topics

        數據源Topic名稱。

        connection.url

        數據庫連接地址,不同數據庫有不同的地址格式。本文以MySQL為例,請將步驟一:創建數據表中獲取的內網地址和端口號分別填入host和port字段。

        • MySQL:jdbc:mysql://<host>:<port>/<database>

        • PostgreSQL:jdbc:postgresql://<host>:<port>/<database>

        • IBM DB2:jdbc:db2://<host>:<port>/<database>

        • IBM Informix:jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>

        • MS SQL:jdbc:sqlserver://<host>[:<port>];databaseName=<database>

        • Oracle:jdbc:oracle:thin://<host>:<port>/<service> 或者jdbc:oracle:thin:<host>:<port>:<SID>

        pk.fields

        主鍵名稱。若有多個主鍵以英文逗號(,)分隔。

        connection.user

        數據庫登錄用戶名。

        connection.password

        數據庫登錄密碼。

        table.name.format

        目標表格名稱。

        展開查看示例

        name=test-sink-mysql
        connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
        
        # 用戶可通過該參數指定最大任務數,這些任務數將均分分布在SAE不同實例上。
        tasks.max=2
        
        # 數據源Topic名稱。
        topics=sink-topic
        
        # 數據庫連接地址,不同數據庫驅動有不同的地址格式。
        # MySQL: jdbc:mysql://<host>:<port>/<database>
        # PostgreSQL: jdbc:postgresql://<host>:<port>/<database>
        # IBM DB2: jdbc:db2://<host>:<port>/<database>
        # IBM Informix: jdbc:informix-sqli://<ip>:<port>/<database>:informixserver=<debservername>
        # MS SQL: jdbc:sqlserver://<host>[:<port>];databaseName=<database>
        # Oracle: jdbc:oracle:thin://<host>:<port>/<service>或者jdbc:oracle:thin:<host>:<port>:<SID>
        connection.url=jdbc:mysql://rm-******.mysql.rds.aliyuncs.com:3306/test_database
        
        # 數據庫賬號。
        connection.user=root
        # 數據庫密碼。
        connection.password=123456
        
        insert.mode=upsert
        # 是否自動創建目標表格。
        auto.create=false
        
        # record_value 表示從Kafka消息體中讀取數據至目標表格中。
        pk.mode=record_value
        # 主鍵名稱,以逗號分隔。
        pk.fields=id
        
        # 指定目標表格名稱。
        table.name.format=sink_table
        
        value.converter=org.apache.kafka.connect.json.JsonConverter
        value.converter.schemas.enable=true
        
        key.converter=org.apache.kafka.connect.json.JsonConverter
        key.converter.schemas.enable=false
        
        # 容錯配置,all表示錯誤發生時繼續執行;none表示錯誤發生時拋出異常,connector運行停止。
        errors.tolerance=none

        Connector全量參數配置,請參見JDBC Sink Connector Configuration Properties

      3. 實例配置配置向導,設置以下參數,然后單擊下一步

        配置項

        參數

        說明

        Worker規格

        Worker規格

        選擇合適的Worker規格。

        最小Worker數

        設置為1。

        最大Worker數

        設置為1。此值不能超過tasks.max的取值。

        Kafka Connect Worker 配置

        自動創建Kafka Connect Worker依賴資源

        建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:

        • Offset Topic:用于存儲源數據偏移量,命名規則為connect-eb-offset-<任務名稱>

        • Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為connect-eb-config-<任務名稱>

        • Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為connect-eb-status-<任務名稱>

        • Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為connect-eb-cluster-<任務名稱>

        • Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Topic中的數據,命名規則為connector-eb-cluster-<任務名稱>-<connector名稱>

      4. 運行配置區域,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置,然后單擊保存

        重要

        建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。

    • 任務屬性

      設置此任務的重試策略及死信隊列。更多信息,請參見重試和死信

    等待任務狀態變為運行中,此時Connector已經在正常工作中。

步驟三:測試Connector

Source Connector

  1. 在DMS數據管理服務平臺,向步驟一:創建數據表中創建的數據表插入一條數據。例如,插入一條id為12,number為20的數據,命令如下。

    INSERT INTO sql_table(id, number) VALUES(12,20);
  2. 登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。

  3. 在目標實例頁面,單擊目標Topic,然后單擊消息查詢,查看插入的消息數據,消息Value示例如下。

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"number"}],"optional":false,"name":"sql_table"},"payload":{"id":12,"number":20}}

Sink Connector

  1. 登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。

  2. 在左側導航欄,單擊Topic管理,然后單擊目標Topic。

  3. 在Topic詳情頁面右上角,單擊體驗發送消息

  4. 快速體驗消息收發面板,設置消息內容。例如在目標表格中添加一條id為13,number為14的數據,消息內容如下。

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"int32","optional":true,"field":"number"}],"optional":false,"name":"sql_table"},"payload":{"id":13,"number":14}}
  5. 在DMS數據管理服務平臺,查看目標表格中是否有數據寫入。

    接收數據

常見報錯

場景一:所有Tasks運行失敗

錯誤信息:

All tasks under connector mongo-source failed, please check the error trace of the task.

解決方法:在消息流入任務詳情頁面,單擊基礎信息區域的診斷鏈接,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤信息。

場景二:Kafka Connect退出

錯誤信息:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

解決方法:由于狀態獲取可能會有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態,您可以按照以下步驟查看錯誤信息。

  1. 在消息流入任務詳情頁面的Worker信息區域,單擊SAE應用后的實例名稱,跳轉到SAE應用詳情頁面。

  2. 基本信息頁面,單擊實例部署信息頁簽。

  3. 在實例右側操作列,單擊Webshell登錄Kafka Connect運行環境。實例部署信息

    • 執行vi /home/admin/connector-bootstrap.log命令,查看Connector啟動日志,查找其中是否包含錯誤信息。

    • 執行vi /opt/kafka/logs/connect.log命令,查看Connector運行日志,在其中查找ERROR或者WARN字段來查看是否有錯誤信息。

基于錯誤信息提示進行修復操作后,可以重新啟動對應任務。

場景三:Connector參數校驗失敗

錯誤信息:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

解決方法:此時需要根據錯誤信息,找出具體哪個參數出錯,更新對應參數即可。若基于上述錯誤信息無法定位具體的出錯參數,可以參考上文場景二中的步驟登錄Kafka Connect運行環境,執行以下命令,查詢參數是否校驗通過。

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

該指令會返回Connector參數中每個參數是否校驗通過,若不通過,則errors屬性非空,如下所示。

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}