使用Python SDK通過消費(fèi)組消費(fèi)日志
通過消費(fèi)組(ConsumerGroup)消費(fèi)日志有顯著優(yōu)點(diǎn),您無需關(guān)注日志服務(wù)的實(shí)現(xiàn)細(xì)節(jié)和消費(fèi)者之間的負(fù)載均衡、Failover等,只需要專注于業(yè)務(wù)邏輯即可。本文通過Python代碼介紹如何使用消費(fèi)組消費(fèi)日志。
工作流程
一個(gè)Logstore中包含多個(gè)Shard,通過消費(fèi)組消費(fèi)數(shù)據(jù)就是將Shard分配給一個(gè)消費(fèi)組下面的消費(fèi)者,分配方式遵循以下原則。
在一個(gè)消費(fèi)組中,一個(gè)Shard只會(huì)分配到一個(gè)消費(fèi)者。
在一個(gè)消費(fèi)組中,一個(gè)消費(fèi)者可以被分配多個(gè)Shard。
新的消費(fèi)者加入消費(fèi)組后,這個(gè)消費(fèi)組下面的Shard從屬關(guān)系會(huì)調(diào)整,以實(shí)現(xiàn)消費(fèi)的負(fù)載均衡,但是仍遵循上述分配原則。
通過消費(fèi)組消費(fèi),程序發(fā)生故障時(shí),會(huì)默認(rèn)保存Checkpoint。在程序故障恢復(fù)時(shí),能夠從斷點(diǎn)處繼續(xù)消費(fèi),從而保證數(shù)據(jù)不會(huì)被重復(fù)消費(fèi)。
前提條件
已開通日志服務(wù)。更多信息,請參見開通日志服務(wù)。
已創(chuàng)建RAM用戶并完成授權(quán)。具體操作,請參見創(chuàng)建RAM用戶并完成授權(quán)。
已配置環(huán)境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統(tǒng)配置環(huán)境變量。
重要阿里云賬號的AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶的AccessKey進(jìn)行API訪問或日常運(yùn)維。
強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
已安裝日志服務(wù)Python SDK。具體操作,請參見安裝Python SDK。
已創(chuàng)建Project。具體操作,請參見創(chuàng)建項(xiàng)目Project。
注意事項(xiàng)
本示例以華東1(杭州)的公網(wǎng)Endpoint為例,其公網(wǎng)Endpoint為https://cn-hangzhou.log.aliyuncs.com
。如果您通過與Project同地域的其他阿里云產(chǎn)品訪問日志服務(wù),請使用內(nèi)網(wǎng)Endpointhttps://cn-hangzhou-intranet.log.aliyuncs.com
。關(guān)于日志服務(wù)支持的地域與Endpoint的對應(yīng)關(guān)系,請參見服務(wù)入口。
代碼示例
以下代碼用于創(chuàng)建Logstore、為Logstore寫入日志、創(chuàng)建消費(fèi)組并消費(fèi)日志。
import os
import time
from aliyun.log.consumer import *
from aliyun.log import *
from threading import RLock
class SampleConsumer(ConsumerProcessorBase):
shard_id = -1
last_check_time = 0
log_results = []
lock = RLock()
def initialize(self, shard):
self.shard_id = shard
def process(self, log_groups, check_point_tracker):
for log_group in log_groups.LogGroups:
items = []
for log in log_group.Logs:
item = dict()
item['time'] = log.Time
for content in log.Contents:
item[content.Key] = content.Value
items.append(item)
log_items = dict()
log_items['topic'] = log_group.Topic
log_items['source'] = log_group.Source
log_items['logs'] = items
with SampleConsumer.lock:
SampleConsumer.log_results.append(log_items)
print(log_items)
current_time = time.time()
if current_time - self.last_check_time > 3:
try:
self.last_check_time = current_time
check_point_tracker.save_check_point(True)
except Exception:
import traceback
traceback.print_exc()
else:
try:
check_point_tracker.save_check_point(False)
except Exception:
import traceback
traceback.print_exc()
# None means succesful process
# if need to roll-back to previous checkpoint,return check_point_tracker.get_check_point()
return None
def shutdown(self, check_point_tracker):
try:
check_point_tracker.save_check_point(True)
except Exception:
import traceback
traceback.print_exc()
test_item_count = 20
# 為Logstore寫入日志。
def _prepare_data(client, project, logstore):
topic = 'python-ide-test'
source = ''
for i in range(0, test_item_count):
logitemList = [] # LogItem list
contents = [
('user', 'magic_user_' + str(i)),
('avg', 'magic_age_' + str(i))
]
logItem = LogItem()
logItem.set_time(int(time.time()))
logItem.set_contents(contents)
logitemList.append(logItem)
# 將日志寫入Logstore。
request = PutLogsRequest(project, logstore, topic, source, logitemList)
response = client.put_logs(request)
print("successfully put logs in logstore")
def sleep_until(seconds, exit_condition=None, expect_error=False):
if not exit_condition:
time.sleep(seconds)
return
s = time.time()
while time.time() - s < seconds:
try:
if exit_condition():
break
except Exception:
if expect_error:
continue
time.sleep(1)
def sample_consumer_group():
# 日志服務(wù)的服務(wù)接入點(diǎn)。此處以杭州為例,其它地域請根據(jù)實(shí)際情況填寫。
endpoint = os.environ.get('ALIYUN_LOG_SAMPLE_ENDPOINT', 'cn-hangzhou.log.aliyuncs.com')
# 本示例從環(huán)境變量中獲取AccessKey ID和AccessKey Secret。
accessKeyId = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accessKey = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# Project名稱。本示例中,SDK不會(huì)創(chuàng)建Project,您需要在運(yùn)行代碼前創(chuàng)建該P(yáng)roject。
project = os.environ.get('ALIYUN_LOG_SAMPLE_PROJECT', 'ali-test-project-python')
# Logstore名稱。本示例中,SDK會(huì)自動(dòng)創(chuàng)建該Logstore,您無需提前創(chuàng)建該Logstore。
logstore = 'ali-test-logstore'
# 消費(fèi)組名稱。您無需提前創(chuàng)建,SDK會(huì)自動(dòng)創(chuàng)建該消費(fèi)組。
consumer_group = 'consumer-group-1'
consumer_name1 = "consumer-group-1-A"
consumer_name2 = "consumer-group-1-B"
token = ""
if not logstore:
logstore = 'consumer_group_test_' + str(time.time()).replace('.', '_')
assert endpoint and accessKeyId and accessKey and project, ValueError("endpoint/access_id/key and "
"project cannot be empty")
# 創(chuàng)建Logstore。
client = LogClient(endpoint, accessKeyId, accessKey, token)
ret = client.create_logstore(project, logstore, 2, 4)
print("successfully create logstore")
time.sleep(60)
SampleConsumer.log_results = []
try:
# 為Logstore寫入日志。
_prepare_data(client, project, logstore)
# 在消費(fèi)組中創(chuàng)建2個(gè)消費(fèi)者消費(fèi)數(shù)據(jù)。
option1 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
consumer_name1, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
data_fetch_interval=1)
option2 = LogHubConfig(endpoint, accessKeyId, accessKey, project, logstore, consumer_group,
consumer_name2, cursor_position=CursorPosition.BEGIN_CURSOR, heartbeat_interval=6,
data_fetch_interval=1)
print("*** start to consume data...")
client_worker1 = ConsumerWorker(SampleConsumer, consumer_option=option1)
client_worker1.start()
client_worker2 = ConsumerWorker(SampleConsumer, consumer_option=option2)
client_worker2.start()
sleep_until(300, lambda: len(SampleConsumer.log_results) >= test_item_count)
print("*** consumer group status ***")
ret = client.list_consumer_group(project, logstore)
print("successfully list consumergroup")
for c in ret.get_consumer_groups():
ret = client.get_check_point_fixed(project, logstore, c.get_consumer_group_name())
print("successfully get checkpoint fixed")
print("*** stopping workers")
client_worker1.shutdown()
client_worker2.shutdown()
finally:
# clean-up
# ret = client.delete_logstore(project, logstore)
ret = client.list_logstore(project, logstore)
print("successfully list logstore")
# validate
ret = str(SampleConsumer.log_results)
print("*** get content:")
print(ret)
assert 'magic_user_0' in ret and 'magic_age_0' in ret \
and 'magic_user_' + str(test_item_count-1) in ret \
and 'magic_age_' + str(test_item_count-1) in ret
if __name__ == '__main__':
sample_consumer_group()
預(yù)期結(jié)果如下:
successfully create logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
successfully put logs in logstore
......
*** start to consume data...
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}
{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}
......
*** consumer group status ***
successfully list consumergroup
successfully get checkpoint fixed
*** stopping workers
successfully list logstore
*** get content:
[{'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123452, 'user': 'magic_user_0', 'avg': 'magic_age_0'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_1', 'avg': 'magic_age_1'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_3', 'avg': 'magic_age_3'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_5', 'avg': 'magic_age_5'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_6', 'avg': 'magic_age_6'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_8', 'avg': 'magic_age_8'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_9', 'avg': 'magic_age_9'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_2', 'avg': 'magic_age_2'}]}, {'topic': 'python-ide-test', 'source': '47.100.XX.XX', 'logs': [{'time': 1672123453, 'user': 'magic_user_11', 'avg': 'magic_age_11'}]}, ......}]
相關(guān)文檔
阿里云OpenAPI開發(fā)者門戶提供調(diào)試、SDK、示例和配套文檔。通過OpenAPI,您無需手動(dòng)封裝請求和簽名操作,就可以快速對日志服務(wù)API進(jìn)行調(diào)試。更多信息,請參見OpenAPI開發(fā)者門戶。
為滿足越來越多的自動(dòng)化日志服務(wù)配置需求,日志服務(wù)提供命令行工具CLI(Command Line Interface)。更多信息,請參見日志服務(wù)命令行工具CLI。
更多示例代碼,請參見Aliyun Log Python SDK on GitHub。