本文以Java SDK為例介紹如何在公網環境下使用SDK接入云消息隊列 Kafka 版的SSL接入點并使用PLAIN機制收發消息。

前提條件

安裝Java依賴庫

pom.xml中添加以下依賴。
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.6</version>
</dependency>
說明 建議您保持服務端和客戶端版本一致,即保持客戶端庫版本和云消息隊列 Kafka 版實例的大版本一致。您可以在云消息隊列 Kafka 版控制臺的實例詳情頁面獲取云消息隊列 Kafka 版實例的大版本。

準備配置

  1. 創建Log4j配置文件log4j.properties
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    log4j.rootLogger=INFO, STDOUT
    
    log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
    log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
    log4j.appender.STDOUT.layout.ConversionPattern=[%d] %p %m (%c)%n
  2. 下載SSL根證書
  3. 創建JAAS配置文件kafka_client_jaas.conf
    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="xxxx"
      password="xxxx";
    };                       
    說明
    • 如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺的實例詳情頁面獲取默認用戶的用戶名和密碼。
    • 如果實例已開啟ACL,請確保要使用的SASL用戶為PLAIN類型且已授權收發消息的權限,詳情請參見SASL用戶授權
  4. 創建云消息隊列 Kafka 版配置文件kafka.properties
    ##SSL接入點,通過控制臺獲取。
    bootstrap.servers=xxxx
    ##Topic,通過控制臺創建。
    topic=xxxx
    ##Group,通過控制臺創建。
    group.id=xxxx
    ##SSL根證書。
    ssl.truststore.location=/xxxx/kafka.client.truststore.jks
    ##JAAS配置文件。
    java.security.auth.login.config=/xxxx/kafka_client_jaas.conf                       
  5. 創建配置文件加載程序JavaKafkaConfigurer.java
    import java.util.Properties;
    
    public class JavaKafkaConfigurer {
    
        private static Properties properties;
    
        public static void configureSasl() {
            //如果用-D或者其它方式設置過,這里不再設置。
            if (null == System.getProperty("java.security.auth.login.config")) {
                //請注意將XXX修改為自己的路徑。
                //這個路徑必須是一個文件系統可讀的路徑,不能被打包到JAR中。
                System.setProperty("java.security.auth.login.config", getKafkaProperties().getProperty("java.security.auth.login.config"));
            }
        }
    
        public synchronized static Properties getKafkaProperties() {
            if (null != properties) {
                return properties;
            }
            //獲取配置文件kafka.properties的內容。
            Properties kafkaProperties = new Properties();
            try {
                kafkaProperties.load(KafkaProducerDemo.class.getClassLoader().getResourceAsStream("kafka.properties"));
            } catch (Exception e) {
                //沒加載到文件,程序要考慮退出。
                e.printStackTrace();
            }
            properties = kafkaProperties;
            return kafkaProperties;
        }
    }                    

發送消息

  1. 創建發送消息程序KafkaProducerDemo.java
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;
    
    public class KafkaProducerDemo {
    
        public static void main(String args[]) {
            //設置JAAS配置文件的路徑。
            JavaKafkaConfigurer.configureSasl();
    
            //加載kafka.properties。
            Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();
    
            Properties props = new Properties();
            //設置接入點,請通過控制臺獲取對應Topic的接入點。
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
            //設置SSL根證書的路徑,請記得將XXX修改為自己的路徑。
            //與sasl路徑類似,該文件也不能被打包到jar中。
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
            //根證書store的密碼,保持不變。
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
            //接入協議,目前支持使用SASL_SSL協議接入。
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            //SASL鑒權方式,保持不變。
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            //云消息隊列 Kafka 版消息的序列化方式。
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            //請求的最長等待時間。
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 30 * 1000);
            //設置客戶端內部重試次數。
            props.put(ProducerConfig.RETRIES_CONFIG, 5);
            //設置客戶端內部重試間隔。
            props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3000);
    
            //Hostname校驗改成空。
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    
            //構造Producer對象,注意,該對象是線程安全的,一般來說,一個進程內一個Producer對象即可。
            //如果想提高性能,可以多構造幾個對象,但不要太多,最好不要超過5個。
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
    
            //構造一個云消息隊列 Kafka 版消息。
            String topic = kafkaProperties.getProperty("topic"); //消息所屬的Topic,請在控制臺申請之后,填寫在這里。
            String value = "this is the message's value"; //消息的內容。
    
            try {
                //批量獲取 futures 可以加快速度, 但注意,批量不要太大。
                List<Future<RecordMetadata>> futures = new ArrayList<Future<RecordMetadata>>(128);
                for (int i =0; i < 100; i++) {
                    //發送消息,并獲得一個Future對象。
                    ProducerRecord<String, String> kafkaMessage =  new ProducerRecord<String, String>(topic, value + ": " + i);
                    Future<RecordMetadata> metadataFuture = producer.send(kafkaMessage);
                    futures.add(metadataFuture);
    
                }
                producer.flush();
                for (Future<RecordMetadata> future: futures) {
                    //同步獲得Future對象的結果。
                    try {
                        RecordMetadata recordMetadata = future.get();
                        System.out.println("Produce ok:" + recordMetadata.toString());
                    } catch (Throwable t) {
                        t.printStackTrace();
                    }
                }
            } catch (Exception e) {
                //客戶端內部重試之后,仍然發送失敗,業務要應對此類錯誤。
                System.out.println("error occurred");
                e.printStackTrace();
            }
        }
    }  
  2. 編譯并運行KafkaProducerDemo.java發送消息。

訂閱消息

選擇以下任意一種方式訂閱消息。
  • 單Consumer訂閱消息。
    1. 創建單Consumer訂閱消息程序KafkaConsumerDemo.java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.config.SaslConfigs;
      import org.apache.kafka.common.config.SslConfigs;
      
      public class KafkaConsumerDemo {
      
          public static void main(String args[]) {
              //設置JAAS配置文件的路徑。
              JavaKafkaConfigurer.configureSasl();
      
              //加載kafka.properties。
              Properties kafkaProperties =  JavaKafkaConfigurer.getKafkaProperties();
      
              Properties props = new Properties();
              //設置接入點,請通過控制臺獲取對應Topic的接入點。
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
              //設置SSL根證書的路徑,請記得將XXX修改為自己的路徑。
              //與SASL路徑類似,該文件也不能被打包到jar中。
              props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
              //根證書存儲的密碼,保持不變。
              props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
              //接入協議,目前支持使用SASL_SSL協議接入。
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
              //SASL鑒權方式,保持不變。
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              //兩次Poll之間的最大允許間隔。
              //消費者超過該值沒有返回心跳,服務端判斷消費者處于非存活狀態,服務端將消費者從Group移除并觸發Rebalance,默認30s。
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              //設置單次拉取的量,走公網訪問時,該參數會有較大影響。
              props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 32000);
              props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 32000);
              //每次Poll的最大數量。
              //注意該值不要改得太大,如果Poll太多數據,而不能在下次Poll之前消費完,則會觸發一次負載均衡,產生卡頓。
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              //消息的反序列化方式。
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              //當前消費實例所屬的消費組,請在控制臺申請之后填寫。
              //屬于同一個組的消費實例,會負載消費消息。
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              //Hostname校驗改成空。
              props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      
              //構造消費對象,也即生成一個消費實例。
              KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
              //設置消費組訂閱的Topic,可以訂閱多個。
              //如果GROUP_ID_CONFIG是一樣,則訂閱的Topic也建議設置成一樣。
              List<String> subscribedTopics =  new ArrayList<String>();
              //如果需要訂閱多個Topic,則在這里加進去即可。
              //每個Topic需要先在控制臺進行創建。
              subscribedTopics.add(kafkaProperties.getProperty("topic"));
              consumer.subscribe(subscribedTopics);
      
              //循環消費消息。
              while (true){
                  try {
                      ConsumerRecords<String, String> records = consumer.poll(1000);
                      //必須在下次Poll之前消費完這些數據, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
                      //建議開一個單獨的線程池來消費消息,然后異步返回結果。
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
                      }
                  } catch (Exception e) {
                      try {
                          Thread.sleep(1000);
                      } catch (Throwable ignore) {
      
                      }
                      e.printStackTrace();
                  }
              }
          }
      }
    2. 編譯并運行KafkaConsumerDemo.java消費消息。
  • 多Consumer訂閱消息。
    1. 創建多Consumer訂閱消息程序KafkaMultiConsumerDemo.java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.concurrent.atomic.AtomicBoolean;
      import org.apache.kafka.clients.CommonClientConfigs;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.common.config.SaslConfigs;
      import org.apache.kafka.common.config.SslConfigs;
      import org.apache.kafka.common.errors.WakeupException;
      
      /**
       * 本教程演示如何在一個進程內開啟多個Consumer同時消費Topic。
       * 注意全局Consumer數量不要超過訂閱的Topic總分區數。
       */
      public class KafkaMultiConsumerDemo {
      
          public static void main(String args[]) throws InterruptedException {
              //設置JAAS配置文件的路徑。
              JavaKafkaConfigurer.configureSasl();
      
              //加載kafka.properties。
              Properties kafkaProperties = JavaKafkaConfigurer.getKafkaProperties();
      
              Properties props = new Properties();
              //設置接入點,請通過控制臺獲取對應Topic的接入點。
              props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getProperty("bootstrap.servers"));
              //設置SSL根證書的路徑,請記得將XXX修改為自己的路徑。
              //與SASL路徑類似,該文件也不能被打包到JAR中。
              props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaProperties.getProperty("ssl.truststore.location"));
              //根證書存儲的密碼,保持不變。
              props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "KafkaOnsClient");
              //接入協議,目前支持使用SASL_SSL協議接入。
              props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
              //SASL鑒權方式,保持不變。
              props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
              //兩次Poll之間的最大允許間隔。
              //消費者超過該值沒有返回心跳,服務端判斷消費者處于非存活狀態,服務端將消費者從Group移除并觸發Rebalance,默認30s。
              props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
              //每次Poll的最大數量。
              //注意該值不要改得太大,如果Poll太多數據,而不能在下次Poll之前消費完,則會觸發一次負載均衡,產生卡頓。
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 30);
              //消息的反序列化方式。
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
              //當前消費實例所屬的消費組,請在控制臺申請之后填寫。
              //屬于同一個組的消費實例,會負載消費消息。
              props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getProperty("group.id"));
              //構造消費對象,也即生成一個消費實例。
      
              //Hostname校驗改成空。
              props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      
              int consumerNum = 2;
              Thread[] consumerThreads = new Thread[consumerNum];
              for (int i = 0; i < consumerNum; i++) {
                  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
      
                  List<String> subscribedTopics = new ArrayList<String>();
                  subscribedTopics.add(kafkaProperties.getProperty("topic"));
                  consumer.subscribe(subscribedTopics);
      
                  KafkaConsumerRunner kafkaConsumerRunner = new KafkaConsumerRunner(consumer);
                  consumerThreads[i] = new Thread(kafkaConsumerRunner);
              }
      
              for (int i = 0; i < consumerNum; i++) {
                  consumerThreads[i].start();
              }
      
              for (int i = 0; i < consumerNum; i++) {
                  consumerThreads[i].join();
              }
          }
      
          static class KafkaConsumerRunner implements Runnable {
              private final AtomicBoolean closed = new AtomicBoolean(false);
              private final KafkaConsumer consumer;
      
              KafkaConsumerRunner(KafkaConsumer consumer) {
                  this.consumer = consumer;
              }
      
              @Override
              public void run() {
                  try {
                      while (!closed.get()) {
                          try {
                              ConsumerRecords<String, String> records = consumer.poll(1000);
                              //必須在下次Poll之前消費完這些數據, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。
                              for (ConsumerRecord<String, String> record : records) {
                                  System.out.println(String.format("Thread:%s Consume partition:%d offset:%d", Thread.currentThread().getName(), record.partition(), record.offset()));
                              }
                          } catch (Exception e) {
                              try {
                                  Thread.sleep(1000);
                              } catch (Throwable ignore) {
      
                              }
                              e.printStackTrace();
                          }
                      }
                  } catch (WakeupException e) {
                      //如果關閉則忽略異常。
                      if (!closed.get()) {
                          throw e;
                      }
                  } finally {
                      consumer.close();
                  }
              }
      
              //可以被另一個線程調用的關閉Hook。
              public void shutdown() {
                  closed.set(true);
                  consumer.wakeup();
              }
          }
      }
    2. 編譯并運行KafkaMultiConsumerDemo.java消費消息。