Queue subscription example
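This topic describes how to use the Java SDK to consume the messages of a specified topic through a queue that subscribes to it.
Prerequisites
Choosing a message body encoding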
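SIMPLIFIED subscription format: if the message body contains no special characters, we recommend that you do not use Base64 encoding.
When you send messages to the topic, use RawTopicMessage to initialize the message object.
When you consume messages from the queue, use message.getMessageBodyAsRawString() to obtain the message body.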
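JSON or XML subscription format: Base64-encoded strings are well suited to transmission inside text formats such as JSON and XML, so we recommend that you use Base64 encoding.
When you send messages to the topic, use TopicMessage to initialize the message object. The message body is then Base64-encoded and carried in the Message field.
When you consume messages from the queue, call message.getMessageBodyAsRawString() to get the raw body, extract the value of the Message field, and then Base64-decode it:
JSONObject object = new JSONObject(message.getMessageBodyAsRawString());
String jsonMessageData = String.valueOf(object.get("Message"));
String messageBody = new String(Base64.decodeBase64(jsonMessageData));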
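The snippet above covers the JSON case. As a complement, here is a minimal sketch of the same "extract the Message field, then Base64-decode" step for an XML body, using only JDK classes. The class name `XmlEnvelopeDecoder`, the helper `decodeMessageField`, and the sample `<Notification>` envelope are hypothetical illustrations rather than part of the MNS SDK, and the sketch assumes that the publisher encoded the body as UTF-8.

```java
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Sketch only: this class and the sample envelope are hypothetical, not MNS SDK types.
class XmlEnvelopeDecoder {

    /** Returns the Base64-decoded text of the first <Message> element in rawBody. */
    static String decodeMessageField(String rawBody) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Message bodies are external input, so refuse DOCTYPE declarations.
            factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(rawBody)));
            NodeList nodes = doc.getElementsByTagName("Message");
            if (nodes.getLength() == 0) {
                throw new IllegalArgumentException("No <Message> element in body");
            }
            String encoded = nodes.item(0).getTextContent().trim();
            // Assumption: the publisher encoded the original text as UTF-8.
            return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
        } catch (IllegalArgumentException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalArgumentException("Body is not a valid XML envelope", e);
        }
    }

    public static void main(String[] args) {
        String payload = "hello <MNS> & \"friends\"";
        String encoded = Base64.getEncoder()
            .encodeToString(payload.getBytes(StandardCharsets.UTF_8));
        // Hypothetical, simplified envelope; a real notification carries more fields.
        String rawBody = "<Notification><TopicName>demo-topic</TopicName><Message>"
            + encoded + "</Message></Notification>";
        System.out.println(decodeMessageField(rawBody));
    }
}
```

Because the payload is Base64-encoded before it is embedded, characters such as `<`, `&`, and `"` in the original text cannot break the surrounding XML, which is the reason Base64 is recommended for the JSON and XML formats.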
Sample code
To download the sample code, see ConsumerQueueForTopicDemo.
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import java.io.StringReader;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.commons.codec.binary.Base64;
import org.json.JSONException;
import org.json.JSONObject;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;
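/**
 * In the topic model, a queue subscription can use one of three formats: XML, JSON, or SIMPLIFIED.
 * Base64 encoding is handled differently for each format; see below.
 * 1. Following Alibaba Cloud conventions, set the AccessKey ID (AK) and AccessKey secret (SK) in environment variables.
 * 2. Configure the ${"user.home"}/.aliyun-mns.properties file as follows:
 *    mns.endpoint=http://xxxxxxx
 *    mns.msgBodyBase64Switch=true/false
 */
public class ConsumerQueueForTopicDemo {
    /**
     * Whether the message body is Base64-encoded.
     */
    private static final boolean IS_BASE64 = Boolean.parseBoolean(ServiceSettings.getMNSPropertyValue("msgBodyBase64Switch", "false"));
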
    public static void main(String[] args) {
        String queueName = "TestQueue";

        // Following Alibaba Cloud conventions, the AK and SK are set in environment variables.
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        // The client only needs to be initialized once.
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(queueName);

        try {
            longPollingBatchReceive(queue);
        } catch (ClientException ce) {
            System.out.println("Something is wrong with the network connection between the client and the MNS service. "
                + "Please check your network and DNS availability.");
            ce.printStackTrace();
        } catch (ServiceException se) {
            // Compare against the constant so that a null error code cannot cause a NullPointerException.
            if ("QueueNotExist".equals(se.getErrorCode())) {
                System.out.println("The queue does not exist. Please create the queue before use.");
            } else if ("TimeExpired".equals(se.getErrorCode())) {
                System.out.println("The request has expired. Please check the clock on your local machine.");
            }
            se.printStackTrace();
        } catch (Exception e) {
            System.out.println("Unknown exception happened!");
            e.printStackTrace();
        } finally {
            // Release the client whether or not receiving succeeded.
            client.close();
        }
    }

    private static void longPollingBatchReceive(CloudQueue queue) {
        System.out.println("=============start longPollingBatchReceive=============");

        // Maximum number of messages to pull in one batch.
        int batchSize = 15;
        // Long polling wait time, in seconds.
        int waitSeconds = 15;

        List<Message> messages = queue.batchPopMessage(batchSize, waitSeconds);
        if (messages != null && !messages.isEmpty()) {
            for (Message message : messages) {
                System.out.println("message handle: " + message.getReceiptHandle());
                System.out.println("message body: " + message.getOriginalMessageBody());
                System.out.println("message body real data: " + getMessageBodyData(message));
                System.out.println("message id: " + message.getMessageId());
                System.out.println("message dequeue count: " + message.getDequeueCount());
                // Add your own processing logic here.
                // Remember to delete the message after it has been consumed successfully.
                queue.deleteMessage(message.getReceiptHandle());
                System.out.println("delete message successfully.\n");
            }
        }

        System.out.println("=============end longPollingBatchReceive=============");
    }

    private static String getMessageBodyData(Message message) {
        if (message == null) {
            return null;
        }
        String originalMessageBody = message.getOriginalMessageBody();

        // 1. Try to parse the body as JSON.
        try {
            JSONObject object = new JSONObject(originalMessageBody);
            String jsonMessageData = String.valueOf(object.get("Message"));
            System.out.println("message body type: JSON, value: " + jsonMessageData);
            return IS_BASE64 ? decodeBase64(jsonMessageData) : jsonMessageData;
        } catch (JSONException ex1) {
            // Not JSON; continue with the XML check.
        }

        // 2. Try to parse the body as XML.
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document doc = builder.parse(new InputSource(new StringReader(originalMessageBody)));
            Element root = doc.getDocumentElement();
            NodeList nodeList = root.getElementsByTagName("Message");
            String content = nodeList.item(0).getTextContent();
            System.out.println("message body type: XML, value: " + content);
            return IS_BASE64 ? decodeBase64(content) : content;
        } catch (Exception ex) {
            // Not valid XML, or no Message element was found.
        }

        // Neither JSON nor XML: treat the body as plain text.
        System.out.println("message body type: SIMPLE");
        return IS_BASE64 ? message.getMessageBody() : message.getMessageBodyAsRawString();
    }

    // Decodes Base64 text as UTF-8 explicitly (assuming the publisher used UTF-8)
    // instead of relying on the platform default charset.
    private static String decodeBase64(String encoded) {
        return new String(Base64.decodeBase64(encoded), java.nio.charset.StandardCharsets.UTF_8);
    }
}
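
The receive loop in the sample deletes each message right after handling it. A minimal sketch of that "delete only after successful processing" pattern, written against hypothetical stand-ins so that it runs without the SDK, might look like the following. `AckingConsumer`, `ReceivedMessage`, and the `handler` and `deleter` callbacks are assumptions made for illustration; in the sample above their roles are played by `Message`, your own processing logic, and `queue.deleteMessage(...)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Sketch only: these types are hypothetical stand-ins, not MNS SDK classes.
class AckingConsumer {

    /** Minimal stand-in for a received message: a receipt handle plus a decoded body. */
    static class ReceivedMessage {
        final String receiptHandle;
        final String body;

        ReceivedMessage(String receiptHandle, String body) {
            this.receiptHandle = receiptHandle;
            this.body = body;
        }
    }

    /**
     * Passes each body to handler and calls deleter with the receipt handle only
     * if handler returned normally. Returns the receipt handles whose handling or
     * deletion threw, so the caller can log them.
     */
    static List<String> consume(List<ReceivedMessage> batch,
                                Consumer<String> handler,
                                Consumer<String> deleter) {
        List<String> failed = new ArrayList<>();
        for (ReceivedMessage msg : batch) {
            try {
                handler.accept(msg.body);
                deleter.accept(msg.receiptHandle);
            } catch (RuntimeException e) {
                // Leave the message undeleted so that it can be received again later.
                failed.add(msg.receiptHandle);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<ReceivedMessage> batch = Arrays.asList(
            new ReceivedMessage("handle-1", "ok"),
            new ReceivedMessage("handle-2", "bad"));
        List<String> failed = consume(
            batch,
            body -> {
                if ("bad".equals(body)) {
                    throw new IllegalStateException("cannot process: " + body);
                }
            },
            handle -> System.out.println("deleted " + handle));
        System.out.println("left for redelivery: " + failed);
    }
}
```

The design choice is that a failure in one message does not abort the rest of the batch, and a message that was not deleted should become receivable again once the queue's visibility timeout elapses, so the handler is expected to tolerate seeing the same message more than once.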