Python Demo
更新時間:
依賴Python語言的阿里云SDK核心庫及dybaseapi,其中dybaseapi包用于拉取MNS消息。
Demo如下:
說明
調(diào)用接口前需配置環(huán)境變量,通過環(huán)境變量讀取訪問憑證。AccessKey ID和AccessKey Secret的環(huán)境變量名:SMS_ACCESS_KEY_ENV 、SMS_ACCESS_KEY_SECRET_ENV。配置詳情請參見在Linux、macOS和Windows系統(tǒng)配置環(huán)境變量。
#!/usr/bin/env python
# coding=utf8
import os
import time
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkdybaseapi.request.v20170525.QueryTokenForMnsQueueRequest import QueryTokenForMnsQueueRequest
from aliyunsdkcore.profile import region_provider
from datetime import datetime
from aliyunsdkdybaseapi.mns.account import Account
from aliyunsdkdybaseapi.mns.queue import *
from aliyunsdkdybaseapi.mns.mns_exception import *
try:
import json
except ImportError:
import simplejson as json
# TODO 需要替換成您需要接收的消息類型
message_type = "<MessageType>"
# TODO 需要替換成您的隊列名稱。在云通信頁面開通相應(yīng)業(yè)務(wù)消息后,就能在頁面上獲得對應(yīng)的queueName
queue_name = "<QueueName>"
# 云通信固定的endpoint地址
endpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"
# 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運維。
# 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
# 本示例以把AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說明,來實現(xiàn)API訪問的身份驗證。
acs_client = AcsClient(os.getenv("SMS_ACCESS_KEY_ENV"), os.getenv("SMS_ACCESS_KEY_SECRET_ENV"), "cn-hangzhou")
region_provider.add_endpoint("Dybaseapi", "dybaseapi.aliyuncs.com", "cn-hangzhou")
# 云通信業(yè)務(wù)token存在失效時間,需動態(tài)更新。
class Token():
def __init__(self):
self.token = None
self.tmp_access_id = None
self.tmp_access_key = None
self.expire_time = None
def is_refresh(self):
if self.expire_time is None:
return 1
# 失效時間與當(dāng)前系統(tǒng)時間比較,提前2分鐘刷新token
now = datetime.now()
expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
if expire <= now or (expire - now).seconds < 120:
return 1
return 0
def refresh(self):
print("start refresh token...")
request = QueryTokenForMnsQueueRequest()
request.set_MessageType(message_type)
request.set_QueueName(queue_name)
response = acs_client.do_action_with_exception(request)
# print response
if response is None:
raise ServerException("GET_TOKEN_FAIL", "獲取token時無響應(yīng)")
response_body = json.loads(response)
if response_body.get("Code") != "OK":
raise ServerException("GET_TOKEN_FAIL", "獲取token失敗")
sts_token = response_body.get("MessageTokenDTO")
self.tmp_access_key = sts_token.get("AccessKeySecret")
self.tmp_access_id = sts_token.get("AccessKeyId")
self.expire_time = sts_token.get("ExpireTime")
self.token = sts_token.get("SecurityToken")
print("finish refresh token...")
# 初始化 token, my_account, my_queue
token, my_account, my_queue = Token(), None, None
# 循環(huán)讀取刪除消息直到隊列空
# receive message請求使用long polling方式,通過wait_seconds指定長輪詢時間為3秒
# long polling 解析:
## 當(dāng)隊列中有消息時,請求立即返回;
## 當(dāng)隊列中沒有消息時,請求在MNS服務(wù)器端掛3秒鐘,在這期間,有消息寫入隊列,請求會立即返回消息,3秒后,請求返回隊列沒有消息;
wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
10 * "=", 10 * "=", queue_name, wait_seconds))
while True:
receipt_handles = []
# 讀取消息
try:
# token過期是否需要刷新
if token.is_refresh() == 1:
# 刷新token
token.refresh()
if my_account:
my_account.mns_client.close_connection()
my_account = None
if not my_account:
my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
my_queue = my_account.get_queue(queue_name)
# 接收消息
recv_msgs = my_queue.batch_receive_message(10, wait_seconds)
for recv_msg in recv_msgs:
# TODO 業(yè)務(wù)處理
# receipt_handles.append(recv_msg.receipt_handle)
print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))
except MNSExceptionBase as e:
if e.type == "QueueNotExist":
print("Queue not exist, please create queue before receive message.")
break
elif e.type == "MessageNotExist":
print("Queue is empty! sleep 10s")
time.sleep(10)
continue
print("Receive Message Fail! Exception:%s\n" % e)
break
# 刪除消息
try:
if len(receipt_handles) > 0:
# my_queue.batch_delete_message(receipt_handles)
print("Delete Message Succeed! ReceiptHandles:%s" % receipt_handles)
except MNSExceptionBase as e:
print("Delete Message Fail! Exception:%s\n" % e)
文檔內(nèi)容是否對您有幫助?