當消息內容大于64 KB以至于無法直接存入SMQ隊列時,不做消息切片,使用輕量消息隊列(原 MNS)和對象存儲OSS來傳遞大于64 KB的消息。
背景信息
輕量消息隊列(原 MNS)的隊列的消息大小最大限制為64 KB,正常情況下基本能夠滿足控制流信息交換的需求。在某些特殊場景下,消息數據超過64 KB時就只能采用消息切片的方式。如果您不想使用消息切片,輕量消息隊列(原 MNS)支持通過OSS實現超大消息的傳遞。
下面為您介紹如何通過OSS來傳遞大于64 KB的消息。
解決方案
生產者在向輕量消息隊列(原 MNS)發送消息前,如果發現消息體大于64 KB,則先將消息體數據上傳到OSS。
生產者把數據對應的Object信息發送到輕量消息隊列(原 MNS)。
消費者從輕量消息隊列(原 MNS)隊列里讀取消息,判斷消息內容是否為OSS的Object信息。
判斷消息內容是OSS的Object信息,則從OSS下載對應的Object內容,并作為消息體返回給上層程序。
具體過程如下圖所示。
注意事項
大消息主要消費網絡帶寬,用該方案發送大消息時,生產者和消費者的網絡帶寬需要滿足需求。
大消息網絡傳輸時間較長,受網絡波動影響的概率更大,建議在上層做必要的重試。
前提條件
示例代碼
示例代碼下載,請參見LargeMessageDemo。
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;
public class LargeMessageDemo {
private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
/**
* 本文以4 KB臨界值為例,大于4 KB即用OSS存儲。
*/
private final static Long payloadSizeThreshold = 4L;
public static void main(String[] args) throws ClientException {
// 從環境變量中獲取訪問憑證。
EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
// 創建OSSClient實例。
OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);
// 創建MNS實例。
// 遵循阿里云規范,env設置ak、sk。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);
//reCreate
ReCreateUtil.reCreateQueue(client,MNS_QUEUE_NAME);
ReCreateUtil.reCreateTopic(client,MNS_TOPIC_NAME);
// 配置超大隊列屬性。
MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
.setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
.setMNSQueue(queue)
.setMNSTopic(cloudTopic)
.setPayloadSizeThreshold(payloadSizeThreshold);
MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);
// 執行常規發送。
Message normalMessage = new Message();
normalMessage.setMessageBodyAsRawString("1");
mnsExtendedClient.sendMessage(normalMessage);
Message message = mnsExtendedClient.receiveMessage(10);
System.out.println("[normal]ReceiveMsg:"+message.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(message.getReceiptHandle());
// 大文件發送Queue模型。
String largeMsgBody = "largeMessage";
Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);
Message largeMessage = new Message();
largeMessage.setMessageBodyAsRawString(largeMsgBody);
mnsExtendedClient.sendMessage(largeMessage);
Message receiveMessage = mnsExtendedClient.receiveMessage(10);
System.out.println("[large]ReceiveMsg:"+receiveMessage.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());
client.close();
ossClient.shutdown();
}
}
文檔內容是否對您有幫助?