本文介紹如何使用MirrorMaker將自建Kafka集群的數據遷移到云消息隊列 Kafka 版集群。

前提條件

您已完成以下操作:

背景信息

Kafka的鏡像特性可實現Kafka集群的數據備份。實現這一特性的工具就是MirrorMaker。您可以使用MirrorMaker將源集群中的數據鏡像拷貝到目標集群。如下圖所示,Mirror Maker使用一個內置的Consumer從源自建Kafka集群消費消息,然后再使用一個內置的Producer將這些消息重新發送到目標云消息隊列 Kafka 版集群。

dg_data_migration

更多信息,請參見Apache Kafka MirrorMaker

注意事項

  • Topic名稱必須一致。
  • 分區數量可以不一致。
  • 在同一個分區中的數據遷移后并不保證依舊在同一個分區中。
  • 默認情況下,Key相同的消息會分布在同一分區中。
  • 普通消息在宕機時可能會亂序,分區順序消息在宕機時依然保持順序。

VPC接入

  1. 配置consumer.properties
    ## 自建Kafka集群的接入點
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 消費者分區分配策略
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    
    ## Group的名稱
    group.id=test-consumer-group
  2. 配置producer.properties
    ## 云消息隊列 Kafka 版集群的默認接入點(可在云消息隊列 Kafka 版控制臺獲取)
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 數據壓縮方式
    compression.type=none                                
  3. 執行以下命令開啟遷移進程。
    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

公網接入

  1. 下載kafka.client.truststore.jks
  2. 配置kafka_client_jaas.conf
    KafkaClient {
       org.apache.kafka.common.security.plain.PlainLoginModule required
       username="your username"
       password="your password";
    };
  3. 配置consumer.properties
    ## 自建Kafka集群的接入點
    bootstrap.servers=XXX.XXX.XXX.XXX:9092
    
    ## 消費者分區分配策略
    partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    
    ## Group名稱
    group.id=test-consumer-group
  4. 配置producer.properties
    ## 云消息隊列 Kafka 版集群的SSL接入點(可在云消息隊列 Kafka 版控制臺獲取)
    bootstrap.servers=XXX.XXX.XXX.XXX:9093
    
    ## 數據壓縮方式
    compression.type=none
    
    ## truststore(使用步驟1下載的文件)
    ssl.truststore.location=kafka.client.truststore.jks
    ssl.truststore.password=KafkaOnsClient
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    
    ## 云消息隊列 Kafka 版2.X版本在配置SASL接入時需要做以下配置,2.X以下版本不需要配置。
    ssl.endpoint.identification.algorithm=
  5. 設置java.security.auth.login.config
    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"                              
  6. 執行以下命令開啟遷移進程。
    sh bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist topicName

結果驗證

您可通過以下任一方法驗證MirrorMaker是否運行成功。

  • 通過kafka-consumer-groups.sh查看自建集群消費進度。

    bin/kafka-consumer-groups.sh --new-consumer --describe --bootstrap-server自建集群接入點 --group test-consumer-group

  • 往自建集群中發送消息,在云消息隊列 Kafka 版控制臺中查看Topic的分區狀態,確認當前服務器上消息總量是否正確。您還可以通過云消息隊列 Kafka 版控制臺來查看具體消息內容。具體操作,請參見查詢消息