日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MQTT上下線通知規(guī)則的實(shí)現(xiàn)

更新時(shí)間:

云消息隊(duì)列 MQTT 版支持通過創(chuàng)建上下線通知規(guī)則,將在線狀態(tài)通知消息推送至云消息隊(duì)列 RocketMQ 版,以獲取客戶端的在線狀態(tài)。本文為您介紹如何通過創(chuàng)建上下線通知規(guī)則來獲取客戶端的在線狀態(tài)。

背景信息

在實(shí)際業(yè)務(wù)中服務(wù)端需要對客戶端的上下線數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,并根據(jù)客戶端的在線狀態(tài)推送消息。云消息隊(duì)列 MQTT 版提供異步上下線事件通知獲取客戶端在線狀態(tài),MQTT客戶端的上下線事件將會觸發(fā)MQTT服務(wù)端生成一條通知消息,獲取通知消息有以下方式:

  • 通過云端SDK接入云消息隊(duì)列 MQTT 版服務(wù)端獲取客戶端的在線狀態(tài)。更多信息,請參見獲取MQTT客戶端在線狀態(tài)

  • 通過創(chuàng)建上下線通知規(guī)則,將在線狀態(tài)通知消息推送至云消息隊(duì)列 RocketMQ 版,然后訂閱云消息隊(duì)列 RocketMQ 版獲取客戶端的在線狀態(tài)。

本文以創(chuàng)建上下線通知規(guī)則為例,介紹后端應(yīng)用如何獲取客戶端在線狀態(tài)。

quick_start_client_stats_notify

網(wǎng)絡(luò)訪問

云消息隊(duì)列 MQTT 版同時(shí)提供了公網(wǎng)接入點(diǎn)VPC 接入點(diǎn)
  • 公網(wǎng)接入點(diǎn)為本地公網(wǎng)環(huán)境訪問的IP地址,一般用于物聯(lián)網(wǎng)和移動(dòng)互聯(lián)網(wǎng)場景中;
  • VPC 接入點(diǎn)為云上私網(wǎng)訪問的IP地址,一般用于云端應(yīng)用接入云消息隊(duì)列 MQTT 版
重要 客戶端使用接入點(diǎn)連接服務(wù)時(shí)務(wù)必使用域名接入,不得直接使用域名背后的IP地址直接連接,因?yàn)镮P地址隨時(shí)會變化。在以下使用情況中出現(xiàn)的問題云消息隊(duì)列 MQTT 版產(chǎn)品方概不負(fù)責(zé):
  • 客戶端不使用域名接入而是使用IP地址接入,產(chǎn)品方更新了域名解析導(dǎo)致原有IP地址失效。
  • 客戶端網(wǎng)絡(luò)對IP地址設(shè)置網(wǎng)絡(luò)防火墻策略,產(chǎn)品方更新了域名解析后新IP地址被您的防火墻策略攔截。

前提條件

  • 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
  • 下載安裝JDK
  • 已創(chuàng)建云消息隊(duì)列 MQTT 版實(shí)例、Topic和Group ID,具體操作,請參見創(chuàng)建資源

  • 已創(chuàng)建云消息隊(duì)列 RocketMQ 版實(shí)例、Topic和Group ID,具體操作,請參見步驟二:創(chuàng)建資源

重要
  • 云消息隊(duì)列 MQTT 版客戶端上下線通知規(guī)則僅支持云消息隊(duì)列 RocketMQ 版4.x系列實(shí)例。

  • 云消息隊(duì)列 MQTT 版客戶端上下線通知規(guī)則不能跨地域使用,因此,云消息隊(duì)列 MQTT 版云消息隊(duì)列 RocketMQ 版的資源都必須創(chuàng)建在同一地域。

1.創(chuàng)建上下線通知規(guī)則

  1. 登錄云消息隊(duì)列 MQTT 版控制臺,并在左側(cè)導(dǎo)航欄單擊實(shí)例列表

  2. 在頂部菜單欄選擇目標(biāo)地域,然后在實(shí)例列表中單擊實(shí)例名稱進(jìn)入實(shí)例詳情頁面。

  3. 在左側(cè)導(dǎo)航欄單擊規(guī)則管理,然后在頁面左上角,單擊創(chuàng)建規(guī)則

  4. 創(chuàng)建規(guī)則頁面完成以下操作。

    1. 配置基本信息。輸入規(guī)則ID,選擇上下線通知的規(guī)則類型。

      image

    2. 配置規(guī)則源。選擇已經(jīng)創(chuàng)建好的云消息隊(duì)列 MQTT 版的Group ID。

      image

    3. 配置規(guī)則目標(biāo)。選擇已經(jīng)創(chuàng)建好的云消息隊(duì)列 RocketMQ 版的實(shí)例和Topic。

      image

2.準(zhǔn)備測試代碼

客戶端的上下線操作,上下線通知的消息處理都需要代碼完成,本文以Java的Demo示例作為您代碼開發(fā)的參考。

2.1下載示例代碼

  1. 下載mqtt-java-demo,并解壓該Demo工程包至您指定的文件夾。

  2. 在解壓的Demo工程中找到lmq-java-demo文件夾,將此文件夾導(dǎo)入IntelliJ IDEA,并確認(rèn)pom.xml中已包含以下依賴。

    <dependencies>
        <dependency>
            <groupId>org.bouncycastle</groupId>
            <artifactId>bcprov-jdk15on</artifactId>
            <version>1.70</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>1.10</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.5.Final</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-onsmqtt</artifactId>
            <version>1.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>4.5.0</version>
        </dependency>
    </dependencies>
  3. 配置訪問憑證。

    • 獲取AccessKey信息。獲取方式,請參見創(chuàng)建AccessKey

    • 配置環(huán)境變量。云消息隊(duì)列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENVMQTT_SK_ENV。關(guān)于配置環(huán)境變量的方法,請參見配置訪問憑證

2.2客戶端上下線代碼

MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類中,按代碼注釋說明填寫云消息隊(duì)列 MQTT 版資源的參數(shù)。

測試中僅模擬上下線的操作不需要發(fā)送消息,可以將其中發(fā)送消息的相關(guān)代碼去掉,示例代碼如下。

客戶端上下線代碼示例

import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
    public static void main(String[] args) throws Exception {
        /**
         * MQ4IOT 實(shí)例 ID,購買后控制臺獲取
         */
        String instanceId = "XXXXX";
        /**
         * 接入點(diǎn)地址,購買 MQ4IOT 實(shí)例,且配置完成后即可獲取,接入點(diǎn)地址必須填寫分配的域名,不得使用 IP 地址直接連接,否則可能會導(dǎo)致客戶端異常。
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * 賬號 AccessKey,從賬號系統(tǒng)控制臺獲取
         * 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維。
         * 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以把AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說明。運(yùn)行本代碼示例之前,請先配置環(huán)境變量MQTT_AK_ENV和MQTT_SK_ENV
         * 例如:export MQTT_AK_ENV=<access_key_id>
         *      export MQTT_SK_ENV=<access_key_secret>
         * 需要將<access_key_id>替換為已準(zhǔn)備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。
         */
        String accessKey = System.getenv("MQTT_AK_ENV");
        /**
         * 賬號 secretKey,從賬號系統(tǒng)控制臺獲取,僅在Signature鑒權(quán)模式下需要設(shè)置
         */
        String secretKey = System.getenv("MQTT_SK_ENV");
        /**
         * MQ4IOT clientId,由業(yè)務(wù)系統(tǒng)分配,需要保證每個(gè) tcp 連接都不一樣,保證全局唯一,如果不同的客戶端對象(tcp 連接)使用了相同的 clientId 會導(dǎo)致連接異常斷開。
         * clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制臺申請,DeviceId 由業(yè)務(wù)方自己設(shè)置,clientId 總長度不得超過64個(gè)字符。
         */
        String clientId = "GID_XXXXX@@@XXXXX";

        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * 客戶端使用的協(xié)議和端口必須匹配,具體參考文檔 http://bestwisewords.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
         * 如果是 SSL 加密則設(shè)置ssl://endpoint:8883
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * 客戶端設(shè)置好發(fā)送超時(shí)時(shí)間,防止無限阻塞
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        Thread.sleep(Long.MAX_VALUE);
    }
}

2.3上下線通知消息的處理代碼

上下線通知消息被推送到云消息隊(duì)列 RocketMQ 版,消費(fèi)者訂閱消息后需要根據(jù)業(yè)務(wù)需求進(jìn)行處理。

MQTTClientStatusNoticeProcessDemo.java類中按代碼注釋說明填寫云消息隊(duì)列 RocketMQ 版資源的參數(shù),示例代碼如下。

上下線通知消息的處理代碼示例

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

public class MQTTClientStatusNoticeProcessDemo {
    public static void main(String[] args) {
        /**
         * 初始化消息隊(duì)列RocketMQ版接收客戶端,實(shí)際業(yè)務(wù)中一般部署在服務(wù)端應(yīng)用中。
         */
        Properties properties = new Properties();
        /**
         * 設(shè)置消息隊(duì)列RocketMQ版Group ID,在消息隊(duì)列RocketMQ版控制臺創(chuàng)建。
         */
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
        /**
         * AccessKey ID,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。
         * 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維。
         * 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
         * 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
         */
        properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
        /**
         * AccessKey Secret,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。
         */
        properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
        /**
         * 設(shè)置TCP接入點(diǎn),該接入點(diǎn)為消息隊(duì)列RocketMQ版實(shí)例的接入點(diǎn)。進(jìn)入消息隊(duì)列RocketMQ版控制臺實(shí)例詳情頁面獲取。
         */
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
        /**
         * 使用消息隊(duì)列RocketMQ版消費(fèi)端來處理MQTT客戶端的上下線通知時(shí),訂閱的Topic為上下線通知Topic。
         */
        final String parentTopic = "GID_XXXX_MQTT";
        /**
         * 客戶端狀態(tài)數(shù)據(jù),實(shí)際生產(chǎn)環(huán)境中建議使用數(shù)據(jù)庫或者Redis等外部持久化存儲來保存該信息,避免應(yīng)用重啟丟失狀態(tài),本示例以單機(jī)內(nèi)存版實(shí)現(xiàn)為例。
         */
        MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
        Consumer consumer = ONSFactory.createConsumer(properties);
        /**
         *  此處僅處理客戶端是否在線,因此只需要關(guān)注connect事件和tcpclean事件即可。
         */
        consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
        consumer.start();
        String clientId = "GID_XXXXX@@@XXXXX";
        while (true) {
            System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 處理上下線通知的邏輯。
     * 實(shí)際部署過程中,消費(fèi)上下線通知的應(yīng)用可能部署多臺機(jī)器,因此客戶端在線狀態(tài)的數(shù)據(jù)可以使用數(shù)據(jù)庫或者Redis等外部共享存儲來維護(hù)。
     * 其次需要單獨(dú)做消息冪等處理,以免重復(fù)接收消息導(dǎo)致狀態(tài)機(jī)判斷錯(cuò)誤。
     */
    static class MqttClientStatusNoticeListener implements MessageListener {
        private MqttClientStatusStore mqttClientStatusStore;

        public MqttClientStatusNoticeListener(
            MqttClientStatusStore mqttClientStatusStore) {
            this.mqttClientStatusStore = mqttClientStatusStore;
        }

        @Override
        public Action consume(Message message, ConsumeContext context) {
            try {
                JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
                System.out.println(msgBody);
                String eventType = msgBody.getString("eventType");
                String clientId = msgBody.getString("clientId");
                String channelId = msgBody.getString("channelId");
                ClientStatusEvent event = new ClientStatusEvent();
                event.setChannelId(channelId);
                event.setClientIp(msgBody.getString("clientIp"));
                event.setEventType(eventType);
                event.setTime(msgBody.getLong("time"));
                /**
                 * 首先存儲新的事件。
                 */
                mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
                /**
                 * 讀取當(dāng)前channel的事件列表。
                 */
                Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
                if (events == null || events.isEmpty()) {
                    return Action.CommitMessage;
                }
                /**
                 * 如果事件列表里上線和下線事件都已經(jīng)收到,則當(dāng)前channel已經(jīng)掉線,可以清理掉這個(gè)channel的數(shù)據(jù)。
                 */
                boolean findOnlineEvent = false;
                boolean findOfflineEvent = false;
                for (ClientStatusEvent clientStatusEvent : events) {
                    if (clientStatusEvent.isOnlineEvent()) {
                        findOnlineEvent = true;
                    } else {
                        findOfflineEvent = true;
                    }
                }
                if (findOnlineEvent && findOfflineEvent) {
                    mqttClientStatusStore.deleteEvent(clientId, channelId);
                }
                return Action.CommitMessage;
            } catch (Throwable e) {
                e.printStackTrace();
            }
            return Action.ReconsumeLater;
        }
    }

    /**
     * 根據(jù)狀態(tài)表判斷一個(gè)clientId是否有活躍的TCP連接。
     * 1.如果沒有channel表,則客戶端一定不在線。
     * 2.如果channel表非空,檢查一下channel數(shù)據(jù)中是否僅包含上線事件,如果有則代表有活躍連接,客戶端在線。
     * 如果全部的channel都有掉線斷開事件則客戶端一定不在線。
     *
     * @param clientId
     * @param mqttClientStatusStore
     * @return
     */
    public static boolean checkClientOnline(String clientId,
        MqttClientStatusStore mqttClientStatusStore) {
        Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
        if (channelMap == null) {
            return false;
        }
        for (Set<ClientStatusEvent> events : channelMap.values()) {
            boolean findOnlineEvent = false;
            boolean findOfflineEvent = false;
            for (ClientStatusEvent event : events) {
                if (event.isOnlineEvent()) {
                    findOnlineEvent = true;
                } else {
                    findOfflineEvent = true;
                }
            }
            if (findOnlineEvent & !findOfflineEvent) {
                return true;
            }
        }
        return false;
    }
}

3.結(jié)果驗(yàn)證

  1. 執(zhí)行MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java類中的Main函數(shù)運(yùn)行代碼模擬客戶端上線操作。可以根據(jù)下面操作查詢客戶端狀態(tài)和消息推送情況。

    說明

    可以通過終止Main函數(shù)的執(zhí)行來模擬客戶端的離線操作。

    • 查詢客戶端狀態(tài)。在云消息隊(duì)列 MQTT 版控制臺設(shè)備狀態(tài)查詢頁面,根據(jù)Device ID查詢客戶端此時(shí)已是在線狀態(tài),如下圖所示。

      image

    • 查詢事件消息推送情況。在云消息隊(duì)列 RocketMQ 版控制臺消息查詢頁面,根據(jù)Topic查詢到上線事件消息已經(jīng)被推送過來,如下圖所示。

      image

  2. 執(zhí)行MQTTClientStatusNoticeProcessDemo.java類中的Main函數(shù)運(yùn)行代碼。此時(shí)已經(jīng)收到上線的事件消息,ClientStatus狀態(tài)也從false變成true,如下圖所示。

    image

更多信息