日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MQTT數(shù)據(jù)流入規(guī)則的實現(xiàn)

更新時間:

如果您的云端應用需要使用云消息隊列 RocketMQ 版產品的某些功能,例如順序消息特性、事務消息特性等,您可以通過消息流入或流出規(guī)則將云消息隊列 MQTT 版云消息隊列 RocketMQ 版數(shù)據(jù)進行流轉。本文介紹如何將云消息隊列 RocketMQ 版產品的數(shù)據(jù)導入云消息隊列 MQTT 版

背景信息

云消息隊列 MQTT 版支持云端SDK,云上應用可直接通過云端SDK接入云消息隊列 MQTT 版服務端進行消息收發(fā)。云端SDK使用,請參見云端開發(fā)概述

同時云消息隊列 MQTT 版支持和其他云產品進行互通,當前支持的云產品有云消息隊列 RocketMQ 版

本文以公網(wǎng)環(huán)境中的Java SDK為例說明如何將云消息隊列 RocketMQ 版數(shù)據(jù)流入至云消息隊列 MQTT 版

quick_start_data_inflow

網(wǎng)絡訪問

云消息隊列 MQTT 版同時提供了公網(wǎng)接入點VPC 接入點
  • 公網(wǎng)接入點為本地公網(wǎng)環(huán)境訪問的IP地址,一般用于物聯(lián)網(wǎng)和移動互聯(lián)網(wǎng)場景中;
  • VPC 接入點為云上私網(wǎng)訪問的IP地址,一般用于云端應用接入云消息隊列 MQTT 版
重要 客戶端使用接入點連接服務時務必使用域名接入,不得直接使用域名背后的IP地址直接連接,因為IP地址隨時會變化。在以下使用情況中出現(xiàn)的問題云消息隊列 MQTT 版產品方概不負責:
  • 客戶端不使用域名接入而是使用IP地址接入,產品方更新了域名解析導致原有IP地址失效。
  • 客戶端網(wǎng)絡對IP地址設置網(wǎng)絡防火墻策略,產品方更新了域名解析后新IP地址被您的防火墻策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 下載安裝JDK
  • 已創(chuàng)建云消息隊列 MQTT 版實例、Topic和Group ID,具體操作,請參見創(chuàng)建資源

  • 已創(chuàng)建云消息隊列 RocketMQ 版實例、Topic和Group ID,具體操作,請參見步驟二:創(chuàng)建資源

重要
  • 云消息隊列 MQTT 版數(shù)據(jù)流入規(guī)則僅支持云消息隊列 RocketMQ 版4.x系列實例。

  • 云消息隊列 MQTT 版數(shù)據(jù)流入規(guī)則不能跨地域使用,因此,云消息隊列 MQTT 版云消息隊列 RocketMQ 版的資源都必須創(chuàng)建在同一地域。

1.創(chuàng)建數(shù)據(jù)流入規(guī)則

  1. 登錄云消息隊列 MQTT 版控制臺,并在左側導航欄單擊實例列表

  2. 在頂部菜單欄選擇目標地域,然后在實例列表中單擊實例名稱進入實例詳情頁面。

  3. 在左側導航欄單擊規(guī)則管理,然后在頁面左上角,單擊創(chuàng)建規(guī)則

  4. 創(chuàng)建規(guī)則頁面完成以下操作。

    1. 配置基本信息。輸入規(guī)則ID,選擇數(shù)據(jù)流入的規(guī)則類型。

      image

    2. 配置規(guī)則源。選擇已經創(chuàng)建好的云消息隊列 RocketMQ 版的實例和Topic。

      image

    3. 配置規(guī)則目標。選擇已經創(chuàng)建好的云消息隊列 MQTT 版的Topic。

      image

2.準備測試代碼

2.1下載示例代碼

  1. 下載mqtt-java-demo,并解壓該Demo工程包至您指定的文件夾。

  2. 在解壓的Demo工程中找到lmq-java-demo文件夾,將此文件夾導入IntelliJ IDEA,并確認pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 獲取AccessKey信息。獲取方式,請參見創(chuàng)建AccessKey

    • 配置環(huán)境變量。云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關于配置環(huán)境變量的方法,請參見配置訪問憑證

2.2收發(fā)消息代碼

RocketMQSendMessageToMQ4IoT.java類中包含了發(fā)送RocketMQ消息和使用MQTT消費的代碼,按代碼注釋說明填寫云消息隊列 RocketMQ 版云消息隊列 MQTT 版資源的參數(shù)。

測試中可以將其中發(fā)送P2P消息的相關代碼注釋掉,示例代碼如下。

收發(fā)送消息代碼示例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.SendResult;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class RocketMQSendMessageToMQ4IoT {
    public static void main(String[] args) throws Exception {
        /**
         * 初始化云消息隊列 RocketMQ 版發(fā)送客戶端,實際業(yè)務中一般部署在服務端應用中。
         */
        Properties properties = new Properties();
        /**
         * 設置云消息隊列 RocketMQ 版Group ID,在云消息隊列 RocketMQ 版控制臺創(chuàng)建。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID-XXXXX");
        /**
         * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
         * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
         * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權模式下需要設置。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * 設置TCP接入點,該接入點為云消息隊列 RocketMQ 版實例的接入點。進入云消息隊列 RocketMQ 版控制臺實例詳情頁面獲取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * 設置云消息隊列 RocketMQ 版的Topic,在云消息隊列 RocketMQ 版控制臺創(chuàng)建。
         * 云消息隊列 RocketMQ 版和云消息隊列 MQTT 版配合使用時,RocketMQ客戶端僅操作一級Topic。
         */
        final String parentTopic = "XXXXX";
        Producer producer = ONSFactory.createProducer(properties);
        producer.start();
        //////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        /**
         * 初始化云消息隊列 MQTT 版接收客戶端,實際業(yè)務中云消息隊列 MQTT 版一般部署在移動終端環(huán)境。
         */

        /**
         * 您在控制臺創(chuàng)建的云消息隊列 MQTT 版的實例ID。
         */
        String instanceId = "XXXXX";
        /**
         * 設置接入點,進入云消息隊列 MQTT 版控制臺實例詳情頁面獲取。
         */
        String endPoint = "XXXXXX.mqtt.aliyuncs.com";
        /**
         * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。
         * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。
         * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權模式下需要設置。
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQTT客戶端ID,由業(yè)務系統(tǒng)分配,需要保證每個TCP連接都不一樣,保證全局唯一,如果不同的客戶端對象(TCP連接)使用了相同的clientId會導致連接異常斷開。
         * clientId由兩部分組成,格式為GroupID@@@DeviceID,其中GroupID在云消息隊列 MQTT 版控制臺創(chuàng)建,DeviceID由業(yè)務方自己設置,clientId總長度不得超過64個字符。
         */
        String clientId = "GID_XXXX@@@XXXXX";
        /**
         * 云消息隊列 MQTT 版支持子級Topic,用來做自定義的過濾,此處為示例,可以填寫任何字符串。
         * 需要注意的是,完整的Topic長度不得超過128個字符。
         */
        final String subTopic = "/testMq4Iot";
        final String mq4IotTopic = parentTopic + subTopic;
        /**
         * QoS參數(shù)代表傳輸質量,可選0,1,2。詳細信息,請參見名詞解釋。
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客戶端協(xié)議和端口。客戶端使用的協(xié)議和端口必須匹配,如果是SSL加密則設置ssl://endpoint:8883。
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 設置客戶端發(fā)送超時時間,防止無限阻塞。
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * 客戶端連接成功后就需要盡快訂閱需要的Topic。
                 */
                System.out.println("connect success");
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            final String topicFilter[] = {mq4IotTopic};
                            final int[] qos = {qosLevel};
                            mqttClient.subscribe(topicFilter, qos);
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * 消費消息的回調接口,需要確保該接口不拋異常,該接口運行返回即代表消息消費成功。
                 * 消費消息需要保證在規(guī)定時間內完成,如果消費耗時超過服務端約定的超時時間,對于可靠傳輸?shù)哪J剑斩丝赡軙卦囃扑停瑯I(yè)務需要做好冪等去重處理。
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            /**
             * 使用RocketMQ客戶端發(fā)消息給MQTT客戶端時,Topic指定為一級父Topic,Tag指定為MQ2MQTT。
             */
            Message msg = new Message(parentTopic, "MQ2MQTT", "hello mq send mqtt msg".getBytes());
            /**
             * 使用RocketMQ客戶端發(fā)消息給MQTT客戶端時,可以通過MqttSecondTopic屬性設置MQTT的子級Topic屬性。
             */
            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, subTopic);
            SendResult result = producer.send(msg);
            System.out.println(result);
//            /**
//             * 發(fā)送P2P消息,設置子級Topic。
//             */
//            msg.putUserProperties(PropertyKeyConst.MqttSecondTopic, "/p2p/" + clientId);
//            result = producer.send(msg);
//            System.out.println(result);
        }
        Thread.sleep(Long.MAX_VALUE);

    }

}

3.結果驗證

執(zhí)行RocketMQSendMessageToMQ4IoT.java類中的Main函數(shù)運行代碼。可以根據(jù)下面操作驗證消息的發(fā)送和消費情況。

代碼驗證

如下圖所示,RocketMQ消息已經成功發(fā)送,MQTT客戶端也已經成功消費。

image

控制臺驗證

  • 查詢消息發(fā)送情況。在云消息隊列 RocketMQ 版控制臺消息查詢頁面,根據(jù)Topic和Message ID查詢到消息已經成功發(fā)送,如下圖所示。

    image

  • 查詢消息消費情況。在云消息隊列 MQTT 版控制臺消息軌跡查詢頁面,根據(jù)Message ID查詢消息已經被消費,如下圖所示。

    image

更多信息