本教程介紹如何使用Kafka Connect的Source Connector將MySQL的數據同步至云消息隊列 Kafka 版。
背景信息
Kafka Connect主要用于將數據流輸入和輸出云消息隊列 Kafka 版。Kafka Connect主要通過各種Source Connector的實現,將數據從第三方系統輸入到Kafka Broker,通過各種Sink Connector實現,將數據從Kafka Broker中導入到第三方系統。
前提條件
在開始本教程前,請確保您已完成以下操作:
步驟一:配置Kafka Connect
將下載完成的MySQL Connector解壓到指定目錄。
在Kafka Connect的配置文件connect-distributed.properties中配置插件安裝位置。
plugin.path=/kafka/connect/plugins
重要Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。
export CLASSPATH=/kafka/connect/plugins/mysql-connector/*
步驟二:啟動Kafka Connect
在配置好connect-distributed.properties后,執行以下命令啟動Kafka Connect。
公網接入
執行命令
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
設置java.security.auth.login.config。執行命令
bin/connect-distributed.sh config/connect-distributed.properties
啟動Kafka Connect。
VPC接入
執行命令
bin/connect-distributed.sh config/connect-distributed.properties
啟動Kafka Connect。
步驟三:安裝MySQL
執行以下命令安裝MySQL。
export DEBEZIUM_VERSION=0.5 docker-compose -f docker-compose-mysql.yaml up
步驟四:配置MySQL
在配置文件中配置以下內容,開啟MySQL的binlog寫入功能,并配置binlog模式為row。
[mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1
執行以下命令設置MySQL的User權限。
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'dbz';
說明示例中MySQL的User為debezium,密碼為dbz。
步驟五:啟動MySQL Connector
編輯register-mysql.json。
VPC接入
## 云消息隊列 Kafka 版接入點,通過控制臺獲取。 ## 您在控制臺獲取的默認接入點。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 需要提前在控制臺創建同名Topic,在本例中創建Topic:server1。 ## 所有Table的變更數據,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.inventory.products。 ## 因此用戶需要提前在控制臺中創建所有相關Topic。 "database.server.name": "server1", ## 記錄schema變化信息將記錄在這個Topic中。 ## 需要提前在控制臺創建。 "database.history.kafka.topic": "schema-changes-inventory"
公網接入
## 云消息隊列 Kafka 版接入點,通過控制臺獲取。存儲db中schema變化信息。 ## 您在控制臺獲取的SSL接入點。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 需要提前在控制臺創建同名Topic,在本例中創建Topic:server1。 ## 所有Table的變更數據,會記錄在server1.$DATABASE.$TABLE的Topic中,如 server1.testDB.products。 ## 因此用戶需要提前在控制臺中創建所有相關Topic。 "database.server.name": "server1", ## schema變化信息將記錄在這個Topic中。 ## 需要提前在控制臺創建。 "database.history.kafka.topic": "schema-changes-inventory", ## SSL公網方式訪問配置。 "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN",
配置好register-mysql.json后,您需要根據配置在控制臺創建相應的Topic,相關操作步驟,請參見步驟一:創建Topic。
按照本教程中的方式安裝的MySQL,您可以看到MySQL中已經提前創建好了database:inventory。其中有四張表:
customers
orders
products
products_on_hand
根據以上配置,您需要使用OpenAPI創建Topic:
server1
server1.inventory.customers
server1.inventory.orders
server1.inventory.products
server1.inventory.products_on_hand
在register-mysql.json中,配置了將schema變化信息記錄在schema-changes-testDB,因此您還需要使用OpenAPI創建Topic:schema-changes-inventory。 使用OpenAPI創建Topic,請參見CreateTopic。
執行以下命令啟動MySQL Connector。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json
結果驗證
按照以下步驟操作確認云消息隊列 Kafka 版能否接收到MySQL的變更數據。
變更MySQL Table中的數據。
在控制臺的消息查詢頁面,查詢變更數據。