Python SDK收發(fā)消息
本文介紹如何使用Python SDK通過接入點(diǎn)接入云消息隊(duì)列 Kafka 版并收發(fā)消息。
環(huán)境準(zhǔn)備
添加Python依賴庫
執(zhí)行以下命令安裝依賴庫。
pip install confluent-kafka==1.9.2
建議您安裝confluent-kafka 1.9.2及以下版本的依賴庫,否則使用公網(wǎng)發(fā)送消息會報(bào)SSL_HANDSHAKE
錯(cuò)誤。
準(zhǔn)備配置
可選:下載SSL根證書。如果是SSL接入點(diǎn),需下載該證書。
- 訪問aliware-kafka-demos,單擊圖標(biāo),然后在下拉框選擇Download ZIP,下載Demo包并解壓。
在解壓的Demo工程中,找到kafka-confluent-python-demo文件夾,將此文件夾上傳到Linux系統(tǒng)。
修改配置文件setting.py。
默認(rèn)接入點(diǎn)
登錄Linux系統(tǒng),進(jìn)入vpc文件目錄,修改配置文件setting.py。
kafka_setting = { 'bootstrap_servers': 'XXX:xxx,XXX:xxx', 'topic_name': 'XXX', 'group_name': 'XXX' }
參數(shù)
描述
bootstrap_servers
默認(rèn)接入點(diǎn)。您可在云消息隊(duì)列 Kafka 版控制臺的實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域獲取。
topic_name
Topic名稱。您可在云消息隊(duì)列 Kafka 版控制臺的Topic 管理頁面獲取。
group_name
Group名稱。您可在云消息隊(duì)列 Kafka 版控制臺的Group 管理頁面獲取。
SSL接入點(diǎn)
登錄Linux系統(tǒng),進(jìn)入vpc-ssl文件目錄,修改配置文件setting.py。
kafka_setting = { 'sasl_plain_username': 'XXX', 'sasl_plain_password': 'XXX', 'ca_location': '/XXX/mix-4096-ca-cert', 'bootstrap_servers': 'XXX:xxx,XXX:xxx', 'topic_name': 'XXX', 'group_name': 'XXX' }
參數(shù)
描述
sasl_plain_username
SASL用戶名。
說明- 如果實(shí)例未開啟ACL,您可以在云消息隊(duì)列 Kafka 版控制臺的實(shí)例詳情頁面的配置信息區(qū)域獲取默認(rèn)的用戶名和密碼。
- 如果實(shí)例已開啟ACL,請確保要使用的SASL用戶已被授予向云消息隊(duì)列 Kafka 版實(shí)例收發(fā)消息的權(quán)限。具體操作,請參見SASL用戶授權(quán)。
sasl_plain_password
SASL用戶名密碼。
ca_location
SSL根證書的路徑。用本地路徑替換示例中的XXX。例如:/home/kafka-confluent-python-demo/vpc-ssl/mix-4096-ca-cert。
bootstrap_servers
SSL接入點(diǎn)。您可在云消息隊(duì)列 Kafka 版控制臺的實(shí)例詳情頁面的接入點(diǎn)信息區(qū)域獲取。
topic_name
Topic名稱。您可在云消息隊(duì)列 Kafka 版控制臺的Topic 管理頁面獲取。
group_name
Group名稱。您可在云消息隊(duì)列 Kafka 版控制臺的Group 管理頁面獲取。
發(fā)送消息
執(zhí)行以下命令發(fā)送消息(當(dāng)前示例的Python版本為3.9)。
python kafka_producer.py
消息程序kafka_producer.py示例代碼如下:
默認(rèn)接入點(diǎn)
from confluent_kafka import Producer import setting conf = setting.kafka_setting # 初始化一個(gè)Producer對象。 p = Producer({'bootstrap.servers': conf['bootstrap_servers']}) def delivery_report(err, msg): """ Called once for each message produced to indicate delivery result. Triggered by poll() or flush(). """ if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) # 異步發(fā)送消息。 p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) # 在程序結(jié)束時(shí),調(diào)用flush。 p.flush()
SSL接入點(diǎn)
from confluent_kafka import Producer import setting conf = setting.kafka_setting p = Producer({'bootstrap.servers':conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password']}) def delivery_report(err, msg): if err is not None: print('Message delivery failed: {}'.format(err)) else: print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report) p.poll(0) p.flush()
訂閱消息
執(zhí)行以下命令訂閱消息(當(dāng)前示例的Python版本為3.9)。
python kafka_consumer.py
消息程序kafka_consumer.py示例代碼如下:
默認(rèn)接入點(diǎn)
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()
SSL接入點(diǎn)
from confluent_kafka import Consumer, KafkaError import setting conf = setting.kafka_setting c = Consumer({ 'bootstrap.servers': conf['bootstrap_servers'], 'ssl.endpoint.identification.algorithm': 'none', 'sasl.mechanisms':'PLAIN', 'ssl.ca.location':conf['ca_location'], 'security.protocol':'SASL_SSL', 'sasl.username':conf['sasl_plain_username'], 'sasl.password':conf['sasl_plain_password'], 'group.id': conf['group_name'], 'auto.offset.reset': 'latest', 'fetch.message.max.bytes':'1024*512' }) c.subscribe([conf['topic_name']]) while True: msg = c.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue else: print("Consumer error: {}".format(msg.error())) continue print('Received message: {}'.format(msg.value().decode('utf-8'))) c.close()