日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Canal插件

canal [k?’n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。 早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

一、背景介紹

說明

canal [k?’n?l],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。 早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基于業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

目前canal支持寫入kafka,并且DataHub兼容kafka協議,所以您可以使用canal將MySql的增量數據寫入DataHub。同時為了保證Canal能夠像寫Kafka一樣寫入DataHub,對開源Canal做了一些必要的改動,主要改動如下:

  • kafka的TopicName為DataHub的ProjectName.TopicName的組合,因此去掉canal將kafka TopicName中的”.”替換為”_”的行為,保證kafka TopicName能夠正確映射到DataHub的Topic

  • DataHub使用SASL PLAIN進行認證,所以修改了啟動腳本,添加了環境變量-Djava.security.auth.login.config=$kafka_jaas_conf

二、使用說明

本文只是給出了一個canal寫入DataHub(kafka)的基礎示例,更多參數配置和參數含義請參考canal官網

1. 下載canal deployer壓縮包

首先下載canal壓縮包canal.deployer-1.1.5-SNAPSHOT.tar.gz。如果使用未經DataHub改動的canal,可能會無法寫入DataHub。

2. 將canal.deployer 復制到固定目錄并解壓

mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /usr/local/canal

3.修改配置參數

3.1 修改instance配置 conf/example/instance.properties

#  按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=test_project.test_topic
# 針對庫名或者表名發送動態topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

對應IP地址的MySQL 數據庫需進行相關初始化與設置, 可參考 Canal QuickStart。針對庫名的動態TopicName和根據主鍵哈希的設置,可參考mq參數說明

3.2 修改canal配置文件 conf/canal.properties

# ...
canal.serverMode = kafka
# ...
kafka.bootstrap.servers = dh-cn-hangzhou.aliyuncs.com:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.security.protocol = SASL_SSL
kafka.sasl.mechanism = PLAIN

其中必須配置參數為canal.serverMode、kafka.bootstrap.servers、kafka.security.protocol、kafka.sasl.mechanism,其他參數用戶可根據實際情況自主進行調優,kafka.bootstrap.servers需要選擇topic所在region的endpoint,endpoint可前往DataHub兼容kafka協議查看。

3.3 修改jass配置文件 conf/kafka_client_producer_jaas.conf

kafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="accessId"
  password="accessKey";
};

4. 啟動和關閉

啟動之前需要保證DataHub有相應的Topic,創建的Topic要求可以參考DataHub兼容kafka協議

4.1 啟動

cd /usr/local/canal/
sh bin/startup.sh

4.2 查看日志

查看 logs/canal/canal.logvi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

查看instance的日志:vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

查看meta的日志,vi logs/example/meta.log

數據庫的每次增刪改操作,都會在meta.log中生成一條記錄,查看該日志可以確認canal是否有采集到數據。

tail -f example/meta.log
2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/127.0.0.1:3306]
2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/127.0.0.1:3306]
2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/127.0.0.1:3306]
2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/127.0.0.1:3306]

4.3 關閉

cd /usr/local/canal/
sh bin/stop.sh

三、 數據示例

DataHub Topic

DataHub Topic為tuple topic,Schema為

+-------+------+----------+-------------+
| Index | name |   type   |  allow NULL |
+-------+------+----------+-------------+
|   0   |  key |  STRING  |     true    |
|   1   |  val |  STRING  |     true    |
+-------+------+----------+-------------+

MySQL

MySQL表結構

mysql> desc orders;
+-------+---------+------+-----+---------+-------+
| Field | Type    | Null | Key | Default | Extra |
+-------+---------+------+-----+---------+-------+
| oid   | int(11) | YES  |     | NULL    |       |
| pid   | int(11) | YES  |     | NULL    |       |
| num   | int(11) | YES  |     | NULL    |       |
+-------+---------+------+-----+---------+-------+
3 rows in set (0.00 sec)

數據

寫入DataHub后,key為null,val為JSON字符串。

mysql> insert into orders values(1,2,3);

{
    "data":[
        {
            "oid":"1",
            "pid":"2",
            "num":"3"
        }
    ],
    "database":"ggtt",
    "es":1591092305000,
    "id":2,
    "isDdl":false,
    "mysqlType":{
        "oid":"int(11)",
        "pid":"int(11)",
        "num":"int(11)"
    },
    "old":null,
    "pkNames":null,
    "sql":"",
    "sqlType":{
        "oid":4,
        "pid":4,
        "num":4
    },
    "table":"orders",
    "ts":1591092305813,
    "type":"INSERT"
}