日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

廣播拉取消息模型

輕量消息隊列(原 MNS)支持一對多拉取消息消費模型,以滿足一對多訂閱、主動拉取的場景。本文介紹如何高效利用該模型實現多消費者并行拉取與處理消息。

說明

本文以Java SDK為例介紹廣播拉取消息流程,其他語言SDK請參見新版SDK參考(推薦)

前提條件

背景信息

輕量消息隊列(原 MNS)提供隊列(Queue)和主題(Topic)兩種模型,基本能滿足大多數應用場景。

  • 隊列提供的是一對一的共享消息消費模型,采用客戶端主動拉取(Pull)模式。

  • 主題模型提供一對多的廣播消息消費模型,采用服務端主動推送(Push)模式。

推送模式的好處是即時性能較好,但需要暴露客戶端地址來接收服務端的消息推送。有些情況下有的信息,例如企業內網,無法暴露推送地址,希望改用拉取(Pull)的方式。雖然輕量消息隊列(原 MNS)不直接提供這種消費模型,但可以結合主題和隊列來實現一對多的拉取消息消費模型。

解決方案

通過創建訂閱,讓主題將消息先推送到隊列,然后由消費者從隊列拉取消息。這樣既可以做到一對多的廣播消息,又可避免暴露消費者的地址。消息流

安裝Java依賴庫

  1. 在IDEA中創建一個Java工程。

  2. pom.xml文件中添加以下依賴引入Java依賴庫。

    <dependency>
        <groupId>com.aliyun.mns</groupId>
        <artifactId>aliyun-sdk-mns</artifactId>
        <version>1.1.9.2</version>
    </dependency>

接口說明

最新的Java SDK(1.1.8)中的CloudPullTopic默認支持上述解決方案。其中MNSClient提供以下接口來快速創建CloudPullTopic:

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

參數說明如下:

  • TopicMeta:創建主題的參數設置。

  • QueueMeta:創建隊列的參數設置。

  • queueNameList:指定主題消息推送的隊列名列表。

  • needCreateQueuequeueNameList是否需要創建。

  • queueMetaTemplate:創建隊列需要的QueueMeta參數示例。

示例代碼

廣播拉取消息的示例代碼如下:

package doc;

// 引入阿里云輕量消息隊列(原 MNS)的相關類
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.TopicMeta;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.ClientException;
import java.util.Vector;

public class DemoTopicMessageBroadcast {
    public static void main(String[] args) {
      
// 獲取阿里云賬戶的AccessKey ID,AccessKey Secret和SMQ服務的endpoint。
    CloudAccount account = new CloudAccount(
    ServiceSettings.getMNSAccessKeyId(),
    ServiceSettings.getMNSAccessKeySecret(),
    ServiceSettings.getMNSAccountEndpoint());
    MNSClient client = account.getMNSClient();

// 創建消費者列表。
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);

try{

    // 創建主題。
    String topicName = "demo-topic-for-pull";
    TopicMeta topicMeta = new TopicMeta();
    topicMeta.setTopicName(topicName);
    CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

    // 發布消息。
    String messageBody = "broadcast message to all the consumers:hello the world.";
    // 如果發送原始消息,使用getMessageBodyAsRawString解析消息體。
    TopicMessage tMessage = new RawTopicMessage();
    tMessage.setBaseMessageBody(messageBody);
    pullTopic.publishMessage(tMessage);


    // 接收消息。
    CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
    CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
    CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

    Message consumer1Msg = queueForConsumer1.popMessage(30);
    if(consumer1Msg != null)
    {
        System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
    } else{
        System.out.println("the queue is empty");
    }

    Message consumer2Msg = queueForConsumer2.popMessage(30);
    if(consumer2Msg != null)
    {
        System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
    }else{
        System.out.println("the queue is empty");
    }

    Message consumer3Msg = queueForConsumer3.popMessage(30);
    if(consumer3Msg != null)
    {
        System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
    }else{
        System.out.println("the queue is empty");
    }

    // 刪除主題。
    pullTopic.delete();
} catch(ClientException ce) {
    System.out.println("Something wrong with the network connection between client and SMQ service."
            + "Please check your network and DNS availability.");
    ce.printStackTrace();
} catch(ServiceException se) {

    se.printStackTrace();
}

    client.close();
    }
}