Spring Cloud是用于構建消息驅動的微服務應用程序的框架,提供服務發現、配置管理、消息傳遞、負載均衡等微服務相關的解決方案,可以更容易地構建分布式系統和進行服務間通信。本文介紹如何使用Spring Cloud框架接入云消息隊列 Kafka 版并收發消息。
前提條件
下載kafka-spring-stream-demo,并將其上傳在準備好的Linux操作系統。
確保您的云消息隊列 Kafka 版實例為2.x或以上版本。具體操作,請參見升級實例版本。
公網環境(消息傳輸需鑒權與加密)
公網環境,消息采用SASL_SSL協議進行鑒權并加密。客戶端通過SSL接入點訪問云消息隊列 Kafka 版。接入點的詳細信息,請參見接入點對比。
本示例將Demo包上傳在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路徑。
登錄Linux系統,執行以下命令,進入Demo包所在路徑/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo。
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
執行以下命令,進入配置文件路徑。
cd sasl-ssl/src/main/resources/
執行以下命令,編輯application.properties文件,并根據參數列表配置實例信息。
vi application.properties
##以下參數,您需配置為實際使用的實例信息。 kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-zv**********-3.alikafka.aliyuncs.com:9093 kafka.consumer.group=test-spring kafka.output.topic.name=test-output kafka.input.topic.name=test-input kafka.ssl.truststore.location=/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl/src/main/resources/kafka.client.truststore.jks ### 配置Binding參數可以把消息隊列Kafka版和Spring Cloud Stream的Binder綁定在一起,以下參數保持默認即可。 spring.cloud.stream.bindings.MyOutput.destination=${kafka.output.topic.name} spring.cloud.stream.bindings.MyOutput.contentType=text/plain spring.cloud.stream.bindings.MyInput.group=${kafka.consumer.group} spring.cloud.stream.bindings.MyInput.destination=${kafka.input.topic.name} spring.cloud.stream.bindings.MyInput.contentType=text/plain ### Binder是Spring Cloud對消息中間件的封裝模塊,以下參數保持默認即可。 spring.cloud.stream.kafka.binder.autoCreateTopics=false spring.cloud.stream.kafka.binder.brokers=${kafka.bootstrap-servers} spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=PLAIN spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=${kafka.ssl.truststore.location} spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=KafkaOnsClient ### 如果Demo中沒有以下參數,請手動增加。該參數表示是否需要進行服務器主機名驗證。因消息傳輸使用SASL身份校驗,可設置為空字符串關閉服務器主機名驗證。 ### 服務器主機名驗證是驗證SSL證書的主機名與服務器的主機名是否匹配,默認為HTTPS。 spring.cloud.stream.kafka.binder.configuration.ssl.endpoint.identification.algorithm=
表 1. 參數列表 參數
描述
kafka.bootstrap-servers
云消息隊列 Kafka 版實例接入點。您可在云消息隊列 Kafka 版控制臺的實例詳情頁面的接入點信息區域獲取。
kafka.consumer.group
訂閱消息的Group。您可以在云消息隊列 Kafka 版控制臺的Group 管理頁面創建。具體操作,請參見步驟三:創建資源。
kafka.output.topic.name
消息的Topic。控制臺程序通過此Topic每隔一段時間發送消息,內容是固定的。您可以在云消息隊列 Kafka 版控制臺的Topic 管理頁面創建。具體操作,請參見步驟三:創建資源。
kafka.input.topic.name
消息的Topic。您可以通過此Topic在控制臺發送消息,Demo程序會消費消息,并將消息打印在日志中。
kafka.ssl.truststore.location
SSL根證書kafka.client.truststore.jks的存放路徑。
執行以下命令,打開kafka_client_jaas.conf文件,配置實例的用戶名與密碼。
vi kafka_client_jaas.conf
說明如果實例未開啟ACL,您可以在云消息隊列 Kafka 版控制臺的實例詳情頁面獲取默認用戶的用戶名和密碼。
如果實例已開啟ACL,請確保要使用的SASL用戶為PLAIN類型且已授權收發消息的權限。具體信息,請參見SASL用戶授權。
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX"; };
進入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/sasl-ssl路徑,執行以下命令,運行Demo。
sh run_demo.sh
程序打印如下信息,說明接收到控制臺程序通過kafka.output.topic.name配置的Topic所發送的消息。
Send: hello world !! Send: hello world !! Send: hello world !! Send: hello world !!
登錄云消息隊列 Kafka 版控制臺驗證消息收發是否成功。
VPC環境(消息傳輸不鑒權不加密)
VPC環境,消息可以采用PLAINTEXT協議不鑒權不加密傳輸。客戶端通過默認接入點訪問云消息隊列 Kafka 版。接入點的詳細信息,請參見接入點對比。
本示例將Demo包上傳在/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo路徑。
登錄Linux系統,執行以下命令,進入Demo包所在路徑/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo。
cd /home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo
執行以下命令,進入配置文件路徑。
cd vpc/src/main/resources/
執行以下命令,編輯application.properties文件,并根據參數列表配置實例信息。
vi application.properties
###以下參數請修改為實際使用的實例的信息。 kafka.bootstrap-servers=alikafka-pre-cn-zv**********-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-zv**********-3-vpc.alikafka.aliyuncs.com:9092 kafka.consumer.group=test-spring kafka.output.topic.name=test-output kafka.input.topic.name=test-input
進入/home/doc/project/aliware-kafka-demos/kafka-spring-stream-demo/vpc路徑,執行以下命令,運行Demo。
sh run_demo.sh
程序打印如下信息。
Send: hello world !! Send: hello world !! Send: hello world !! Send: hello world !!
登錄云消息隊列 Kafka 版控制臺驗證消息收發是否成功。
相關文檔
關于Spring Cloud框架的更多信息,請參見Spring Cloud Stream。