日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Canal將MySQL的數據同步至云消息隊列 Kafka 版

本教程介紹如何使用Canal將MySQL的數據同步至云消息隊列 Kafka 版

背景信息

Canal的主要用途是基于MySQL數據庫增量日志解析,提供增量數據訂閱和消費。Canal偽裝自己為MySQL Slave,向MySQL Master發送dump請求。MySQL Master收到dump請求,開始推送Binary log給Canal,Canal解析Binary log來同步數據。Canal與云消息隊列 Kafka 版建立對接,您可以把MySQL更新的數據寫入到云消息隊列 Kafka 版中來分析。其詳細的工作原理,請參見Canal官網背景介紹

前提條件

在開始本教程前,請確保您已完成以下操作:

操作步驟

  1. 下載Canal壓縮包,本教程以1.1.5版本為例。

  2. 執行以下命令,創建目錄文件夾。本教程以/home/doc/tools/canal.deployer-1.1.5路徑為例。

    mkdir -p /home/doc/tools/canal.deployer-1.1.5
  3. Canal壓縮包復制到/home/doc/tools/canal.deployer-1.1.5路徑并解壓。

    tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /home/doc/tools/canal.deployer-1.1.5
  4. /home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,編輯instance.properties文件。

    vi conf/example/instance.properties

    根據instance.properties參數列表配置參數。

    #  根據實際情況修改為您的數據庫信息。
    #################################################
    ...
    # 數據庫地址。
    canal.instance.master.address=192.168.XX.XX:3306
    # username/password為數據庫的用戶名和密碼。
    ...
    canal.instance.dbUsername=****
    canal.instance.dbPassword=****
    ...
    # mq config
    # 您在云消息隊列 Kafka 版控制臺創建的Topic。
    canal.mq.topic=mysql_test
    # 針對數據庫名或者表名發送動態Topic。
    #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
    # 數據同步到云消息隊列 Kafka 版Topic的指定分區。
    canal.mq.partition=0
    # 以下兩個參數配置與canal.mq.partition互斥。配置以下兩個參數可以使數據發送至云消息隊列 Kafka 版Topic的不同分區。
    #canal.mq.partitionsNum=3
    #庫名.表名: 唯一主鍵,多個表之間用逗號分隔。
    #canal.mq.partitionHash=mytest.person:id,mytest.role:id
    #################################################
    表 1. instance.properties參數列表

    參數

    是否必選

    描述

    canal.instance.master.address

    MySQL數據庫的連接地址。

    canal.instance.dbUsername

    MySQL數據庫的用戶名。

    canal.instance.dbPassword

    MySQL數據庫的密碼。

    canal.mq.topic

    云消息隊列 Kafka 版實例的Topic。您可以在云消息隊列 Kafka 版控制臺Topic 管理頁面創建。具體操作,請參見步驟三:創建資源

    canal.mq.dynamicTopic

    動態Topic規則表達式。設置Topic匹配規則表達式,可以將不同的數據表數據同步至不同的Topic。具體設置方法,請參見參數說明

    canal.mq.partition

    數據庫數據同步到云消息隊列 Kafka 版Topic的指定分區。

    canal.mq.partitionsNum

    Topic的分區數量。該參數與canal.mq.partitionHash一起使用,可以將數據同步至云消息隊列 Kafka 版Topic不同的分區。

    canal.mq.partitionHash

    分區的規則表達式。具體設置方法,請參見參數說明

  5. 執行以下命令,編輯canal.properties文件。

    vi conf/canal.properties

    根據canal.properties參數列表說明配置參數。

    • 公網環境,消息采用SASL_SSL協議進行鑒權并加密,通過SSL接入點訪問云消息隊列 Kafka 版。接入點的詳細信息,請參見接入點對比

      # ...
      # 您需設置為kafka。
      canal.serverMode = kafka
      # ...
      # kafka配置。
      #在云消息隊列 Kafka 版實例詳情頁面獲取的SSL接入點。
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093
      # 參數的默認設置如下所示,您可以根據實際情況調整。
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
      
      # 公網環境,通過SASL_SSL鑒權并加密,您需配置網絡協議與身份校驗機制。
      kafka.ssl.truststore.location= ../conf/kafka_client_truststore_jks
      kafka.ssl.truststore.password= KafkaOnsClient
      kafka.security.protocol= SASL_SSL
      kafka.sasl.mechanism = PLAIN
      kafka.ssl.endpoint.identification.algorithm =
      表 2. canal.properties參數列表

      參數

      是否必選

      描述

      canal.serverMode

      您需設置為kafka

      kafka.bootstrap.servers

      云消息隊列 Kafka 版實例接入點。您可在云消息隊列 Kafka 版控制臺實例詳情頁面的接入點信息區域獲取。

      kafka.ssl.truststore.location

      SSL根證書kafka.client.truststore.jks的存放路徑。

      說明

      公網環境下,消息必須進行鑒權與加密,才能確保傳輸的安全。即需通過SSL接入點采用SASL_SSL協議進行傳輸。具體信息,請參見接入點對比

      kafka.acks

      云消息隊列 Kafka 版接收到數據之后給客戶端發出的確認信號。取值說明如下:

      • 0:表示客戶端不需要等待任何確認收到的信息。

      • 1:表示等待Leader成功寫入而不等待所有備份是否成功寫入。

      • all:表示等待Leader成功寫入并且所有備份都成功寫入。

      kafka.compression.type

      壓縮數據的壓縮算法,默認是無壓縮。取值如下:

      • none

      • gzip

      • snappy

      kafka.batch.size

      客戶端數據塊攢批的大小。單位:Byte。

      該參數控制批量處理消息的字節數。客戶端發送到Brokers的請求將包含多個批量處理,以減少請求次數。較小的批量處理數值可能降低吞吐量,而較大的批量處理數值將會浪費更多內存空間,分配一個指定的批量處理消息緩沖區有助于提高客戶端和服務端的性能。

      說明

      kafka.batch.sizekafka.linger.ms參數都是控制批量處理消息的條件,滿足其中一個參數的條件,客戶端就攢批完成,消息進入待發送狀態。

      kafka.linger.ms

      客戶端數據塊攢批的最大時長。單位:ms。

      客戶端設定攢批消息的延遲時間,以批量處理消息,減少請求次數。

      kafka.max.request.size

      客戶端每次請求的最大字節數。

      kafka.buffer.memory

      緩存數據的內存大小。

      kafka.max.in.flight.requests.per.connection

      限制客戶端在單個連接上能夠發送的未響應請求的個數。設置此值是1表示Broker在響應請求之前客戶端不能再向同一個Broker發送請求。

      kafka.retries

      消息發送失敗時,是否重復發送。設置為0,表示不會重復發送;設置大于0的值,客戶端重新發送數據。

      kafka.ssl.truststore.password

      SSL根證書的密碼,設置為KafkaOnsClient

      kafka.security.protocol

      采用SASL_SSL協議進行鑒權并加密,即設置為SASL_SSL

      kafka.sasl.mechanism

      SASL身份認證的機制。SSL接入點采用PLAIN機制驗證身份。

      公網環境,需通過SASL進行身份校驗,需要在bin/startup.sh配置環境變量,并編輯kafka_client_producer_jaas.conf文件,配置云消息隊列 Kafka 版實例的用戶名與密碼。

      1. 執行vi bin/startup.sh命令,編輯startup.sh文件,配置環境變量。

        JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8 -Djava.security.auth.login.config=/home/doc/tools/canal.deployer-1.1.5/conf/kafka_client_jaas.conf"
      2. 執行vi conf/kafka_client_producer_jaas.conf命令,編輯kafka_client_producer_jaas.conf文件,配置實例用戶名與密碼信息。

        說明
        • 如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺的實例詳情頁面獲取默認用戶的用戶名和密碼。

        • 如果實例已開啟ACL,請確保要使用的SASL用戶為PLAIN類型且已授權收發消息的權限。具體信息,請參見SASL用戶授權

        KafkaClient {  org.apache.kafka.common.security.plain.PlainLoginModule required
                       username="實例的用戶名"  
                       password="實例的用戶名密碼";
        };
    • VPC環境,消息采用PLAINTEXT協議不鑒權不加密傳輸,通過默認接入點訪問云消息隊列 Kafka 版,僅需配置canal.serverModekafka.bootstrap.servers參數。接入點的詳細信息,請參見接入點對比

      # ...
      # 您需設置為kafka。
      canal.serverMode = kafka
      # ...
      # kafka配置。
      # 在云消息隊列 Kafka 版實例詳情頁面獲取的默認接入點。
      kafka.bootstrap.servers = alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092
      # 以下參數請您可以按照實際情況調整,也可以保持默認設置。
      kafka.acks = all
      kafka.compression.type = none
      kafka.batch.size = 16384
      kafka.linger.ms = 1
      kafka.max.request.size = 1048576
      kafka.buffer.memory = 33554432
      kafka.max.in.flight.requests.per.connection = 1
      kafka.retries = 0
  6. /home/doc/tools/canal.deployer-1.1.5路徑,執行以下命令,啟動Canal。

    sh bin/startup.sh
    • 查看/home/doc/tools/canal.deployer-1.1.5/logs/canal/canal.log日志文件,確認Canal與云消息隊列 Kafka 版連接成功,Canal正在運行。

      2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
      2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.XX.XX:11111]
      2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
    • 查看/home/doc/tools/canal.deployer-1.1.5/logs/example/example.log日志文件,確認Canal Instance已啟動。

      2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
      2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
      2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
      2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

測試驗證

啟動Canal之后,進行數據同步驗證。

  1. 在MySQL數據庫,新建數據表T_Student。數據表數據示例如下:

    mysql> select * from T_Student;
    +--------+---------+------+------+
    | stuNum | stuName | age  | sex  |
    +--------+---------+------+------+
    |      1 | 小王    |   18 | girl |
    |      2 | 小張    |   17 | boy  |
    +--------+---------+------+------+
    2 rows in set (0.00 sec)

    查看/home/doc/tools/canal.deployer-1.1.5/logs/example/meta.log日志文件,數據庫的每次增刪改操作,都會在meta.log中生成一條記錄,查看該日志可以確認Canal是否有采集到數據。

    tail -f example/meta.log
    2020-07-29 09:21:05.110 - clientId:1001 cursor:[log.000001,29723,1591190230000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:23:46.109 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:24:50.547 - clientId:1001 cursor:[log.000001,30047,1595985825000,1,] address[/192.168.XX.XX:3306]
    2020-07-29 09:26:45.547 - clientId:1001 cursor:[log.000001,30143,1595986005000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:04.546 - clientId:1001 cursor:[log.000001,30467,1595986204000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:16.546 - clientId:1001 cursor:[log.000001,30734,1595986215000,1,] address[localhost/192.168.XX.XX:3306]
    2020-07-29 09:30:36.547 - clientId:1001 cursor:[log.000001,31001,1595986236000,1,] address[localhost/192.168.XX.XX:3306]
  2. 登錄云消息隊列 Kafka 版控制臺,查詢消息,確認MySQL的數據被同步在云消息隊列 Kafka 版。控制臺查詢消息的具體操作,請參見查詢消息

    查詢消息
  3. 數據同步完畢,執行以下命令,關閉Canal。

    sh bin/stop.sh