日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Canal和RocketMQ實現數據庫變更訂閱處理

本文以MySQL數據庫為例介紹如何使用Canal接入云消息隊列 RocketMQ 版,實現MySQL數據庫Binlog數據的變更處理。

背景信息

CDC(Change Data Capture)是一種監測并捕獲數據庫變更的典型技術方案,常應用于異構數據源之間的數據同步。Canal作為一款輕量級的CDC工具,可基于數據庫增量日志解析,提供增量變更數據的訂閱和消費能力。Canal可以將變更記錄可靠地投遞到云消息隊列 RocketMQ 版中,借助云消息隊列 RocketMQ 版豐富的消息處理策略實現多樣化的業務邏輯。

Canal是一個開源項目,倉庫地址請參見Canal。

應用場景

基于Binlog日志實現增量訂閱和消費的典型業務場景如下:

  • 數據庫鏡像

  • 數據庫實時備份

  • 索引構建和實時維護(拆分異構索引、倒排索引等)

  • 業務Cache刷新

  • 帶業務邏輯的增量數據處理

方案介紹

基于Canal和云消息隊列 RocketMQ 版的CDC方案如下:

CDC方案

如上圖所示,Canal將自己偽裝成庫,監聽并接收數據庫的Binlog,并同步到云消息隊列 RocketMQ 版等存儲或其他中間件系統。

具體操作步驟如下:

  1. 配置MySQL:開啟MySQL的Binlog功能,創建測試需要的數據庫和表。

  2. 部署Canal:部署一個canal-deployer(server),監聽并接收MySQL數據庫的Binlog。

  3. 測試驗證:驗證數據變動后消息發送的情況。

環境要求

資源要求

網絡要求

  • 部署canal-deployer(server)的節點可以連接數據庫和云消息隊列 RocketMQ 版實例,一般位于VPC內的ECS和容器都可以連接。

版本要求

服務

版本

說明

Canal

1.1.6

其他版本請參見Canal Release。

MySQL

8.0

支持源端MySQL版本包括5.1.x、5.5.x、5.6.x、5.7.x、8.0.x。

云消息隊列 RocketMQ 版

5.x

1.配置MySQL

1.1 開啟Binlog功能

阿里云RDS MySQL

阿里云RDS MySQL默認已開啟Binlog功能,并且賬號默認具有Binlog dump權限,可以直接跳過這一步。

自建MySQL

  • 開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下:

    [mysqld]
    log-bin=mysql-bin  # 開啟binlog
    binlog-format=ROW  # 選擇ROW模式
    server_id=1  # 配置MySQL replaction需要定義,不要和canal的slaveId重復
  • 創建用戶canal并授權MySQL slave 的權限。

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    FLUSH PRIVILEGES;

1.2 創建數據庫

執行下面的SQL,創建一個名為canal的數據庫。

CREATE DATABASE canal DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

1.3 創建表

執行下面的SQL,在canal數據庫創建一個名為students的表。

CREATE TABLE students (
    id INT AUTO_INCREMENT,
    name VARCHAR(100) NOT NULL,
    age INT,
    gender VARCHAR(10),
    PRIMARY KEY (id)
);

2.部署Canal

2.1 安裝JDK

執行下面的命令,安裝JDK。

sudo yum install java-1.8.0-openjdk

2.2 下載canal-deployer

執行下面的命令,下載canal-deployer安裝包。

sudo wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz

2.3 解壓縮安裝包

執行下面的命令,創建目錄canal-server,并將下載的安裝包解壓縮到canal-server目錄中。

# 創建目錄canal-server
sudo mkdir -p /usr/local/canal-server

# 將下載的安裝包解壓縮到canal-server目錄中
sudo tar -zxvf canal.deployer-1.1.6.tar.gz -C /usr/local/canal-server

2.4 修改配置

配置Canal啟動的參數,詳細配置請參見參數說明。

  1. 執行下面的命令,配置canal.properties。

sudo vi /usr/local/canal-server/conf/canal.properties
# 服務端模式
canal.serverMode = rocketMQ
# AccessKey ID,阿里云身份驗證標識。
canal.aliyun.accessKey = 6W0xz2uPf******
# AccessKey Secret,阿里云身份驗證密鑰
canal.aliyun.secretKey = sK56k1DrGx******
# 消息隊列接入的方式
canal.mq.accessChannel = cloud
# 云消息隊列 RocketMQ 版實例中Group名
rocketmq.producer.group = canal_test
# 是否開啟消息軌跡
rocketmq.enable.message.trace = false
# message trace的topic
rocketmq.customized.trace.topic = 
# 云消息隊列 RocketMQ 版實例的命名空間。云消息隊列 RocketMQ 版5.x實例無需填寫該參數。
rocketmq.namespace =
# 云消息隊列 RocketMQ 版實例的接入點。在云消息隊列 RocketMQ 版控制臺實例詳情頁面獲取。
rocketmq.namesrv.addr = rmq-cn-xxx.{$RegionId}.rmq.aliyuncs.com:8080
# 重試次數
rocketmq.retry.times.when.send.failed = 0
# 是否啟用VIP Netty通道發送消息
rocketmq.vip.channel.enabled = false
# 消息的tag配置
rocketmq.tag = 
說明

獲取阿里云訪問密鑰AccessKey ID和AccessKey Secret。更多信息,請參見創建AccessKey。

  1. 執行下面的命令,配置instance.properties。

sudo vi /usr/local/canal-server/conf/example/instance.properties
# 阿里云RDS MySQL數據庫的連接地址
canal.instance.master.address=rm-uf62****.rwlb.rds.aliyuncs.com:3306
# 阿里云RDS MySQL數據庫的賬號
canal.instance.dbUsername=xxx
# 阿里云RDS MySQL數據庫的密碼
canal.instance.dbPassword=xxx
# mysql 數據解析關注的表,Perl正則表達式,canal\\..*表示canal schema下所有表 
canal.instance.filter.regex=canal\\..*
# 云消息隊列 RocketMQ 版實例的topic名稱
canal.mq.topic=canal_topic

2.5 啟動canal-deployer

執行下面的命令啟動canal-deployer。

/usr/local/canal-server/bin/startup.sh

2.6 驗證啟動

  1. 執行下面的命令查看canal.log日志文件,確認Canal成功啟動。

sudo vi /usr/local/canal-server/logs/canal/canal.log  
2024-07-15 17:24:12.154 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2024-07-15 17:24:12.202 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2024-07-15 17:24:12.497 [main] INFO  c.a.o.c.c.rocketmq.producer.CanalRocketMQProducer - ##Start RocketMQ producer##
2024-07-15 17:24:12.799 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2024-07-15 17:24:12.984 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.17.XX.XX:11111]
2024-07-15 17:24:16.208 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  1. 執行下面的命令查看example.log日志文件,確認Canal Instance成功啟動。

sudo vi /usr/local/canal-server/logs/example/example.log 
2024-07-15 18:22:15.667 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^canal\..*$
2024-07-15 18:22:15.699 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2024-07-15 18:22:16.030 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalIn

3.測試驗證

3.1 向MYSQL數據庫中添加數據

執行下面的SQL,向步驟1.3 創建表所創建的表students添加一條數據。

INSERT INTO`students` (`name`, `age`, `gender`)VALUES('Tome', 18, 'male');

3.2 查看Canal發送的消息

登錄云消息隊列 RocketMQ 版控制臺,找到部署時配置的實例,在消息查詢頁面查看消息如下:

image

相關參考

Canal支持更多的高級功能設置,完整的參數設置請參見官網文檔Canal快速入門