MQTT客戶端上下線事件數據流出
您可通過配置云消息隊列 MQTT 版的客戶端上下線通知規則,將獲取的MQTT客戶端上下線事件數據導出至其他阿里云產品。該方法為異步上下線通知。本文介紹客戶端上下線通知的原理、應用場景、使用限制以及云消息隊列 MQTT 版與其他阿里云產品的資源映射關系。
基本原理
在客戶端上線和下線事件觸發時,MQTT服務器會根據您配置的客戶端上下線通知規則,向后端其他云產品推送一條上下線消息。業務應用一般部署在阿里云的服務器上,業務應用通過向后端云產品訂閱這條消息來獲取所有客戶端的上下線動作。
該方式屬于異步感知客戶端的狀態,且感知到的是上下線事件,而非在線狀態,云端應用需要根據事件發生的時間序列分析出客戶端的狀態。
異步上下線通知因為采用消息解耦,狀態判斷更加復雜,且誤判可能性更大,但該方法可以基于事件分析多個客戶端的運行狀態軌跡。
應用場景
客戶端上下線通知主要的應用場景為業務應用需要在客戶端上線或者下線時觸發一些預定義的動作。
例如,客戶端在線狀態聚合。此場景中,MQTT客戶端產生上下線等狀態變更,云消息隊列 MQTT 版會根據您配置的客戶端狀態通知規則,將狀態變更封裝后轉發到云消息隊列 RocketMQ 版消息的方式來實現客戶端狀態數據的聚合和統計。
針對其他場景,推薦您使用同步查詢接口來獲取客戶端在線狀態。詳細信息,請參見獲取MQTT客戶端在線狀態。
使用限制
限制項 | 限制值 | 說明 |
單實例規則數量 | 100 | 如果默認限制不滿足,請聯系云消息隊列 MQTT 版技術支持,釘釘群號:35228338。 |
規則去重限制 | 同一個內部資源同種規則只能創建一個規則。 | 例如一個Group ID只能創建一個上下線通知規則,一個MQTT Topic只能創建一個數據流入規則和一個數據流出規則。 |
地域限制 | 不支持跨地域創建規則,規則的數據源和數據目標所屬的實例必須處于同一地域。 | 例如,創建數據流出規則,數據源云消息隊列 MQTT 版實例屬于華東1(杭州)地域,則數據目標云消息隊列 RocketMQ 版只能選擇華東1(杭州)地域的實例。 |
云消息隊列 MQTT 版實例版本 | 僅新版本的實例支持。 | 新購的云消息隊列 MQTT 版實例默認為新版本實例,舊版實例已不支持購買。 |
云消息隊列 RocketMQ 版實例版本 | 僅4.0系列實例支持 | 云消息隊列 MQTT 版和云消息隊列 RocketMQ 版通過消息流入或消息流出規則進行數據互通時,云消息隊列 RocketMQ 版僅4.0系列實例支持消息流入或流出規則,5.0系列實例不支持。 |
資源映射方式
同一個云消息隊列 MQTT 版Group ID下的所有客戶端的狀態變更通知都會轉發到您配置的同一個其他阿里云產品的資源里。
表 1. 映射關系
MQTT資源 | 其他阿里云產品 | 其他阿里云產品資源 | 數據包定義 |
MQTT Group ID | 云消息隊列 RocketMQ 版 | 云消息隊列 RocketMQ 版的Topic |
操作流程
如上文所述,如果使用異步上下線通知的方式,您需創建客戶端上下線事件通知規則,將上下線通知的消息導出至后端云產品中。下文以使用云消息隊列 RocketMQ 版后端云產品為例進行說明。
創建上下線通知規則。
您需關注哪些Group ID分組的設備,就在云消息隊列 MQTT 版控制臺創建規則時,選定相應的Group ID。創建規則的詳細步驟,請參見創建上下線通知規則。
業務應用訂閱該類通知消息。
通過步驟1中創建的規則,即可收到關注的客戶端的上下線事件。云消息隊列 RocketMQ 版的接收程序,請參見訂閱消息。示例代碼詳細信息,請參見MQTTClientStatusNoticeProcessDemo.java。
事件類型放在云消息隊列 RocketMQ 版的Tag中,代表上線或下線。數據格式如下:
MQ Tag:connect/disconnect/tcpclean
其中:
connect事件代表客戶端上線動作。
disconnect事件代表客戶端主動斷開連接。按照MQTT協議,客戶端主動斷開TCP連接之前應該發送disconnect 報文,MQTT服務器在收到disconnect 報文后觸發該類型消息。如果某些客戶端SDK沒有按照協議發送disconnect 報文,MQTT服務器相應無法收到該消息。
tcpclean事件代表實際的TCP連接斷開。無論客戶端是否顯示發送過disconnect 報文,只要當前TCP連接斷開就會觸發tcpclean事件。
說明tcpclean消息代表客戶端網絡層連接的真實斷開。對應的,disconnect消息僅僅代表客戶端是主動發送了下線報文。受限于客戶端的實現,有時候客戶端異常退出會導致disconnect消息并沒有正常發送。因此判斷客戶端下線請使用tcpclean事件。
數據內容為JSON類型,相關的Key說明如下:
clientId代表具體設備。
time代表本次事件的時間。
eventType代表事件類型,供客戶端區分事件類型。
channelId代表每個TCP連接的唯一標識。
clientIp代表客戶端使用的公網出口IP地址。
certificateChainSn代表客戶端設備的證書鏈的SN序列號。格式為{設備證書SN,CA證書SN}。
僅當客戶端接入時使用證書認證功能,才包含該參數。更多信息,請參見客戶端證書認證。
示例如下:
clientId:GID_XXX@@@YYYYY time:1212121212 eventType:connect/disconnect/tcpclean channelId:2b9b1281046046faafe5e0b458e4XXXX clientIp:192.168.XX.XX:133XX certificateChainSn:1254685.....2458655,154689536.....25655
判斷客戶端當前是否在線不能僅僅根據收到的最后一條消息的狀態,而需要結合上下線消息的前后關聯來判斷。
具體判斷規則如下:
同一個clientId的客戶端,產生上下線事件的先后順序以時間為準,基本原則為時間戳越大則越新。
同一個clientId的客戶端,可能存在多次閃斷,因此,當收到下線消息時,一定要根據channelId字段判斷是否是當前的TCP連接。簡而言之,下線消息只能覆蓋channelId相同的下線消息,如果下線消息的channelId不一樣,盡管time較新,也不能覆蓋。一個channelId代表一個TCP連接,只會存在一個connect事件和一個close事件。
package com.aliyun.openservices.lmq.example.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化消息隊列RocketMQ版接收客戶端,實際業務中一般部署在服務端應用中。
*/
Properties properties = new Properties();
/**
* 設置RocketMQ客戶端的Group ID,注意此處的groupId和MQTT實例中的groupId是兩個概念,請按照各自產品的說明申請填寫。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* 賬號AccessKey ID,從控制臺獲取。
*/
properties.put(PropertyKeyConst.AccessKey, "XXXX");
/**
* 賬號AccessKey Secret,從控制臺獲取,僅在Signature鑒權模式下需要設置。
*/
properties.put(PropertyKeyConst.SecretKey, "XXXX");
/**
* 設置TCP接入域名。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://XXXX");
/**
* 使用RocketMQ消費端來處理MQTT客戶端的上下線通知時,訂閱的Topic為上下線通知Topic,請遵循控制臺文檔提前創建。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 客戶端狀態數據,實際生產環境中建議使用數據庫或者Redis等外部持久化存儲來保存該信息,避免應用重啟丟失狀態,本Demo以單機內存版實現做演示。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此處僅處理客戶端是否在線,因此只需要關注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXxXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 處理上下線通知的邏輯。
* 實際部署過程中,消費上下線通知的應用可能部署多臺機器,因此客戶端在線狀態的數據可以使用數據庫或者Redis等外部共享存儲來維護。
* 其次需要單獨做消息冪等處理,以免重復接收消息導致狀態機判斷錯誤。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先存儲新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 讀取當前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表里上線和下線事件都已經收到,則當前channel已經掉線,可以清理掉這個channel的數據。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根據狀態表判斷一個clientId是否有活躍的TCP連接。
* 1.如果沒有channel表,則一定不在線。
* 2.如果channel表非空,檢查一下channel數據中是否僅包含上線事件,如果有則代表有活躍連接在線。
* 如果全部的channel都有掉線斷開事件則一定是不在線。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
更多信息
如需了解控制臺上的操作,請參見上下線通知規則管理。