canal [k?’n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。 早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
一、背景介紹
canal [k?’n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。 早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
目前canal支持寫入kafka,并且DataHub兼容kafka協議,所以您可以使用canal將MySql的增量數據寫入DataHub。同時為了保證Canal能夠像寫Kafka一樣寫入DataHub,對開源Canal做了一些必要的改動,主要改動如下:
kafka的TopicName為DataHub的ProjectName.TopicName的組合,因此去掉canal將kafka TopicName中的”.”替換為”_”的行為,保證kafka TopicName能夠正確映射到DataHub的Topic
DataHub使用SASL PLAIN進行認證,所以修改了啟動腳本,添加了環境變量
-Djava.security.auth.login.config=$kafka_jaas_conf
二、使用說明
本文只是給出了一個canal寫入DataHub(kafka)的基礎示例,更多參數配置和參數含義請參考canal官網。
1. 下載canal deployer壓縮包
首先下載canal壓縮包canal.deployer-1.1.5-SNAPSHOT.tar.gz。如果使用未經DataHub改動的canal,可能會無法寫入DataHub。
2. 將canal.deployer 復制到固定目錄并解壓
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal
3.修改配置參數
3.1 修改instance配置 conf/example/instance.properties
# 按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# 針對庫名或者表名發送動態topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################
對應IP地址的MySQL 數據庫需進行相關初始化與設置, 可參考 Canal QuickStart。針對庫名的動態TopicName和根據主鍵哈希的設置,可參考mq參數說明。
3.2 修改canal配置文件 conf/canal.properties
# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN
其中必須配置參數為canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism
,其他參數用戶可根據實際情況自主進行調優,kafka.bootstrap.servers
需要選擇topic所在region的endpoint,endpoint可前往DataHub兼容kafka協議查看。
3.3 修改jass配置文件 conf/kafka_client_producer_jaas.conf
kafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="accessId"
password="accessKey";
};
4. 啟動和關閉
啟動之前需要保證DataHub有相應的Topic,創建的Topic要求可以參考DataHub兼容kafka協議。
4.1 啟動
cd /usr/local/canal/
sh bin/startup.sh
4.2 查看日志
查看 logs/canal/canal.logvi logs/canal/canal.log
2013-02-05 22:45:27.967 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
查看instance的日志:vi logs/example/example.log
2013-02-05 22:50:45.636 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
查看meta的日志,vi logs/example/meta.log
數據庫的每次增刪改操作,都會在meta.log中生成一條記錄,查看該日志可以確認canal是否有采集到數據。
tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]
4.3 關閉
cd /usr/local/canal/
sh bin/stop.sh
三、 數據示例
DataHub Topic
DataHub Topic為tuple topic,Schema為
+-------+------+----------+-------------+
| Index | name | type | allow NULL |
+-------+------+----------+-------------+
| 0 | key | STRING | true |
| 1 | val | STRING | true |
+-------+------+----------+-------------+
MySQL
MySQL表結構
mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid | int(11) | YES | | NULL | |
| pid | int(11) | YES | | NULL | |
| num | int(11) | YES | | NULL | |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)
數據
寫入DataHub后,key為null,val為JSON字符串。
mysql> insert into orders values(1,2,3);
{
"data":[
{
"oid":"1",
"pid":"2",
"num":"3"
}
],
"database":"ggtt",
"es":1591092305000,
"id":2,
"isDdl":false,
"mysqlType":{
"oid":"int(11)",
"pid":"int(11)",
"num":"int(11)"
},
"old":null,
"pkNames":null,
"sql":"",
"sqlType":{
"oid":4,
"pid":4,
"num":4
},
"table":"orders",
"ts":1591092305813,
"type":"INSERT"
}