本教程介紹如何使用Canal將MySQL的數據同步至云消息隊列 Kafka 版。
背景信息
Canal的主要用途是基于MySQL數據庫增量日志解析,提供增量數據訂閱和消費。Canal偽裝自己為MySQL Slave,向MySQL Master發送dump請求。MySQL Master收到dump請求,開始推送Binary log給Canal,Canal解析Binary log來同步數據。Canal與云消息隊列 Kafka 版建立對接,您可以把MySQL更新的數據寫入到云消息隊列 Kafka 版中來分析。其詳細的工作原理,請參見Canal官網。
前提條件
在開始本教程前,請確保您已完成以下操作:
安裝MySQL,并進行相關初始化與設置。具體操作,請參見 Canal QuickStart。
在云消息隊列 Kafka 版控制臺創建實例以及Topic資源。具體操作,請參見步驟三:創建資源。
操作步驟
下載Canal壓縮包,本教程以1.1.5版本為例。
執行以下命令,創建目錄文件夾。本教程以/home/doc/tools/canal.deployer-1.1.5路徑為例。
mkdir -p /home/doc/tools/canal.deployer-1.1.5
將Canal壓縮包復制到/home/doc/tools/canal.deployer-1.1.5路徑并解壓。
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
在/home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,編輯instance.properties文件。
vi conf/example/instance.properties
根據instance.properties參數列表配置參數。
# 根據實際情況修改為您的數據庫信息。 ################################################# ... # 數據庫地址。 canal.instance.master.address=192.168.XX.XX:3306 # username/password為數據庫的用戶名和密碼。 ... canal.instance.dbUsername=**** canal.instance.dbPassword=**** ... # mq config # 您在云消息隊列 Kafka 版控制臺創建的Topic。 canal.mq.topic=mysql_test # 針對數據庫名或者表名發送動態Topic。 #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* # 數據同步到云消息隊列 Kafka 版Topic的指定分區。 canal.mq.partition=0 # 以下兩個參數配置與canal.mq.partition互斥。配置以下兩個參數可以使數據發送至云消息隊列 Kafka 版Topic的不同分區。 #canal.mq.partitionsNum=3 #庫名.表名: 唯一主鍵,多個表之間用逗號分隔。 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
表 1. instance.properties參數列表 參數
是否必選
描述
canal.instance.master.address
是
MySQL數據庫的連接地址。
canal.instance.dbUsername
是
MySQL數據庫的用戶名。
canal.instance.dbPassword
是
MySQL數據庫的密碼。
canal.mq.topic
是
云消息隊列 Kafka 版實例的Topic。您可以在云消息隊列 Kafka 版控制臺的Topic 管理頁面創建。具體操作,請參見步驟三:創建資源。
canal.mq.dynamicTopic
否
動態Topic規則表達式。設置Topic匹配規則表達式,可以將不同的數據表數據同步至不同的Topic。具體設置方法,請參見參數說明。
canal.mq.partition
否
數據庫數據同步到云消息隊列 Kafka 版Topic的指定分區。
canal.mq.partitionsNum
否
Topic的分區數量。該參數與canal.mq.partitionHash一起使用,可以將數據同步至云消息隊列 Kafka 版Topic不同的分區。
canal.mq.partitionHash
否
分區的規則表達式。具體設置方法,請參見參數說明。
執行以下命令,編輯canal.properties文件。
vi conf/canal.properties
根據canal.properties參數列表說明配置參數。
公網環境,消息采用SASL_SSL協議進行鑒權并加密,通過SSL接入點訪問云消息隊列 Kafka 版。接入點的詳細信息,請參見接入點對比。
# ... # 您需設置為kafka。 canal.serverMode = kafka # ... # kafka配置。 #在云消息隊列 Kafka 版實例詳情頁面獲取的SSL接入點。 kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093 # 參數的默認設置如下所示,您可以根據實際情況調整。 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 # 公網環境,通過SASL_SSL鑒權并加密,您需配置網絡協議與身份校驗機制。 kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks kafka.ssl.truststore.password= KafkaOnsClient kafka.security.protocol= SASL_SSL kafka.sasl.mechanism = PLAIN kafka.ssl.endpoint.identification.algorithm =
表 2. canal.properties參數列表 參數
是否必選
描述
canal.serverMode
是
您需設置為kafka。
kafka.bootstrap.servers
是
云消息隊列 Kafka 版實例接入點。您可在云消息隊列 Kafka 版控制臺的實例詳情頁面的接入點信息區域獲取。
kafka.ssl.truststore.location
是
SSL根證書kafka.client.truststore.jks的存放路徑。
說明公網環境下,消息必須進行鑒權與加密,才能確保傳輸的安全。即需通過SSL接入點采用SASL_SSL協議進行傳輸。具體信息,請參見接入點對比。
kafka.acks
是
云消息隊列 Kafka 版接收到數據之后給客戶端發出的確認信號。取值說明如下:
0:表示客戶端不需要等待任何確認收到的信息。
1:表示等待Leader成功寫入而不等待所有備份是否成功寫入。
all:表示等待Leader成功寫入并且所有備份都成功寫入。
kafka.compression.type
是
壓縮數據的壓縮算法,默認是無壓縮。取值如下:
none。
gzip。
snappy。
kafka.batch.size
是
客戶端數據塊攢批的大小。單位:Byte。
該參數控制批量處理消息的字節數。客戶端發送到Brokers的請求將包含多個批量處理,以減少請求次數。較小的批量處理數值可能降低吞吐量,而較大的批量處理數值將會浪費更多內存空間,分配一個指定的批量處理消息緩沖區有助于提高客戶端和服務端的性能。
說明kafka.batch.size與kafka.linger.ms參數都是控制批量處理消息的條件,滿足其中一個參數的條件,客戶端就攢批完成,消息進入待發送狀態。
kafka.linger.ms
是
客戶端數據塊攢批的最大時長。單位:ms。
客戶端設定攢批消息的延遲時間,以批量處理消息,減少請求次數。
kafka.max.request.size
是
客戶端每次請求的最大字節數。
kafka.buffer.memory
是
緩存數據的內存大小。
kafka.max.in.flight.requests.per.connection
是
限制客戶端在單個連接上能夠發送的未響應請求的個數。設置此值是1表示Broker在響應請求之前客戶端不能再向同一個Broker發送請求。
kafka.retries
是
消息發送失敗時,是否重復發送。設置為0,表示不會重復發送;設置大于0的值,客戶端重新發送數據。
kafka.ssl.truststore.password
是
SSL根證書的密碼,設置為KafkaOnsClient。
kafka.security.protocol
是
采用SASL_SSL協議進行鑒權并加密,即設置為SASL_SSL。
kafka.sasl.mechanism
是
SASL身份認證的機制。SSL接入點采用PLAIN機制驗證身份。
公網環境,需通過SASL進行身份校驗,需要在bin/startup.sh配置環境變量,并編輯kafka_client_producer_jaas.conf文件,配置云消息隊列 Kafka 版實例的用戶名與密碼。
執行
vi bin/startup.sh
命令,編輯startup.sh文件,配置環境變量。JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"
執行
vi conf/kafka_client_producer_jaas.conf
命令,編輯kafka_client_producer_jaas.conf文件,配置實例用戶名與密碼信息。說明如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺的實例詳情頁面獲取默認用戶的用戶名和密碼。
如果實例已開啟ACL,請確保要使用的SASL用戶為PLAIN類型且已授權收發消息的權限。具體信息,請參見SASL用戶授權。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="實例的用戶名" password="實例的用戶名密碼"; };
VPC環境,消息采用PLAINTEXT協議不鑒權不加密傳輸,通過默認接入點訪問云消息隊列 Kafka 版,僅需配置canal.serverMode與kafka.bootstrap.servers參數。接入點的詳細信息,請參見接入點對比。
# ... # 您需設置為kafka。 canal.serverMode = kafka # ... # kafka配置。 # 在云消息隊列 Kafka 版實例詳情頁面獲取的默認接入點。 kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 # 以下參數請您可以按照實際情況調整,也可以保持默認設置。 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0
在/home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,啟動Canal。
sh bin/startup.sh
查看/home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log日志文件,確認Canal與云消息隊列 Kafka 版連接成功,Canal正在運行。
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server. 2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111] 2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看/home/doc/tools/canal.deployer-1.1.5/logs/example/example.log日志文件,確認Canal Instance已啟動。
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties] 2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties] 2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
測試驗證
啟動Canal之后,進行數據同步驗證。
在MySQL數據庫,新建數據表T_Student。數據表數據示例如下:
mysql> select * from T_Student; +--------+---------+------+------+ | stuNum | stuName | age | sex | +--------+---------+------+------+ | 1 | 小王 | 18 | girl | | 2 | 小張 | 17 | boy | +--------+---------+------+------+ 2 rows in set (0.00 sec)
查看/home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log日志文件,數據庫的每次增刪改操作,都會在meta.log中生成一條記錄,查看該日志可以確認Canal是否有采集到數據。
tail -f example/meta.log 2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306] 2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306] 2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
登錄云消息隊列 Kafka 版控制臺,查詢消息,確認MySQL的數據被同步在云消息隊列 Kafka 版。控制臺查詢消息的具體操作,請參見查詢消息。
數據同步完畢,執行以下命令,關閉Canal。
sh bin/stop.sh