MQTT客戶端上下線事件數(shù)據(jù)流出
您可通過配置云消息隊列 MQTT 版的客戶端上下線通知規(guī)則,將獲取的MQTT客戶端上下線事件數(shù)據(jù)導出至其他阿里云產(chǎn)品。該方法為異步上下線通知。本文介紹客戶端上下線通知的原理、應(yīng)用場景、使用限制以及云消息隊列 MQTT 版與其他阿里云產(chǎn)品的資源映射關(guān)系。
基本原理
在客戶端上線和下線事件觸發(fā)時,MQTT服務(wù)器會根據(jù)您配置的客戶端上下線通知規(guī)則,向后端其他云產(chǎn)品推送一條上下線消息。業(yè)務(wù)應(yīng)用一般部署在阿里云的服務(wù)器上,業(yè)務(wù)應(yīng)用通過向后端云產(chǎn)品訂閱這條消息來獲取所有客戶端的上下線動作。
該方式屬于異步感知客戶端的狀態(tài),且感知到的是上下線事件,而非在線狀態(tài),云端應(yīng)用需要根據(jù)事件發(fā)生的時間序列分析出客戶端的狀態(tài)。
異步上下線通知因為采用消息解耦,狀態(tài)判斷更加復雜,且誤判可能性更大,但該方法可以基于事件分析多個客戶端的運行狀態(tài)軌跡。
應(yīng)用場景
客戶端上下線通知主要的應(yīng)用場景為業(yè)務(wù)應(yīng)用需要在客戶端上線或者下線時觸發(fā)一些預定義的動作。
例如,客戶端在線狀態(tài)聚合。此場景中,MQTT客戶端產(chǎn)生上下線等狀態(tài)變更,云消息隊列 MQTT 版會根據(jù)您配置的客戶端狀態(tài)通知規(guī)則,將狀態(tài)變更封裝后轉(zhuǎn)發(fā)到云消息隊列 RocketMQ 版消息的方式來實現(xiàn)客戶端狀態(tài)數(shù)據(jù)的聚合和統(tǒng)計。
針對其他場景,推薦您使用同步查詢接口來獲取客戶端在線狀態(tài)。詳細信息,請參見獲取MQTT客戶端在線狀態(tài)。
使用限制
限制項 | 限制值 | 說明 |
單實例規(guī)則數(shù)量 | 100 | 如果默認限制不滿足,請聯(lián)系云消息隊列 MQTT 版技術(shù)支持,釘釘群號:35228338。 |
規(guī)則去重限制 | 同一個內(nèi)部資源同種規(guī)則只能創(chuàng)建一個規(guī)則。 | 例如一個Group ID只能創(chuàng)建一個上下線通知規(guī)則,一個MQTT Topic只能創(chuàng)建一個數(shù)據(jù)流入規(guī)則和一個數(shù)據(jù)流出規(guī)則。 |
地域限制 | 不支持跨地域創(chuàng)建規(guī)則,規(guī)則的數(shù)據(jù)源和數(shù)據(jù)目標所屬的實例必須處于同一地域。 | 例如,創(chuàng)建數(shù)據(jù)流出規(guī)則,數(shù)據(jù)源云消息隊列 MQTT 版實例屬于華東1(杭州)地域,則數(shù)據(jù)目標云消息隊列 RocketMQ 版只能選擇華東1(杭州)地域的實例。 |
云消息隊列 MQTT 版實例版本 | 僅新版本的實例支持。 | 新購的云消息隊列 MQTT 版實例默認為新版本實例,舊版實例已不支持購買。 |
云消息隊列 RocketMQ 版實例版本 | 僅4.0系列實例支持 | 云消息隊列 MQTT 版和云消息隊列 RocketMQ 版通過消息流入或消息流出規(guī)則進行數(shù)據(jù)互通時,云消息隊列 RocketMQ 版僅4.0系列實例支持消息流入或流出規(guī)則,5.0系列實例不支持。 |
資源映射方式
同一個云消息隊列 MQTT 版Group ID下的所有客戶端的狀態(tài)變更通知都會轉(zhuǎn)發(fā)到您配置的同一個其他阿里云產(chǎn)品的資源里。
表 1. 映射關(guān)系
MQTT資源 | 其他阿里云產(chǎn)品 | 其他阿里云產(chǎn)品資源 | 數(shù)據(jù)包定義 |
MQTT Group ID | 云消息隊列 RocketMQ 版 | 云消息隊列 RocketMQ 版的Topic |
操作流程
如上文所述,如果使用異步上下線通知的方式,您需創(chuàng)建客戶端上下線事件通知規(guī)則,將上下線通知的消息導出至后端云產(chǎn)品中。下文以使用云消息隊列 RocketMQ 版后端云產(chǎn)品為例進行說明。
創(chuàng)建上下線通知規(guī)則。
您需關(guān)注哪些Group ID分組的設(shè)備,就在云消息隊列 MQTT 版控制臺創(chuàng)建規(guī)則時,選定相應(yīng)的Group ID。創(chuàng)建規(guī)則的詳細步驟,請參見創(chuàng)建上下線通知規(guī)則。
業(yè)務(wù)應(yīng)用訂閱該類通知消息。
通過步驟1中創(chuàng)建的規(guī)則,即可收到關(guān)注的客戶端的上下線事件。云消息隊列 RocketMQ 版的接收程序,請參見訂閱消息。示例代碼詳細信息,請參見MQTTClientStatusNoticeProcessDemo.java。
事件類型放在云消息隊列 RocketMQ 版的Tag中,代表上線或下線。數(shù)據(jù)格式如下:
MQ Tag:connect/disconnect/tcpclean
其中:
connect事件代表客戶端上線動作。
disconnect事件代表客戶端主動斷開連接。按照MQTT協(xié)議,客戶端主動斷開TCP連接之前應(yīng)該發(fā)送disconnect 報文,MQTT服務(wù)器在收到disconnect 報文后觸發(fā)該類型消息。如果某些客戶端SDK沒有按照協(xié)議發(fā)送disconnect 報文,MQTT服務(wù)器相應(yīng)無法收到該消息。
tcpclean事件代表實際的TCP連接斷開。無論客戶端是否顯示發(fā)送過disconnect 報文,只要當前TCP連接斷開就會觸發(fā)tcpclean事件。
說明tcpclean消息代表客戶端網(wǎng)絡(luò)層連接的真實斷開。對應(yīng)的,disconnect消息僅僅代表客戶端是主動發(fā)送了下線報文。受限于客戶端的實現(xiàn),有時候客戶端異常退出會導致disconnect消息并沒有正常發(fā)送。因此判斷客戶端下線請使用tcpclean事件。
數(shù)據(jù)內(nèi)容為JSON類型,相關(guān)的Key說明如下:
clientId代表具體設(shè)備。
time代表本次事件的時間。
eventType代表事件類型,供客戶端區(qū)分事件類型。
channelId代表每個TCP連接的唯一標識。
clientIp代表客戶端使用的公網(wǎng)出口IP地址。
certificateChainSn代表客戶端設(shè)備的證書鏈的SN序列號。格式為{設(shè)備證書SN,CA證書SN}。
僅當客戶端接入時使用證書認證功能,才包含該參數(shù)。更多信息,請參見客戶端證書認證。
示例如下:
clientId:GID_XXX@@@YYYYY time:1212121212 eventType:connect/disconnect/tcpclean channelId:2b9b1281046046faafe5e0b458e4XXXX clientIp:192.168.XX.XX:133XX certificateChainSn:1254685.....2458655,154689536.....25655
判斷客戶端當前是否在線不能僅僅根據(jù)收到的最后一條消息的狀態(tài),而需要結(jié)合上下線消息的前后關(guān)聯(lián)來判斷。
具體判斷規(guī)則如下:
同一個clientId的客戶端,產(chǎn)生上下線事件的先后順序以時間為準,基本原則為時間戳越大則越新。
同一個clientId的客戶端,可能存在多次閃斷,因此,當收到下線消息時,一定要根據(jù)channelId字段判斷是否是當前的TCP連接。簡而言之,下線消息只能覆蓋channelId相同的下線消息,如果下線消息的channelId不一樣,盡管time較新,也不能覆蓋。一個channelId代表一個TCP連接,只會存在一個connect事件和一個close事件。
package com.aliyun.openservices.lmq.example.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化消息隊列RocketMQ版接收客戶端,實際業(yè)務(wù)中一般部署在服務(wù)端應(yīng)用中。
*/
Properties properties = new Properties();
/**
* 設(shè)置RocketMQ客戶端的Group ID,注意此處的groupId和MQTT實例中的groupId是兩個概念,請按照各自產(chǎn)品的說明申請?zhí)顚憽? */
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* 賬號AccessKey ID,從控制臺獲取。
*/
properties.put(PropertyKeyConst.AccessKey, "XXXX");
/**
* 賬號AccessKey Secret,從控制臺獲取,僅在Signature鑒權(quán)模式下需要設(shè)置。
*/
properties.put(PropertyKeyConst.SecretKey, "XXXX");
/**
* 設(shè)置TCP接入域名。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
/**
* 使用RocketMQ消費端來處理MQTT客戶端的上下線通知時,訂閱的Topic為上下線通知Topic,請遵循控制臺文檔提前創(chuàng)建。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 客戶端狀態(tài)數(shù)據(jù),實際生產(chǎn)環(huán)境中建議使用數(shù)據(jù)庫或者Redis等外部持久化存儲來保存該信息,避免應(yīng)用重啟丟失狀態(tài),本Demo以單機內(nèi)存版實現(xiàn)做演示。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此處僅處理客戶端是否在線,因此只需要關(guān)注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXxXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 處理上下線通知的邏輯。
* 實際部署過程中,消費上下線通知的應(yīng)用可能部署多臺機器,因此客戶端在線狀態(tài)的數(shù)據(jù)可以使用數(shù)據(jù)庫或者Redis等外部共享存儲來維護。
* 其次需要單獨做消息冪等處理,以免重復接收消息導致狀態(tài)機判斷錯誤。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先存儲新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 讀取當前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表里上線和下線事件都已經(jīng)收到,則當前channel已經(jīng)掉線,可以清理掉這個channel的數(shù)據(jù)。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根據(jù)狀態(tài)表判斷一個clientId是否有活躍的TCP連接。
* 1.如果沒有channel表,則一定不在線。
* 2.如果channel表非空,檢查一下channel數(shù)據(jù)中是否僅包含上線事件,如果有則代表有活躍連接在線。
* 如果全部的channel都有掉線斷開事件則一定是不在線。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
更多信息
如需了解控制臺上的操作,請參見上下線通知規(guī)則管理。