日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MQTT數(shù)據(jù)流出規(guī)則的實現(xiàn)

更新時間:

如果您的云端應用需要使用云消息隊列 RocketMQ 版產品的某些功能,例如順序消息特性、事務消息特性等,您可以通過消息流入或流出規(guī)則將云消息隊列 MQTT 版云消息隊列 RocketMQ 版數(shù)據(jù)進行流轉。本文介紹如何將云消息隊列 MQTT 版的數(shù)據(jù)導出至其他阿里云產品。

背景信息

云消息隊列 MQTT 版支持云端SDK,云上應用可直接通過云端SDK接入云消息隊列 MQTT 版服務端進行消息收發(fā)。云端SDK使用,請參見云端開發(fā)概述。

同時云消息隊列 MQTT 版支持和其他云產品進行互通,當前支持的云產品有云消息隊列 RocketMQ 版

本文以公網(wǎng)環(huán)境中的Java SDK為例說明如何將云消息隊列 MQTT 版的數(shù)據(jù)導出至云消息隊列 RocketMQ 版

此場景下可使用多語言的第三方開源SDK來實現(xiàn)消息收發(fā)。更多信息,請參見SDK下載。

quick_start_data_outflow

網(wǎng)絡訪問

云消息隊列 MQTT 版同時提供了公網(wǎng)接入點VPC 接入點。
  • 公網(wǎng)接入點為本地公網(wǎng)環(huán)境訪問的IP地址,一般用于物聯(lián)網(wǎng)和移動互聯(lián)網(wǎng)場景中;
  • VPC 接入點為云上私網(wǎng)訪問的IP地址,一般用于云端應用接入云消息隊列 MQTT 版。
重要 客戶端使用接入點連接服務時務必使用域名接入,不得直接使用域名背后的IP地址直接連接,因為IP地址隨時會變化。在以下使用情況中出現(xiàn)的問題云消息隊列 MQTT 版產品方概不負責:
  • 客戶端不使用域名接入而是使用IP地址接入,產品方更新了域名解析導致原有IP地址失效。
  • 客戶端網(wǎng)絡對IP地址設置網(wǎng)絡防火墻策略,產品方更新了域名解析后新IP地址被您的防火墻策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 下載安裝JDK。
  • 已創(chuàng)建云消息隊列 MQTT 版實例、Topic和Group ID,具體操作,請參見創(chuàng)建資源。

  • 已創(chuàng)建云消息隊列 RocketMQ 版實例、Topic和Group ID,具體操作,請參見步驟二:創(chuàng)建資源。

重要
  • 云消息隊列 MQTT 版數(shù)據(jù)流出規(guī)則僅支持云消息隊列 RocketMQ 版4.x系列實例。

  • 云消息隊列 MQTT 版數(shù)據(jù)流出規(guī)則不能跨地域使用,因此,云消息隊列 MQTT 版云消息隊列 RocketMQ 版的資源都必須創(chuàng)建在同一地域。

1.創(chuàng)建數(shù)據(jù)流出規(guī)則

  1. 登錄云消息隊列 MQTT 版控制臺,并在左側導航欄單擊實例列表

  2. 在頂部菜單欄選擇目標地域,然后在實例列表中單擊實例名稱進入實例詳情頁面。

  3. 在左側導航欄單擊規(guī)則管理,然后在頁面左上角,單擊創(chuàng)建規(guī)則

  4. 創(chuàng)建規(guī)則頁面完成以下操作。

    1. 配置基本信息。輸入規(guī)則ID,選擇數(shù)據(jù)流出的規(guī)則類型。

      image

    2. 配置規(guī)則源。選擇已經(jīng)創(chuàng)建好的云消息隊列 MQTT 版的Topic。

      image

    3. 配置規(guī)則目標。選擇已經(jīng)創(chuàng)建好的云消息隊列 RocketMQ 版的實例和Topic。

      image

2.準備測試代碼

2.1下載示例代碼

  1. 下載mqtt-java-demo,并解壓該Demo工程包至您指定的文件夾。

  2. 在解壓的Demo工程中找到lmq-java-demo文件夾,將此文件夾導入IntelliJ IDEA,并確認pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 獲取AccessKey信息。獲取方式,請參見創(chuàng)建AccessKey。

    • 配置環(huán)境變量。云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關于配置環(huán)境變量的方法,請參見配置訪問憑證。

2.2收發(fā)消息代碼

MQ4IoTSendMessageToRocketMQ.java類中包含了發(fā)送MQTT消息和使用RocketMQ消費的代碼,按代碼注釋說明填寫云消息隊列 MQTT 版云消息隊列 RocketMQ 版資源的參數(shù)。示例代碼如下。

收發(fā)送消息代碼示例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToRocketMQ {
    public static void main(String[] args) throws Exception {
        /**
         * 初始化云消息隊列 RocketMQ 版接收客戶端,實際業(yè)務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設置云消息隊列 RocketMQ 版Group ID,在云消息隊列 RocketMQ 版控制臺創(chuàng)建。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        
        /**
         * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
         * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
         * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權模式下需要設置。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
      
         /**
         * 設置TCP接入點,該接入點為云消息隊列 RocketMQ 版實例的接入點。云消息隊列 RocketMQ 版控制臺實例詳情頁面獲取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://xxxxx.XXXXX.mq-internet.aliyuncs.com");
        /**
         * 設置云消息隊列 RocketMQ 版的Topic,在云消息隊列 RocketMQ 版控制臺創(chuàng)建。
         * 云消息隊列 RocketMQ 版和微消息隊列MQTT配合使用時,RocketMQ客戶端僅操作一級Topic。
         */
        final String parentTopic = "XXXXX";
        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(parentTopic, "*", new MessageListener() {
            public Action consume(Message message, ConsumeContext consumeContext) {
                System.out.println("recv msg:" + message);
                return Action.CommitMessage;
            }
        });
        consumer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 初始化云消息隊列 MQTT 版發(fā)送客戶端,實際業(yè)務中云消息隊列 MQTT 版一般部署在移動終端環(huán)境。
         */

        /**
         * 您在控制臺創(chuàng)建的微消息隊列MQTT的實例ID。
         */
        String instanceId = "XXXXX";
         /**
         * 設置接入點,進入微消息隊列MQTT版控制臺實例詳情頁面獲取。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
         * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
         * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權模式下需要設置。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQTT客戶端ID,由業(yè)務系統(tǒng)分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的clientId會導致連接異常斷開。
         * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在云消息隊列 MQTT 版控制臺創(chuàng)建,DeviceID由業(yè)務方自己設置,clientId總長度不得超過64個字符。
         */
        String clientId = "GID_XXXX@@@XXXXX";
       /**
         * 云消息隊列 MQTT 版支持子級Topic,用來做自定義的過濾,此處為示例,可以填寫任何字符串。
         * 需要注意的是,完整的Topic長度不得超過128個字符。
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * QoS參數(shù)代表傳輸質量,可選0,1,2。詳細信息,請參見名詞解釋。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
         /**
         * 客戶端協(xié)議和端口??蛻舳耸褂玫膮f(xié)議和端口必須匹配,如果是SSL加密則設置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 設置客戶端發(fā)送超時時間,防止無限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客戶端連接成功后就需要盡快訂閱需要的Topic。
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             *  發(fā)送普通消息時,Topic必須和接收方訂閱的Topic一致,或者符合通配符匹配規(guī)則。
             */
            mqttClient.publish(mq4IotTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3.結果驗證

執(zhí)行MQ4IoTSendMessageToRocketMQ.java類中的Main函數(shù)運行代碼??梢愿鶕?jù)下面操作驗證消息的發(fā)送和消費情況。

代碼驗證

如下圖所示,MQTT消息已經(jīng)成功發(fā)送,RocketMQ客戶端也已經(jīng)成功消費。

image

控制臺驗證

  • 查詢消息發(fā)送情況。在云消息隊列 MQTT 版控制臺消息軌跡查詢頁面,根據(jù)Group ID和Device ID查詢消息已經(jīng)成功發(fā)送,如下圖所示。

    image

  • 查詢消息消費情況。在云消息隊列 RocketMQ 版控制臺消息查詢頁面,根據(jù)Topic查詢到消息已經(jīng)被轉發(fā)到云消息隊列 RocketMQ 版,如下圖所示。

    image

    單擊操作列中的消息軌跡查看,消息已經(jīng)被消費,如下圖所示。

    image

更多信息