使用Debezium將MySQL數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版
本文介紹如何創(chuàng)建Debezium MySQL Source Connector,將MySQL的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版。
前提條件
已開通對(duì)象存儲(chǔ)OSS服務(wù)并創(chuàng)建存儲(chǔ)空間(Bucket)。更多信息,請(qǐng)參見控制臺(tái)創(chuàng)建存儲(chǔ)空間。
已開通Serverless應(yīng)用引擎服務(wù)。更多信息,請(qǐng)參見準(zhǔn)備工作。
已創(chuàng)建專有網(wǎng)絡(luò)及交換機(jī)。更多信息,請(qǐng)參見步驟一:創(chuàng)建專有網(wǎng)絡(luò)和交換機(jī)。
已購買并部署云消息隊(duì)列 Kafka 版實(shí)例。更多信息,請(qǐng)參見步驟二:購買和部署實(shí)例。
步驟一:創(chuàng)建數(shù)據(jù)表
登錄RDS管理控制臺(tái),創(chuàng)建RDS MySQL實(shí)例。更多信息,請(qǐng)參見快速創(chuàng)建RDS MySQL實(shí)例。
創(chuàng)建實(shí)例時(shí),請(qǐng)選擇與前提條件中已購買部署的Kafka實(shí)例相同的VPC,并將此VPC網(wǎng)段加入白名單。
實(shí)例創(chuàng)建完成后,在實(shí)例列表頁面單擊目標(biāo)實(shí)例,然后在實(shí)例詳情頁面的左側(cè)導(dǎo)航欄,完成以下操作。
單擊賬號(hào)管理,創(chuàng)建一個(gè)新賬號(hào),也可使用已有賬號(hào)。更多信息,請(qǐng)參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)。
單擊數(shù)據(jù)庫管理,創(chuàng)建數(shù)據(jù)庫,也可使用已有數(shù)據(jù)庫。更多信息,請(qǐng)參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)。
單擊數(shù)據(jù)庫連接,記錄內(nèi)網(wǎng)地址和端口號(hào)。
在基本信息頁面,單擊登錄數(shù)據(jù)庫進(jìn)入DMS數(shù)據(jù)管理服務(wù)平臺(tái),完成以下操作。
在左側(cè)雙擊數(shù)據(jù)庫名稱,切換到已創(chuàng)建的數(shù)據(jù)庫。
在SQL Console頁簽,使用SQL語句創(chuàng)建表格。例如,創(chuàng)建一個(gè)列參數(shù)分別為id和number的表格,命令如下。更多信息,請(qǐng)參見SQL Commands。
CREATE TABLE sql_table(id INT ,number INT);
步驟二:創(chuàng)建Connector任務(wù)
下載Debezium MySQL Source Connector文件,上傳至提前創(chuàng)建好的OSS bucket,更多信息,請(qǐng)參見控制臺(tái)上傳文件。
重要下載Debezium MySQL Connector文件時(shí)請(qǐng)選擇適配Java 8的版本。
登錄云消息隊(duì)列 Kafka 版控制臺(tái),在概覽頁面的資源分布區(qū)域,選擇地域。
在左側(cè)導(dǎo)航欄,選擇 。
在消息流入(Source)頁面,單擊創(chuàng)建任務(wù)。
在消息流入創(chuàng)建面板,設(shè)置任務(wù)名稱,選擇流入類型為Apache Kafka Connect。
在連接器配置區(qū)域,設(shè)置以下參數(shù)。
配置項(xiàng)
參數(shù)
說明
Kafka Connect插件
Bucket存儲(chǔ)桶
選擇OSS Bucket。
文件
選擇上傳的.ZIP文件。
Kafka資源信息
Kafka實(shí)例
選擇前提條件中創(chuàng)建的實(shí)例。
專有網(wǎng)絡(luò)VPC
默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的VPC ID且不可更改。
交換機(jī)
默認(rèn)選擇部署Kafka實(shí)例時(shí)選擇的vSwitch ID且不可更改。
安全組
選擇安全組。
Kafka Connect配置信息
解析當(dāng)前ZIP包下的properties文件
選擇.ZIP文件中包含的SourceConnector對(duì)應(yīng)的.properties文件。路徑為/etc/xxx.properties。在輸入框中更新下列字段的取值。
tasks.max:Task的最大數(shù)量。該場(chǎng)景下只支持設(shè)置為1。
database.hostname:填寫步驟一:創(chuàng)建數(shù)據(jù)表中獲取的源數(shù)據(jù)庫內(nèi)網(wǎng)地址。
database.port:填寫步驟一:創(chuàng)建數(shù)據(jù)表中獲取的端口號(hào)。
database.user:數(shù)據(jù)庫登錄賬號(hào)。
database.password:數(shù)據(jù)庫登錄密碼。
database.server.name:MySQL數(shù)據(jù)庫服務(wù)的邏輯名稱。
僅允許包含英文字母、數(shù)字以及下劃線(_)。
該字段用于組成數(shù)據(jù)庫及表格的目標(biāo)Topic的名稱,數(shù)據(jù)庫目標(biāo)Topic的命名規(guī)則為
{database.server.name}
,表格目標(biāo)Topic的命名規(guī)則為{database.server.name}.{databaseName}.{tableName}
。請(qǐng)?jiān)谶\(yùn)行Connector任務(wù)前按照命名規(guī)則提前創(chuàng)建對(duì)應(yīng)的目標(biāo)Topic。
database.include.list:源數(shù)據(jù)庫名稱。若有多個(gè)數(shù)據(jù)庫,用英文逗號(hào)(,)分隔。
table.include.list:源表格名稱,單個(gè)表格的格式為
{databaseName}.{tableName}
;若有多個(gè)表格,用英文逗號(hào)(,)分隔。database.history.kafka.bootstrap.servers:Kafka實(shí)例連接地址。
該實(shí)例用于記錄數(shù)據(jù)庫所有Schema變動(dòng)記錄。
可使用Connector任務(wù)中已配置的目標(biāo)Kafka實(shí)例,也可使用新實(shí)例。
實(shí)例需要與數(shù)據(jù)庫實(shí)例處于同一個(gè)VPC中。
該實(shí)例需要開啟自由使用Group能力。更多信息,請(qǐng)參見自由使用Group。
database.history.kafka.topic:此Topic用于記錄數(shù)據(jù)庫所有Schema的變動(dòng)記錄,請(qǐng)?jiān)谶\(yùn)行Connector任務(wù)前提前創(chuàng)建。
include.schema.changes:是否監(jiān)控Schema變動(dòng)記錄,若取值為true,則會(huì)將這些變動(dòng)記錄寫入名為
{database.server.name}
的Topic中。
Connector全量參數(shù),請(qǐng)參見Debezium Connector Properties。
在實(shí)例配置區(qū)域,設(shè)置以下參數(shù)。
配置項(xiàng)
參數(shù)
說明
Worker規(guī)格
Worker規(guī)格
選擇合適的Worker規(guī)格。
最小Worker數(shù)
設(shè)置為1。
最大Worker數(shù)
設(shè)置為1。此數(shù)值不得超過Task的最大數(shù)量。
Kafka Connect Worker 配置
自動(dòng)創(chuàng)建Kafka Connect Worker依賴資源
建議勾選此項(xiàng),此時(shí)會(huì)在選擇的Kafka實(shí)例中自動(dòng)創(chuàng)建Kafka Connect運(yùn)行所需的一些Internal Topic以及ConsumerGroup,并將這些必填配置自動(dòng)填入配置框中,包括以下配置項(xiàng):
Offset Topic:用于存儲(chǔ)源數(shù)據(jù)偏移量,命名規(guī)則為
connect-eb-offset-<任務(wù)名稱>
。Config Topic:用于存儲(chǔ)Connectors以及Tasks的配置信息,命名規(guī)則為
connect-eb-config-<任務(wù)名稱>
。Status Topic:用于存儲(chǔ)Connectors以及Tasks狀態(tài)信息,命名規(guī)則為
connect-eb-status-<任務(wù)名稱>
。Kafka Connect Consumer Group:Kafka Connect Worker用于消費(fèi)Internal Topics的消費(fèi)組,命名規(guī)則為
connect-eb-cluster-<任務(wù)名稱>
。Kafka Source Connector Consumer Group:只針對(duì)Sink Connector有效,用于消費(fèi)源Topic中的數(shù)據(jù),命名規(guī)則為
connector-eb-cluster-<任務(wù)名稱>-<connector名稱>
。
在運(yùn)行配置區(qū)域,將日志投遞方式設(shè)置為投遞至SLS或者投遞至Kafka,在角色授權(quán)卡片設(shè)置Connect依賴的角色配置。
重要建議配置的角色包含AliyunSAEFullAccess權(quán)限,否則可能會(huì)導(dǎo)致任務(wù)運(yùn)行失敗。
單擊確定。
等待任務(wù)狀態(tài)變?yōu)?b data-tag="uicontrol" id="88548e1005asr" class="uicontrol">運(yùn)行中,此時(shí)Connector已經(jīng)在正常工作中。
步驟三:測(cè)試Connector任務(wù)
在DMS數(shù)據(jù)管理服務(wù)平臺(tái),向步驟一:創(chuàng)建數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表插入一條數(shù)據(jù)。例如,插入一條id為123,number為20000的數(shù)據(jù),命令如下。
INSERT INTO sql_table(id, number) VALUES(123,20000);
登錄云消息隊(duì)列 Kafka 版控制臺(tái),在實(shí)例列表頁面,單擊目標(biāo)實(shí)例。
在目標(biāo)實(shí)例頁面,單擊目標(biāo)Topic,然后單擊消息查詢,查看插入的消息數(shù)據(jù),消息Value示例如下。
數(shù)據(jù)庫目標(biāo)Topic(命名規(guī)則為
{database.server.name}
)示例:{ "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675404, "snapshot":"true", "db":"wbdb", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "databaseName":"wbdb", "schemaName":null, "ddl":"DROP TABLE IF EXISTS sql_table", "tableChanges":[ ] }
數(shù)據(jù)表格目標(biāo)Topic(命名規(guī)則為
{database.server.name}.{databaseName}.{tableName}
)示例:{ "before":null, "after":{ "id":123, "number":20000 }, "source":{ "version":"1.5.0.Final", "connector":"mysql", "name":"fulfillment", "ts_ms":1686283675675, "snapshot":"last", "db":"wbdb", "sequence":null, "table":"sql_table", "server_id":0, "gtid":null, "file":"mysql-bin.000006", "pos":188032, "row":0, "thread":null, "query":null }, "op":"r", "ts_ms":1686283675675, "transaction":null }
database.history.kafka.topic字段指定的Topic示例:
{ "source":{ "server":"fulfillment" }, "position":{ "ts_sec":1686283675, "file":"mysql-bin.000006", "pos":188032, "gtids":"be4286e6-05ce-11ee-b8c2-00163e20****:1-****", "snapshot":true }, "databaseName":"wbdb", "ddl":"CREATE DATABASE `wbdb` CHARSET utf8 COLLATE utf8_general_ci", "tableChanges":[ ] }
常見報(bào)錯(cuò)
場(chǎng)景一:所有Tasks運(yùn)行失敗
錯(cuò)誤信息:
All tasks under connector mongo-source failed, please check the error trace of the task.
解決方法:在消息流入任務(wù)詳情頁面,單擊基礎(chǔ)信息區(qū)域的診斷鏈接,即可跳轉(zhuǎn)到Connector監(jiān)控頁面,可以看到Tasks運(yùn)行失敗的詳細(xì)錯(cuò)誤信息。
場(chǎng)景二:Kafka Connect退出
錯(cuò)誤信息:
Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.
解決方法:由于狀態(tài)獲取可能會(huì)有延遲,建議您先嘗試刷新頁面。若刷新后仍然是失敗狀態(tài),您可以按照以下步驟查看錯(cuò)誤信息。
在消息流入任務(wù)詳情頁面的Worker信息區(qū)域,單擊SAE應(yīng)用后的實(shí)例名稱,跳轉(zhuǎn)到SAE應(yīng)用詳情頁面。
在基本信息頁面,單擊實(shí)例部署信息頁簽。
在實(shí)例右側(cè)操作列,單擊Webshell登錄Kafka Connect運(yùn)行環(huán)境。
執(zhí)行
vi /home/admin/connector-bootstrap.log
命令,查看Connector啟動(dòng)日志,查找其中是否包含錯(cuò)誤信息。執(zhí)行
vi /opt/kafka/logs/connect.log
命令,查看Connector運(yùn)行日志,在其中查找ERROR或者WARN字段來查看是否有錯(cuò)誤信息。
基于錯(cuò)誤信息提示進(jìn)行修復(fù)操作后,可以重新啟動(dòng)對(duì)應(yīng)任務(wù)。
場(chǎng)景三:Connector參數(shù)校驗(yàn)失敗
錯(cuò)誤信息:
Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s): Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`
解決方法:此時(shí)需要根據(jù)錯(cuò)誤信息,找出具體哪個(gè)參數(shù)出錯(cuò),更新對(duì)應(yīng)參數(shù)即可。若基于上述錯(cuò)誤信息無法定位具體的出錯(cuò)參數(shù),可以參考上文場(chǎng)景二中的步驟登錄Kafka Connect運(yùn)行環(huán)境,執(zhí)行以下命令,查詢參數(shù)是否校驗(yàn)通過。
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate
該指令會(huì)返回Connector參數(shù)中每個(gè)參數(shù)是否校驗(yàn)通過,若不通過,則errors屬性非空,如下所示。
"value":{ "name":"snapshot.mode", "value":null, "recommended_values":[ "never", "initial_only", "when_needed", "initial", "schema_only", "schema_only_recovery" ], "errors":[ "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery" ], "visible":true }
場(chǎng)景四:獲取Topic元數(shù)據(jù)超時(shí)
錯(cuò)誤信息:
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
解決方法:出現(xiàn)此報(bào)錯(cuò)的原因是您在啟動(dòng)Connector任務(wù)前未按照命名規(guī)則提前創(chuàng)建數(shù)據(jù)庫目標(biāo)Topic
{database.server.name}
和表格目標(biāo)Topic{database.server.name}.{databaseName}.{tableName}
,建議您停用任務(wù)后創(chuàng)建目標(biāo)Topic再啟用任務(wù)。