日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

超大消息傳輸

當消息內容大于64 KB以至于無法直接存入SMQ隊列時,不做消息切片,使用輕量消息隊列(原 MNS)對象存儲OSS來傳遞大于64 KB的消息。

背景信息

輕量消息隊列(原 MNS)的隊列的消息大小最大限制為64 KB,正常情況下基本能夠滿足控制流信息交換的需求。在某些特殊場景下,消息數據超過64 KB時就只能采用消息切片的方式。如果您不想使用消息切片,輕量消息隊列(原 MNS)支持通過OSS實現超大消息的傳遞。

下面為您介紹如何通過OSS來傳遞大于64 KB的消息。

解決方案

  1. 生產者在向輕量消息隊列(原 MNS)發送消息前,如果發現消息體大于64 KB,則先將消息體數據上傳到OSS。

  2. 生產者把數據對應的Object信息發送到輕量消息隊列(原 MNS)

  3. 消費者從輕量消息隊列(原 MNS)隊列里讀取消息,判斷消息內容是否為OSS的Object信息。

  4. 判斷消息內容是OSS的Object信息,則從OSS下載對應的Object內容,并作為消息體返回給上層程序。

具體過程如下圖所示。

image

注意事項

  • 大消息主要消費網絡帶寬,用該方案發送大消息時,生產者和消費者的網絡帶寬需要滿足需求。

  • 大消息網絡傳輸時間較長,受網絡波動影響的概率更大,建議在上層做必要的重試。

前提條件

示例代碼

示例代碼下載,請參見LargeMessageDemo

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;

public class LargeMessageDemo {

    private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
    private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
    private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
    private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
    /**
     * 本文以4 KB臨界值為例,大于4 KB即用OSS存儲。
     */
    private final static Long payloadSizeThreshold = 4L;

    public static void main(String[] args) throws ClientException {
        // 從環境變量中獲取訪問憑證。
        EnvironmentVariableCredentialsProvider credentialsProvider = CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();

        // 創建OSSClient實例。
        OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);

        // 創建MNS實例。
        // 遵循阿里云規范,env設置ak、sk。
        CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
        MNSClient client = account.getMNSClient();
        CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
        CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);

        //reCreate
        ReCreateUtil.reCreateQueue(client,MNS_QUEUE_NAME);
        ReCreateUtil.reCreateTopic(client,MNS_TOPIC_NAME);

        // 配置超大隊列屬性。
        MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
            .setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
            .setMNSQueue(queue)
            .setMNSTopic(cloudTopic)
            .setPayloadSizeThreshold(payloadSizeThreshold);

        MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);

        // 執行常規發送。
        Message normalMessage = new Message();
        normalMessage.setMessageBodyAsRawString("1");
        mnsExtendedClient.sendMessage(normalMessage);
        Message message = mnsExtendedClient.receiveMessage(10);
        System.out.println("[normal]ReceiveMsg:"+message.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(message.getReceiptHandle());

        // 大文件發送Queue模型。
        String largeMsgBody = "largeMessage";
        Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);

        Message largeMessage = new Message();
        largeMessage.setMessageBodyAsRawString(largeMsgBody);

        mnsExtendedClient.sendMessage(largeMessage);
        Message receiveMessage = mnsExtendedClient.receiveMessage(10);
        System.out.println("[large]ReceiveMsg:"+receiveMessage.getMessageBodyAsRawString());
        mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());


        client.close();
        ossClient.shutdown();
    }
}