本教程介紹如何使用Kafka Connect的Source Connector將SQL Server的數(shù)據(jù)同步至云消息隊(duì)列 Kafka 版。
前提條件
在開(kāi)始本教程前,請(qǐng)確保您已完成以下操作:
- 已下載SQL Server Source Connector。具體信息,請(qǐng)參見(jiàn)SQL Server Source Connector。
- 已下載Kafka Connect。具體信息,請(qǐng)參見(jiàn)Kafka Connect。 說(shuō)明 SQL Server Source Connector目前只支持2.1.0及以上版本的Kafka Connect。
- 已下載Docker。具體信息,請(qǐng)參見(jiàn)Docker。
步驟一:配置Kafka Connect
- 將下載完成的SQL Server Connector解壓到指定目錄。
- 在Kafka Connect的配置文件connect-distributed.properties中配置插件安裝位置。
## 指定插件解壓后的路徑。 plugin.path=/kafka/connect/plugins
重要Kafka Connect的早期版本不支持配置plugin.path,您需要在CLASSPATH中指定插件位置。
export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
步驟二:?jiǎn)?dòng)Kafka Connect
配置好connect-distributed.properties后,執(zhí)行以下命令啟動(dòng)Kafka Connect。
- 如果是公網(wǎng)接入,需先設(shè)置java.security.auth.login.config,如果是VPC接入,可以跳過(guò)這一步。
export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
- 啟動(dòng)Kafka Connect。
bin/connect-distributed.sh config/connect-distributed.properties
步驟三:安裝SQL Server
重要 SQL Server 2016 SP1以上版本支持CDC,因此您的SQL Server版本必須高于該版本。
- 下載docker-compose-sqlserver.yaml。
- 執(zhí)行以下命令安裝SQL Server。
docker-compose -f docker-compose-sqlserver.yaml up
步驟四:配置SQL Server
- 下載inventory.sql。
- 執(zhí)行以下命令初始化SQL Server中的測(cè)試數(shù)據(jù)。
cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
- 可選:如果您需要監(jiān)聽(tīng)SQL Server中已有的數(shù)據(jù)表,請(qǐng)完成以下配置:
步驟五:?jiǎn)?dòng)SQL Server Connector
- 下載register-sqlserver.json。
- 編輯register-sqlserver.json。
- VPC接入
## 云消息隊(duì)列 Kafka 版實(shí)例的默認(rèn)接入點(diǎn),您可以在云消息隊(duì)列 Kafka 版控制臺(tái)獲取。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建同名Topic,在本例中創(chuàng)建topic:server1。 ## 所有table的變更數(shù)據(jù),會(huì)記錄在server1.$DATABASE.$TABLE的topic中,例如server1.testDB.products。 ## 因此您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)中創(chuàng)建所有相關(guān)Topic。 "database.server.name": "server1", ## 記錄schema變化信息將記錄在該Topic中。 ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建該Topic。 "database.history.kafka.topic": "schema-changes-inventory"
- 公網(wǎng)接入
## 云消息隊(duì)列 Kafka 版實(shí)例的SSL接入點(diǎn),您可以在云消息隊(duì)列 Kafka 版控制臺(tái)獲取。 "database.history.kafka.bootstrap.servers" : "kafka:9092", ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建同名Topic,在本例中創(chuàng)建topic:server1。 ## 所有table的變更數(shù)據(jù),會(huì)記錄在server1.$DATABASE.$TABLE的Topic中,例如server1.testDB.products。 ## 因此您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)中創(chuàng)建所有相關(guān)Topic。 "database.server.name": "server1", ## 記錄schema變化信息將記錄在該Topic中。 ## 您需要提前在云消息隊(duì)列 Kafka 版控制臺(tái)創(chuàng)建該Topic。 "database.history.kafka.topic": "schema-changes-inventory", ## 通過(guò)SSL接入點(diǎn)訪問(wèn),還需要修改以下配置。 "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.producer.ssl.truststore.password": "KafkaOnsClient", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks", "database.history.consumer.ssl.truststore.password": "KafkaOnsClient", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN",
- VPC接入
- 完成register-sqlserver.json配置后,您需要根據(jù)配置在控制臺(tái)創(chuàng)建相應(yīng)的Topic,相關(guān)操作步驟請(qǐng)參見(jiàn)步驟一:創(chuàng)建Topic。按照本教程中的方式安裝的SQL Server,您可以看到SQL Server中已經(jīng)提前創(chuàng)建db name:testDB。其中有四張表:
- customers
- orders
- products
- products_on_hand
- server1
- server1.testDB.customers
- server1.testDB.orders
- server1.testDB.products
- server1.testDB.products_on_hand
在register-sqlserver.json中,配置了將schema變化信息記錄在schema-changes-testDB,因此您還需要使用OpenAPI創(chuàng)建Topic:schema-changes-inventory,相關(guān)操作請(qǐng)參見(jiàn)CreateTopic。
- 執(zhí)行以下命令啟動(dòng)SQL Server。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
結(jié)果驗(yàn)證
確認(rèn)云消息隊(duì)列 Kafka 版能否接收到SQL Server的變更數(shù)據(jù):
- 變更監(jiān)聽(tīng)SQL Server中的數(shù)據(jù)。
- 在控制臺(tái)的消息查詢頁(yè)面,查詢變更消息。具體操作步驟,請(qǐng)參見(jiàn)查詢消息。