本文以MySQL數據庫為例介紹如何使用Canal接入云消息隊列 RocketMQ 版,實現MySQL數據庫Binlog數據的變更處理。
背景信息
CDC(Change Data Capture)是一種監測并捕獲數據庫變更的典型技術方案,常應用于異構數據源之間的數據同步。Canal作為一款輕量級的CDC工具,可基于數據庫增量日志解析,提供增量變更數據的訂閱和消費能力。Canal可以將變更記錄可靠地投遞到云消息隊列 RocketMQ 版中,借助云消息隊列 RocketMQ 版豐富的消息處理策略實現多樣化的業務邏輯。
Canal是一個開源項目,倉庫地址請參見Canal。
應用場景
基于Binlog日志實現增量訂閱和消費的典型業務場景如下:
數據庫鏡像
數據庫實時備份
索引構建和實時維護(拆分異構索引、倒排索引等)
業務Cache刷新
帶業務邏輯的增量數據處理
方案介紹
基于Canal和云消息隊列 RocketMQ 版的CDC方案如下:
如上圖所示,Canal將自己偽裝成庫,監聽并接收數據庫的Binlog,并同步到云消息隊列 RocketMQ 版等存儲或其他中間件系統。
具體操作步驟如下:
配置MySQL:開啟MySQL的Binlog功能,創建測試需要的數據庫和表。
部署Canal:部署一個canal-deployer(server),監聽并接收MySQL數據庫的Binlog。
測試驗證:驗證數據變動后消息發送的情況。
環境要求
資源要求
處于運行中狀態的云消息隊列 RocketMQ 版實例。實例創建,請參見創建消息隊列RocketMQ版實例。
處于運行中的MySQL實例。本文以阿里云RDS MySQL為例,實例創建,請參見創建RDS MySQL實例。
用于部署運行Canal相關組件的機器。本文以ECS為例,實例創建和使用,請參見通過控制臺使用ECS實例(快捷版)。
網絡要求
部署canal-deployer(server)的節點可以連接數據庫和云消息隊列 RocketMQ 版實例,一般位于VPC內的ECS和容器都可以連接。
版本要求
服務 | 版本 | 說明 |
Canal | 1.1.6 | 其他版本請參見Canal Release。 |
MySQL | 8.0 | 支持源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。 |
云消息隊列 RocketMQ 版 | 5.x |
|
1.配置MySQL
1.1 開啟Binlog功能
阿里云RDS MySQL
阿里云RDS MySQL默認已開啟Binlog功能,并且賬號默認具有Binlog dump權限,可以直接跳過這一步。
自建MySQL
開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下:
[mysqld] log-bin=mysql-bin # 開啟binlog binlog-format=ROW # 選擇ROW模式 server_id=1 # 配置MySQL replaction需要定義,不要和canal的slaveId重復
創建用戶canal并授權MySQL slave 的權限。
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
1.2 創建數據庫
執行下面的SQL,創建一個名為canal的數據庫。
CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
1.3 創建表
執行下面的SQL,在canal數據庫創建一個名為students的表。
CREATE TABLE students (
id INT AUTO_INCREMENT,
name VARCHAR(100) NOT NULL,
age INT,
gender VARCHAR(10),
PRIMARY KEY (id)
);
2.部署Canal
2.1 安裝JDK
執行下面的命令,安裝JDK。
sudo yum install java-1.8.0-openjdk
2.2 下載canal-deployer
執行下面的命令,下載canal-deployer安裝包。
sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
2.3 解壓縮安裝包
執行下面的命令,創建目錄canal-server,并將下載的安裝包解壓縮到canal-server目錄中。
# 創建目錄canal-server
sudo mkdir -p /usr/local/canal-server
# 將下載的安裝包解壓縮到canal-server目錄中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server
2.4 修改配置
配置Canal啟動的參數,詳細配置請參見參數說明。
執行下面的命令,配置canal.properties。
sudo vi /usr/local/canal-server/conf/canal.properties
# 服務端模式
canal.serverMode = rocketMQ
# AccessKey ID,阿里云身份驗證標識。
canal.aliyun.accessKey = 6W0xz2uPf******
# AccessKey Secret,阿里云身份驗證密鑰
canal.aliyun.secretKey = sK56k1DrGx******
# 消息隊列接入的方式
canal.mq.accessChannel = cloud
# 云消息隊列 RocketMQ 版實例中Group名
rocketmq.producer.group = canal_test
# 是否開啟消息軌跡
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic =
# 云消息隊列 RocketMQ 版實例的命名空間。云消息隊列 RocketMQ 版5.x實例無需填寫該參數。
rocketmq.namespace =
# 云消息隊列 RocketMQ 版實例的接入點。在云消息隊列 RocketMQ 版控制臺實例詳情頁面獲取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重試次數
rocketmq.retry.times.when.send.failed = 0
# 是否啟用VIP Netty通道發送消息
rocketmq.vip.channel.enabled = false
# 消息的tag配置
rocketmq.tag =
獲取阿里云訪問密鑰AccessKey ID和AccessKey Secret。更多信息,請參見創建AccessKey。
執行下面的命令,配置instance.properties。
sudo vi /usr/local/canal-server/conf/example/instance.properties
# 阿里云RDS MySQL數據庫的連接地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里云RDS MySQL數據庫的賬號
canal.instance.dbUsername=xxx
# 阿里云RDS MySQL數據庫的密碼
canal.instance.dbPassword=xxx
# mysql 數據解析關注的表,Perl正則表達式,canal\\..*表示canal schema下所有表
canal.instance.filter.regex=canal\\..*
# 云消息隊列 RocketMQ 版實例的topic名稱
canal.mq.topic=canal_topic
2.5 啟動canal-deployer
執行下面的命令啟動canal-deployer。
/usr/local/canal-server/bin/startup.sh
2.6 驗證啟動
執行下面的命令查看canal.log日志文件,確認Canal成功啟動。
sudo vi /usr/local/canal-server/logs/canal/canal.log
2024-07-15 17:24:12.154 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
執行下面的命令查看example.log日志文件,確認Canal Instance成功啟動。
sudo vi /usr/local/canal-server/logs/example/example.log
2024-07-15 18:22:15.667 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO c.a.otter.canal.instance.core.AbstractCanalIn
3.測試驗證
3.1 向MYSQL數據庫中添加數據
執行下面的SQL,向步驟1.3 創建表所創建的表students添加一條數據。
INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');
3.2 查看Canal發送的消息
登錄云消息隊列 RocketMQ 版控制臺,找到部署時配置的實例,在消息查詢頁面查看消息如下:
相關參考
Canal支持更多的高級功能設置,完整的參數設置請參見官網文檔Canal快速入門。