本文介紹如何創建Debezium MySQL Source Connector,將MySQL的數據同步至云消息隊列 Kafka 版。
前提條件
已開通對象存儲OSS服務并創建存儲空間(Bucket)。更多信息,請參見控制臺創建存儲空間。
已開通Serverless應用引擎服務。更多信息,請參見準備工作。
已創建專有網絡及交換機。更多信息,請參見步驟一:創建專有網絡和交換機。
已購買并部署云消息隊列 Kafka 版實例。更多信息,請參見購買和部署實例。
步驟一:創建數據表
登錄RDS管理控制臺,創建RDS MySQL實例。更多信息,請參見快速創建RDS MySQL實例。
創建實例時,請選擇與前提條件中已購買部署的Kafka實例相同的VPC,并將此VPC網段加入白名單。
實例創建完成后,在實例列表頁面單擊目標實例,然后在實例詳情頁面的左側導航欄,完成以下操作。
在基本信息頁面,單擊登錄數據庫進入DMS數據管理服務平臺,完成以下操作。
在左側雙擊數據庫名稱,切換到已創建的數據庫。
在SQL Console頁簽,使用SQL語句創建表格。例如,創建一個列參數分別為id和number的表格,命令如下。更多信息,請參見SQL Commands。
CREATE TABLE sql_table(id INT ,number INT);
步驟二:創建Connector任務
下載Debezium MySQL Source Connector文件,上傳至提前創建好的OSS bucket,更多信息,請參見控制臺上傳文件。
重要下載Debezium MySQL Connector文件時請選擇適配Java 8的版本。
登錄云消息隊列 Kafka 版控制臺,在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,選擇 。
在任務列表頁面,單擊創建任務。
在創建任務面板,設置任務名稱,選擇流入類型為Apache Kafka Connect。
在連接器配置區域,設置以下參數。
配置項
參數
說明
Kafka Connect插件
Bucket存儲桶
選擇OSS Bucket。
文件
選擇上傳的.ZIP文件。
Kafka資源信息
Kafka實例
選擇前提條件中創建的實例。
專有網絡VPC
默認選擇部署Kafka實例時選擇的VPC ID且不可更改。
交換機
默認選擇部署Kafka實例時選擇的vSwitch ID且不可更改。
安全組
選擇安全組。
Kafka Connect配置信息
解析當前ZIP包下的properties文件
選擇.ZIP文件中包含的SourceConnector對應的.properties文件。路徑為/etc/xxx.properties。在輸入框中更新下列字段的取值。
tasks.max:Task的最大數量。該場景下只支持設置為1。
database.hostname:填寫步驟一:創建數據表中獲取的源數據庫內網地址。
database.port:填寫步驟一:創建數據表中獲取的端口號。
database.user:數據庫登錄賬號。
database.password:數據庫登錄密碼。
database.server.name:MySQL數據庫服務的邏輯名稱。
僅允許包含英文字母、數字以及下劃線(_)。
該字段用于組成數據庫及表格的目標Topic的名稱,數據庫目標Topic的命名規則為
{database.server.name}
,表格目標Topic的命名規則為{database.server.name}.{databaseName}.{tableName}
。請在運行Connector任務前按照命名規則提前創建對應的目標Topic。
database.include.list:源數據庫名稱。若有多個數據庫,用英文逗號(,)分隔。
table.include.list:源表格名稱,單個表格的格式為
{databaseName}.{tableName}
;若有多個表格,用英文逗號(,)分隔。database.history.kafka.bootstrap.servers:Kafka實例連接地址。
該實例用于記錄數據庫所有Schema變動記錄。
可使用Connector任務中已配置的目標Kafka實例,也可使用新實例。
實例需要與數據庫實例處于同一個VPC中。
該實例需要開啟自由使用Group能力。更多信息,請參見自由使用Group。
database.history.kafka.topic:此Topic用于記錄數據庫所有Schema的變動記錄,請在運行Connector任務前提前創建。
include.schema.changes:是否監控Schema變動記錄,若取值為true,則會將這些變動記錄寫入名為
{database.server.name}
的Topic中。
Connector全量參數,請參見Debezium Connector Properties。
在實例配置區域,設置以下參數。
配置項
參數
說明
Worker規格
Worker規格
選擇合適的Worker規格。
最小Worker數
設置為1。
最大Worker數
設置為1。此數值不得超過Task的最大數量。
Kafka Connect Worker 配置
自動創建Kafka Connect Worker依賴資源
建議勾選此項,此時會在選擇的Kafka實例中自動創建Kafka Connect運行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動填入配置框中,包括以下配置項:
Offset Topic:用于存儲源數據偏移量,命名規則為
connect-eb-offset-<任務名稱>
。Config Topic:用于存儲Connectors以及Tasks的配置信息,命名規則為
connect-eb-config-<任務名稱>
。Status Topic:用于存儲Connectors以及Tasks狀態信息,命名規則為
connect-eb-status-<任務名稱>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消費Internal Topics的消費組,命名規則為
connect-eb-cluster-<任務名稱>
。Kafka Source Connector Consumer Group:只針對Sink Connector有效,用于消費源Topic中的數據,命名規則為
connector-eb-cluster-<任務名稱>-<connector名稱>
。
在運行配置區域,將日志投遞方式設置為投遞至SLS或者投遞至Kafka,在角色授權卡片設置Connect依賴的角色配置。
重要建議配置的角色包含AliyunSAEFullAccess權限,否則可能會導致任務運行失敗。
單擊確定。
等待任務狀態變為運行中,此時Connector已經在正常工作中。
步驟三:測試Connector任務
在DMS數據管理服務平臺,向步驟一:創建數據表中創建的數據表插入一條數據。例如,插入一條id為123,number為20000的數據,命令如下。
INSERT INTO sql_table(id, number) VALUES(123,20000);
登錄云消息隊列 Kafka 版控制臺,在實例列表頁面,單擊目標實例。
在目標實例頁面,單擊目標Topic,然后單擊消息查詢,查看插入的消息數據,消息Value示例如下。
數據庫目標Topic(命名規則為
{database.server.name}
)示例:{ "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675404, "snapshot":"true", "db":"wbdb", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "databaseName":"wbdb", "schemaName":null, "ddl":"DROP TABLE IF EXISTS sql_table", "tableChanges":[ ] }
數據表格目標Topic(命名規則為
{database.server.name}.{databaseName}.{tableName}
)示例:{ "before":null, "after":{ "id":123, "number":20000 }, "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675675, "snapshot":"last", "db":"wbdb", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "op":"r", "ts_ms":1686283675675, "transaction":null }
database.history.kafka.topic字段指定的Topic示例:
{ "source":{ "server":"fulfillment" }, "position":{ "ts_sec":1686283675, "file":"mysql-bin.000006", "pos":188032, "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****", "snapshot":true }, "databaseName":"wbdb", "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci", "tableChanges":[ ] }
常見報錯
場景一:所有Tasks運行失敗
錯誤信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解決方法:在消息流入任務詳情頁面,單擊基礎信息區域的診斷鏈接,即可跳轉到Connector監控頁面,可以看到Tasks運行失敗的詳細錯誤信息。
場景二:Kafka Connect退出
錯誤信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解決方法:由于狀態獲取可能會有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態,您可以按照以下步驟查看錯誤信息。
在消息流入任務詳情頁面的Worker信息區域,單擊SAE應用后的實例名稱,跳轉到SAE應用詳情頁面。
在基本信息頁面,單擊實例部署信息頁簽。
在實例右側操作列,單擊Webshell登錄Kafka Connect運行環境。
執行
vi /home/admin/connector-bootstrap.log
命令,查看Connector啟動日志,查找其中是否包含錯誤信息。執行
vi /opt/kafka/logs/connect.log
命令,查看Connector運行日志,在其中查找ERROR或者WARN字段來查看是否有錯誤信息。
基于錯誤信息提示進行修復操作后,可以重新啟動對應任務。
場景三:Connector參數校驗失敗
錯誤信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s): Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解決方法:此時需要根據錯誤信息,找出具體哪個參數出錯,更新對應參數即可。若基于上述錯誤信息無法定位具體的出錯參數,可以參考上文場景二中的步驟登錄Kafka Connect運行環境,執行以下命令,查詢參數是否校驗通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
該指令會返回Connector參數中每個參數是否校驗通過,若不通過,則errors屬性非空,如下所示。
"value":{ "name":"snapshot.mode", "value":null, "recommended_values":[ "never", "initial_only", "when_needed", "initial", "schema_only", "schema_only_recovery" ], "errors":[ "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery" ], "visible":true }
場景四:獲取Topic元數據超時
錯誤信息:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
解決方法:出現此報錯的原因是您在啟動Connector任務前未按照命名規則提前創建數據庫目標Topic
{database.server.name}
和表格目標Topic{database.server.name}.{databaseName}.{tableName}
,建議您停用任務后創建目標Topic再啟用任務。