輕量消息隊(duì)列(原MNS)消費(fèi)模式
本文為您介紹輕量消息隊(duì)列(原MNS)消費(fèi)模式接收回執(zhí)消息的示例代碼。
相關(guān)下載
語(yǔ)言 | 說(shuō)明 |
Java | 下載Java SDK安裝包拉取消息。 |
Node.js | 下載原Node.js SDK安裝包拉取消息。 |
C# | 依賴(lài).NET語(yǔ)言的阿里云SDK及dybaseapi,其中dybaseapi包用于拉取消息。 |
PHP | 依賴(lài)OpenAPI PHP Client包和OpenAPI PHP SDK包拉取消息。 |
Python | 依賴(lài)Python語(yǔ)言的阿里云SDK核心庫(kù)及dybaseapi,其中dybaseapi包用于拉取消息。 |
Go | 下載Go SDK安裝包拉取消息。 |
注意事項(xiàng)
使用示例時(shí),您需要注意如下信息:
配置AccessKey ID和AccessKey Secret信息。
說(shuō)明為避免在代碼中硬編碼訪問(wèn)密鑰(AccessKey)而造成泄露,請(qǐng)通過(guò)配置環(huán)境變量的方式獲取AccessKey。環(huán)境變量配置方法,請(qǐng)參見(jiàn)在Linux、macOS和Windows系統(tǒng)配置環(huán)境變量。
本文以環(huán)境變量名
VMS_AK_ENV
和VMS_SK_ENV
為例,進(jìn)行后續(xù)操作。通過(guò)環(huán)境變量獲取AccessKey的代碼示例如下:String accessKeyId = System.getenv("VMS_AK_ENV"); String accessKeySecret = System.getenv("VMS_SK_ENV");
messageType
替換為您需要的消息類(lèi)型,如訂閱呼叫記錄消息(VoiceReport)。語(yǔ)音服務(wù)支持的回執(zhí)消息類(lèi)型,請(qǐng)參見(jiàn)回執(zhí)消息簡(jiǎn)介與配置流程。String messageType="messageType";
queueName
是消息隊(duì)列名稱(chēng),您可以在語(yǔ)音服務(wù)控制臺(tái),通用設(shè)置>訂閱回執(zhí)消息頁(yè)面查看。String queueName="queueName";
本文示例代碼中您獲取到的上行信息內(nèi)容由
dealMessage
方法處理,您可以將需要的上行信息內(nèi)容業(yè)務(wù)邏輯寫(xiě)在該方法中。arg
代表回執(zhí)消息體參數(shù),可填寫(xiě)的值如:start_time、end_time、duration、status_code等,建議您根據(jù)具體場(chǎng)景填寫(xiě)參數(shù)。// 根據(jù)文檔中具體的消息格式進(jìn)行消息體的解析 String arg = (String) contentMap.get("arg"); // 編寫(xiě)您的業(yè)務(wù)代碼
示例
package com.alicom.mns.sample;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.alicom.mns.tools.DefaultAlicomMessagePuller;
import com.alicom.mns.tools.MessageListener;
import com.aliyun.mns.model.Message;
import com.google.gson.Gson;
/**
* 只能用于接收云通信的消息,不能用于接收其他業(yè)務(wù)的消息
* MNS消息接收demo
*/
public class ReceiveDemo {
private static Log logger=LogFactory.getLog(ReceiveDemo.class);
static class MyMessageListener implements MessageListener{
private Gson gson=new Gson();
@Override
public boolean dealMessage(Message message) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//消息的幾個(gè)關(guān)鍵值
System.out.println("message receiver time from mns:" + format.format(new Date()));
System.out.println("message handle: " + message.getReceiptHandle());
System.out.println("message body: " + message.getMessageBodyAsString());
System.out.println("message id: " + message.getMessageId());
System.out.println("message dequeue count:" + message.getDequeueCount());
System.out.println("Thread:" + Thread.currentThread().getName());
try{
Map<String,Object> contentMap=gson.fromJson(message.getMessageBodyAsString(), HashMap.class);
// 根據(jù)文檔中具體的消息格式進(jìn)行消息體的解析
String arg = (String) contentMap.get("arg");
// 這里開(kāi)始編寫(xiě)您的業(yè)務(wù)代碼
}catch(com.google.gson.JsonSyntaxException e){
logger.error("error_json_format:"+message.getMessageBodyAsString(),e);
//理論上不會(huì)出現(xiàn)格式錯(cuò)誤的情況,所以遇見(jiàn)格式錯(cuò)誤的消息,只能先delete,否則重新推送也會(huì)一直報(bào)錯(cuò)
return true;
} catch (Throwable e) {
//您自己的代碼部分導(dǎo)致的異常,應(yīng)該return false,這樣消息不會(huì)被delete掉,而會(huì)根據(jù)策略進(jìn)行重推
return false;
}
//消息處理成功,返回true, SDK將調(diào)用MNS的delete方法將消息從隊(duì)列中刪除掉
return true;
}
}
public static void main(String[] args) throws Exception, ParseException {
DefaultAlicomMessagePuller puller=new DefaultAlicomMessagePuller();
//設(shè)置異步線程池大小及任務(wù)隊(duì)列的大小,還有無(wú)數(shù)據(jù)線程休眠時(shí)間
puller.setConsumeMinThreadSize(6);
puller.setConsumeMaxThreadSize(16);
puller.setThreadQueueSize(200);
puller.setPullMsgThreadSize(1);
//和服務(wù)端聯(lián)調(diào)問(wèn)題時(shí)開(kāi)啟,平時(shí)無(wú)需開(kāi)啟,消耗性能
puller.openDebugLog(false);
// 從本地環(huán)境變量獲取AccessKey ID和AccessKey Secret信息
String accessKeyId = System.getenv("VMS_AK_ENV");
String accessKeySecret = System.getenv("VMS_SK_ENV");
/*
* 將messageType和queueName替換成您需要的消息類(lèi)型名稱(chēng)和對(duì)應(yīng)的隊(duì)列名稱(chēng)
*云通信產(chǎn)品下所有的回執(zhí)消息類(lèi)型:
*1:呼叫記錄:VoiceReport
*2:呼叫中間狀態(tài):VoiceCallReport
*3:錄音記錄消息:VoiceRecordReport
*4:ASR實(shí)時(shí)消息:VoiceRTASRReport
*/
//此處應(yīng)該替換成相應(yīng)產(chǎn)品的消息類(lèi)型
String messageType="messageType";
//在云通信頁(yè)面開(kāi)通相應(yīng)業(yè)務(wù)消息后,就能在頁(yè)面上獲得對(duì)應(yīng)的queueName,格式類(lèi)似Alicom-Queue-******-VoiceReport
String queueName="queueName";
puller.startReceiveMsg(accessKeyId,accessKeySecret,messageType,queueName,new MyMessageListener());
}
}
using Aliyun.Acs.Core;
using Aliyun.Acs.Core.Profile;
using Aliyun.Acs.Core.Exceptions;
using Aliyun.Acs.Dysmsapi.Model.V20170525;
using Aliyun.Acs.Dysmsapi.MNS;
using Aliyun.Acs.Dysmsapi.MNS.Model;
using System.Threading;
using System.Collections.Generic;
using System.Text;
using System;
using QueryTokenForMnsQueue_MessageTokenDTO = Aliyun.Acs.Dysmsapi.Model.V20170525.QueryTokenForMnsQueueResponse.QueryTokenForMnsQueue_MessageTokenDTO;
namespace CommonRpc
{
class Program
{
static void Main(string[] args)
{
// 阿里云賬號(hào)AccessKey擁有所有API的訪問(wèn)權(quán)限,建議您使用RAM用戶進(jìn)行API訪問(wèn)或日常運(yùn)維。
// 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號(hào)下所有資源的安全。
// 本示例以把AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說(shuō)明,來(lái)實(shí)現(xiàn)API訪問(wèn)的身份驗(yàn)證。
IClientProfile profile = DefaultProfile.GetProfile("cn-hangzhou", Environment.GetEnvironmentVariable("VMS_AK_ENV"), Environment.GetEnvironmentVariable("VMS_SK_ENV")); // todo: 補(bǔ)充AK信息
DefaultProfile.AddEndpoint("cn-hangzhou", "cn-hangzhou", "Dysmsapi", "dysmsapi.aliyuncs.com");
DefaultAcsClient client = new DefaultAcsClient(profile);
String queueName = "<QueueName>"; // todo: 補(bǔ)充隊(duì)列名稱(chēng)
String messageType = "<MessageType>"; // todo: 補(bǔ)充消息類(lèi)型
int maxThread = 2;
for (int i = 0; i < maxThread; i++)
{
TestTask testTask = new TestTask("PullMessageTask-thread-" + i, messageType, queueName, client);
Thread t = new Thread(new ThreadStart(testTask.Handle));
//啟動(dòng)線程
t.Start();
}
Console.ReadKey();
try
{
QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
{
MessageType = messageType,
QueueName = queueName
};
QueryTokenForMnsQueueResponse response = client.GetAcsResponse(request);
Console.WriteLine(response.MessageTokenDTO.SecurityToken);
}
catch (ServerException ex)
{
Console.WriteLine(ex.ToString());
}
catch (ClientException ex)
{
Console.WriteLine(ex.ToString());
}
}
}
class TestTask
{
object o = new object();
const int sleepTime = 50;
const long bufferTime = 60 * 2; // 過(guò)期時(shí)間小于2分鐘則重新獲取,防止服務(wù)器時(shí)間誤差
const String mnsAccountEndpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"; // 阿里通信消息的endpoint,固定
public String name { get; private set; }
public String messageType { get; private set; }
public String QueueName { get; private set; }
public int TaskID { get; private set; }
public IAcsClient AcsClient { get; private set; }
public TestTask(String name, String messageType, String queueName, IAcsClient acsClient)
{
this.name = name;
this.messageType = messageType;
this.QueueName = queueName;
this.AcsClient = acsClient;
}
readonly Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO> tokenMap = new Dictionary<string, QueryTokenForMnsQueue_MessageTokenDTO>();
readonly Dictionary<string, Queue> queueMap = new Dictionary<string, Queue>();
public QueryTokenForMnsQueue_MessageTokenDTO GetTokenByMessageType(IAcsClient acsClient, String messageType)
{
QueryTokenForMnsQueueRequest request = new QueryTokenForMnsQueueRequest
{
MessageType = messageType
};
QueryTokenForMnsQueueResponse queryTokenForMnsQueueResponse = acsClient.GetAcsResponse(request);
QueryTokenForMnsQueue_MessageTokenDTO token = queryTokenForMnsQueueResponse.MessageTokenDTO;
return token;
}
/// 處理消息
public void Handle()
{
while (true)
{
try
{
QueryTokenForMnsQueue_MessageTokenDTO token = null;
Queue queue = null;
lock (o)
{
if (tokenMap.ContainsKey(messageType))
{
token = tokenMap[messageType];
}
if (queueMap.ContainsKey(QueueName))
{
queue = queueMap[QueueName];
}
TimeSpan ts = new TimeSpan(0);
if (token != null)
{
DateTime b = Convert.ToDateTime(token.ExpireTime);
DateTime c = Convert.ToDateTime(DateTime.Now);
ts = b - c;
}
if (token == null || ts.TotalSeconds < bufferTime || queue == null)
{
token = GetTokenByMessageType(AcsClient, messageType);
IMNS client = new MNSClient(token.AccessKeyId, token.AccessKeySecret, mnsAccountEndpoint, token.SecurityToken);
queue = client.GetNativeQueue(QueueName);
if (tokenMap.ContainsKey(messageType))
{
tokenMap.Remove(messageType);
}
if (queueMap.ContainsKey(QueueName))
{
queueMap.Remove(QueueName);
}
tokenMap.Add(messageType, token);
queueMap.Add(QueueName, queue);
}
}
BatchReceiveMessageResponse batchReceiveMessageResponse = queue.BatchReceiveMessage(16);
List<Message> messages = batchReceiveMessageResponse.Messages;
for (int i = 0; i <= messages.Count - 1; i++)
{
try
{
byte[] outputb = Convert.FromBase64String(messages[i].Body);
string orgStr = Encoding.UTF8.GetString(outputb);
Console.WriteLine(orgStr);
// TODO 具體消費(fèi)邏輯,待客戶自己實(shí)現(xiàn).
// 消費(fèi)成功的前提下刪除消息
// queue.DeleteMessage(messages[i].ReceiptHandle);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
Thread.Sleep(sleepTime);
}
}
}
}
<?php
use AlibabaCloud\Client\AlibabaCloud;
use AlibabaCloud\Client\Exception\ClientException;
use AlibabaCloud\Client\Exception\ServerException;
use AlibabaCloud\Dybaseapi\MNS\Requests\BatchReceiveMessage;
use AlibabaCloud\Dybaseapi\MNS\Requests\BatchDeleteMessage;
// 阿里云賬號(hào)AccessKey擁有所有API的訪問(wèn)權(quán)限,建議您使用RAM用戶進(jìn)行API訪問(wèn)或日常運(yùn)維。
// 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號(hào)下所有資源的安全。
// 本示例以將AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說(shuō)明。您也可以根據(jù)業(yè)務(wù)需要,保存到配置文件里。
AlibabaCloud::accessKeyClient(getenv("VMS_AK_ENV"), getenv("VMS_SK_ENV"))
->regionId('cn-hangzhou')
->asGlobalClient();
$queueName = '<QueueName>'; // 隊(duì)列名稱(chēng),需要替換成您的隊(duì)列名稱(chēng)
$messageType = '<MessageType>'; // 需要接收的消息類(lèi)型,需要替換成您需要接收的消息類(lèi)型,回執(zhí)消息更多信息請(qǐng)參見(jiàn)回執(zhí)消息簡(jiǎn)介與配置流程。
$response = null;
$token = null;
$i = 0;
do {
try {
if (null == $token || strtotime($token['ExpireTime']) - time() > 2 * 60) {
$response = AlibabaCloud::rpcRequest()
->product('Dybaseapi')
->version('2017-05-25')
->action('QueryTokenForMnsQueue')
->method('POST')
->host("dybaseapi.aliyuncs.com")
->options([
'query' => [
'MessageType' => $messageType,
'QueueName' => $queueName,
],
])
->request()
->toArray();
}
$token = $response['MessageTokenDTO'];
$mnsClient = new \AlibabaCloud\Dybaseapi\MNS\MnsClient(
"http://1943695596114318.mns.cn-hangzhou.aliyuncs.com",
$token['AccessKeyId'],
$token['AccessKeySecret'],
$token['SecurityToken']
);
$mnsRequest = new BatchReceiveMessage(10, 5);
$mnsRequest->setQueueName($queueName);
$mnsResponse = $mnsClient->sendRequest($mnsRequest);
$receiptHandles = Array();
foreach ($mnsResponse->Message as $message) {
// 用戶邏輯:
// $receiptHandles[] = $message->ReceiptHandle; // 加入$receiptHandles數(shù)組中的記錄將會(huì)被刪除
$messageBody = base64_decode($message->MessageBody); // base64解碼后的JSON字符串
print_r($messageBody . "\n");
}
if (count($receiptHandles) > 0) {
$deleteRequest = new BatchDeleteMessage($queueName, $receiptHandles);
$mnsClient->sendRequest($deleteRequest);
}
} catch (ClientException $e) {
echo $e->getErrorMessage() . PHP_EOL;
} catch (ServerException $e) {
if ($e->getCode() == 404) {
$i++;
}
echo $e->getErrorMessage() . PHP_EOL;
}
} while ($i < 3);
#!/usr/bin/env python
# coding=utf8
import os
import time
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkcore.client import AcsClient
from aliyunsdkdybaseapi.request.v20170525.QueryTokenForMnsQueueRequest import QueryTokenForMnsQueueRequest
from aliyunsdkcore.profile import region_provider
from datetime import datetime
from aliyunsdkdybaseapi.mns.account import Account
from aliyunsdkdybaseapi.mns.queue import *
from aliyunsdkdybaseapi.mns.mns_exception import *
try:
import json
except ImportError:
import simplejson as json
# TODO 需要替換成您需要接收的消息類(lèi)型
message_type = "<MessageType>"
# TODO 需要替換成您的隊(duì)列名稱(chēng)。在云通信頁(yè)面開(kāi)通相應(yīng)業(yè)務(wù)消息后,就能在頁(yè)面上獲得對(duì)應(yīng)的queueName
queue_name = "<QueueName>"
# 云通信固定的endpoint地址
endpoint = "https://1943695596114318.mns.cn-hangzhou.aliyuncs.com/"
# 阿里云賬號(hào)AccessKey擁有所有API的訪問(wèn)權(quán)限,建議您使用RAM用戶進(jìn)行API訪問(wèn)或日常運(yùn)維。
# 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號(hào)下所有資源的安全。
# 本示例以把AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說(shuō)明,來(lái)實(shí)現(xiàn)API訪問(wèn)的身份驗(yàn)證。
acs_client = AcsClient(os.getenv("VMS_AK_ENV"), os.getenv("VMS_SK_ENV"), "cn-hangzhou")
region_provider.add_endpoint("Dybaseapi", "dybaseapi.aliyuncs.com", "cn-hangzhou")
# 云通信業(yè)務(wù)token存在失效時(shí)間,需動(dòng)態(tài)更新。
class Token():
def __init__(self):
self.token = None
self.tmp_access_id = None
self.tmp_access_key = None
self.expire_time = None
def is_refresh(self):
if self.expire_time is None:
return 1
# 失效時(shí)間與當(dāng)前系統(tǒng)時(shí)間比較,提前2分鐘刷新token
now = datetime.now()
expire = datetime.strptime(self.expire_time, "%Y-%m-%d %H:%M:%S")
if expire <= now or (expire - now).seconds < 120:
return 1
return 0
def refresh(self):
print("start refresh token...")
request = QueryTokenForMnsQueueRequest()
request.set_MessageType(message_type)
request.set_QueueName(queue_name)
response = acs_client.do_action_with_exception(request)
# print response
if response is None:
raise ServerException("GET_TOKEN_FAIL", "獲取token時(shí)無(wú)響應(yīng)")
response_body = json.loads(response)
if response_body.get("Code") != "OK":
raise ServerException("GET_TOKEN_FAIL", "獲取token失敗")
sts_token = response_body.get("MessageTokenDTO")
self.tmp_access_key = sts_token.get("AccessKeySecret")
self.tmp_access_id = sts_token.get("AccessKeyId")
self.expire_time = sts_token.get("ExpireTime")
self.token = sts_token.get("SecurityToken")
print("finish refresh token...")
# 初始化 token, my_account, my_queue
token, my_account, my_queue = Token(), None, None
# 循環(huán)讀取刪除消息直到隊(duì)列空
# receive message請(qǐng)求使用long polling方式,通過(guò)wait_seconds指定長(zhǎng)輪詢時(shí)間為3秒
## long polling 解析:
### 當(dāng)隊(duì)列中有消息時(shí),請(qǐng)求立即返回;
### 當(dāng)隊(duì)列中沒(méi)有消息時(shí),請(qǐng)求在MNS服務(wù)器端掛3秒鐘,在這期間,有消息寫(xiě)入隊(duì)列,請(qǐng)求會(huì)立即返回消息,3秒后,請(qǐng)求返回隊(duì)列沒(méi)有消息;
wait_seconds = 3
print("%sReceive And Delete Message From Queue%s\nQueueName:%s\nWaitSeconds:%s\n" % (
10 * "=", 10 * "=", queue_name, wait_seconds))
while True:
receipt_handles = []
# 讀取消息
try:
# token過(guò)期是否需要刷新
if token.is_refresh() == 1:
# 刷新token
token.refresh()
if my_account:
my_account.mns_client.close_connection()
my_account = None
if not my_account:
my_account = Account(endpoint, token.tmp_access_id, token.tmp_access_key, token.token)
my_queue = my_account.get_queue(queue_name)
# 接收消息
recv_msgs = my_queue.batch_receive_message(10, wait_seconds)
for recv_msg in recv_msgs:
# TODO 業(yè)務(wù)處理
# receipt_handles.append(recv_msg.receipt_handle)
print("Receive Message Succeed! ReceiptHandle:%s MessageBody:%s MessageID:%s" % (
recv_msg.receipt_handle, recv_msg.message_body, recv_msg.message_id))
except MNSExceptionBase as e:
if e.type == "QueueNotExist":
print("Queue not exist, please create queue before receive message.")
break
elif e.type == "MessageNotExist":
print("Queue is empty! sleep 10s")
time.sleep(10)
continue
print("Receive Message Fail! Exception:%s\n" % e)
break
# 刪除消息
try:
if len(receipt_handles) > 0:
# my_queue.batch_delete_message(receipt_handles)
print("Delete Message Succeed! ReceiptHandles:%s" % receipt_handles)
except MNSExceptionBase as e:
print("Delete Message Fail! Exception:%s\n" % e)
package main
import (
"os"
"encoding/base64"
"fmt"
"github.com/aliyun/alibaba-cloud-sdk-go/sdk/endpoints"
"github.com/aliyun/alibaba-cloud-sdk-go/services/dybaseapi"
"github.com/aliyun/alibaba-cloud-sdk-go/services/dybaseapi/mns"
"time"
)
const (
mnsDomain = "1943695596114318.mns.cn-hangzhou.aliyuncs.com"
)
func main() {
endpoints.AddEndpointMapping("cn-hangzhou", "Dybaseapi", "dybaseapi.aliyuncs.com")
// 阿里云賬號(hào)AccessKey ID擁有所有API的訪問(wèn)權(quán)限,風(fēng)險(xiǎn)很高。強(qiáng)烈建議您創(chuàng)建并使用RAM用戶進(jìn)行API訪問(wèn)或日常運(yùn)維,請(qǐng)登錄RAM控制臺(tái)創(chuàng)建RAM用戶
// 此處以把AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說(shuō)明。 您也可以根據(jù)業(yè)務(wù)需要,保存到配置文件里
// 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到代碼里,會(huì)存在密鑰泄漏風(fēng)險(xiǎn)
// 創(chuàng)建client實(shí)例
client, err := dybaseapi.NewClientWithAccessKey(
"cn-hangzhou", // 您的可用區(qū)ID
os.Getenv("VMS_AK_ENV"),
os.Getenv("VMS_SK_ENV"))
if err != nil {
// 異常處理
panic(err)
}
queueName := "<QueueName>"http://需要替換成您的隊(duì)列名稱(chēng)
messageType := "<MessageType>"http://需要替換成您需要接收的消息類(lèi)型,回執(zhí)消息更多信息請(qǐng)參見(jiàn)回執(zhí)消息簡(jiǎn)介與配置流程。
var token *dybaseapi.MessageTokenDTO
for {
if token == nil || expireTime.Unix()-time.Now().Unix() < 2*60 {
// 創(chuàng)建 API 請(qǐng)求并設(shè)置參數(shù)
request := dybaseapi.CreateQueryTokenForMnsQueueRequest()
request.MessageType = messageType
request.QueueName = queueName
// 發(fā)起請(qǐng)求并處理異常
response, err := client.QueryTokenForMnsQueue(request)
if err != nil {
panic(err)
}
token = &response.MessageTokenDTO
}
expireTime, err = time.ParseInLocation("2006-01-02 15:04:05", token.ExpireTime, time.Local)
if err != nil {
panic(err)
}
mnsClient, err := mns.NewClientWithStsToken(
"cn-hangzhou",
token.AccessKeyId,
token.AccessKeySecret,
token.SecurityToken,
)
if err != nil {
panic(err)
}
mnsRequest := mns.CreateBatchReceiveMessageRequest()
mnsRequest.Domain = mnsDomain
mnsRequest.QueueName = queueName
mnsRequest.NumOfMessages = "10"
mnsRequest.WaitSeconds = "5"
mnsResponse, err := mnsClient.BatchReceiveMessage(mnsRequest)
if err != nil {
panic(err)
}
// fmt.Println(mnsResponse)
receiptHandles := make([]string, len(mnsResponse.Message))
for i, message := range mnsResponse.Message {
messageBody, decodeErr := base64.StdEncoding.DecodeString(message.MessageBody)
if decodeErr != nil {
panic(decodeErr)
}
fmt.Println(string(messageBody))
receiptHandles[i] = message.ReceiptHandle
}
if len(receiptHandles) > 0 {
mnsDeleteRequest := mns.CreateBatchDeleteMessageRequest()
mnsDeleteRequest.Domain = mnsDomain
mnsDeleteRequest.QueueName = queueName
mnsDeleteRequest.SetReceiptHandles(receiptHandles)
//_, err = mnsClient.BatchDeleteMessage(mnsDeleteRequest) // 取消注釋將刪除隊(duì)列中的消息
if err != nil {
panic(err)
}
}
}
}