本文介紹如何使用Java SDK消費隊列中的消息。
前提條件
授權信息
默認僅限阿里云賬號使用本接口,RAM用戶只有在被授予了相關API操作權限后方可使用。本接口的授權信息如下表所示。
Name | Value |
API | ReceiveMessage |
RAM授權操作 | mns:ReceiveMessage |
資源 | acs:mns:$region:$accountid:/queues/$queueName/messages |
使用說明
該接口用于消費者消費隊列中的消息,ReceiveMessage操作會將取得的消息狀態變成Inactive,Inactive狀態的時間長度由Queue屬性
VisibilityTimeout
指定。消費者在
VisibilityTimeout
時間內消費成功后需要調用DeleteMessage接口刪除該消息,否則該消息將會重新變成Active狀態,又可被消費者重新消費。
消息體編碼選擇
當消息體無特殊字符時,建議您不使用Base64編碼。
發送消息時使用
message.setMessageBodyAsRawString
方法設置消息體。接收消息時使用
meesage.getMessageBodyAsRawString
方法獲取消息體。
示例代碼
示例代碼下載,請參見ReceiveMessageDemo。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.util.List;
/**
* 1. 遵循阿里云規范,env設置ak、sk。
* 2. ${"user.home"}/.aliyun-mns.properties 文件配置如下:
* mns.endpoint=http://xxxxxxx
* mns.msgBodyBase64Switch=true/false
*/
public class ReceiveMessageDemo {
/**
* 是否做 base64 編碼
*/
private static final Boolean IS_BASE64 = Boolean.valueOf(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch","false"));
public static void main(String[] args) {
String queueName = "cloud-queue-demo";
// 遵循阿里云規范,env設置ak、sk。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
//this client need only initialize once
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(queueName);
try {
// 基礎: 單次拉取
singleReceive(queue);
// 推薦: 使用的 長輪詢批量拉取模型
longPollingBatchReceive(queue);
} catch (ClientException ce) {
System.out.println("Something wrong with the network connection between client and MNS service."
+ "Please check your network and DNS availablity.");
ce.printStackTrace();
} catch (ServiceException se) {
if (se.getErrorCode().equals("QueueNotExist")) {
System.out.println("Queue is not exist.Please create queue before use");
} else if (se.getErrorCode().equals("TimeExpired")) {
System.out.println("The request is time expired. Please check your local machine timeclock");
}
se.printStackTrace();
} catch (Exception e) {
System.out.println("Unknown exception happened!");
e.printStackTrace();
}
client.close();
}
private static void longPollingBatchReceive(CloudQueue queue) {
System.out.println("=============start longPollingBatchReceive=============");
// 一次性拉取最多xx條消息
int batchSize = 15;
// 長輪詢時間為 xx s
int waitSeconds = 15;
List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
if (messages != null && messages.size() > 0) {
for (Message message : messages) {
printMsgAndDelete(queue,message);
}
}
System.out.println("=============end longPollingBatchReceive=============");
}
private static void singleReceive(CloudQueue queue) {
System.out.println("=============start singleReceive=============");
Message popMsg = queue.popMessage();
printMsgAndDelete(queue, popMsg);
System.out.println("=============end singleReceive=============");
}
private static void printMsgAndDelete(CloudQueue queue, Message popMsg) {
if (popMsg != null) {
System.out.println("message handle: " + popMsg.getReceiptHandle());
System.out.println("message body: " + (IS_BASE64 ? popMsg.getMessageBody() : popMsg.getMessageBodyAsRawString()));
System.out.println("message id: " + popMsg.getMessageId());
System.out.println("message dequeue count:" + popMsg.getDequeueCount());
//<<to add your special logic.>>
//remember to delete message when consume message successfully.
queue.deleteMessage(popMsg.getReceiptHandle());
System.out.println("delete message successfully.\n");
}
}
}
文檔內容是否對您有幫助?