云消息隊列 MQTT 版提供同步查詢和異步上下線事件通知兩種方式,來獲取MQTT客戶端在線狀態。本文介紹這兩種方式的基本原理、應用場景、具體差異以及實現方式。
基本原理
云消息隊列 MQTT 版服務端(下文簡稱為MQTT服務端)提供以下方式獲取客戶端在線狀態:
- 同步查詢
該方式相對簡單,即通過開放的接入點地址調用HTTP/HTTPS方式的OpenAPI查詢某個特定客戶端的當前實時狀態,適用于對單個或多個客戶端的狀態判斷。
- 異步上下線事件通知
該方式使用消息通知,在客戶端上線和下線事件觸發時,MQTT服務端支持將上下線消息直接推送到部署在阿里云服務器上的云端應用。
該方式屬于異步感知客戶端的狀態,且感知到的是上下線事件,而非在線狀態,云端應用需要根據事件發生的時間序列分析出客戶端的狀態。
應用場景
兩種獲取MQTT客戶端在線狀態的方式分別應用于以下場景:
- 同步查詢
- 主業務流程中需要根據客戶端是否在線來決定后續運行邏輯。
- 運維過程需要判斷特定客戶端當前是否在線。
- 異步上下線事件通知
- 服務端需要在客戶端上線或者下線時觸發一些預定義的動作。
- 服務端需要對客戶端的上下線數據進行統計分析,并根據客戶端的在線狀態推送消息。
同步查詢與異步事件通知的差異
兩種查詢方式的區別如下:
- 同步查詢是查詢當前客戶端的實時狀態,理論上比異步通知的方式更精確。
- 異步上下線通知因為采用消息解耦,狀態判斷更加復雜,且誤判可能性更大,但該方法可以基于事件分析多個客戶端的運行狀態軌跡。異步通知雖然存在一定復雜度和誤判,但更加適合大規模的客戶端的狀態統計。
實現方式
- 同步查詢
同步查詢方式可以通過調用以下API接口獲取客戶端狀態:
- 異步上下線事件通知
異步上下線事件通知方式下,云端服務可通過調用云消息隊列 MQTT 版提供的云端SDK獲取客戶端上下線消息。SDK下載,請參見版本說明。
獲取客戶端上下線事件的示例代碼如下:說明 在使用示例代碼前,需要配置環境變量,通過環境變量讀取訪問憑證。關于配置環境變量的方法,請參見配置訪問憑證。云消息隊列 MQTT 版的AccessKey ID和AccessKey Secret的環境變量名稱分別為MQTT_AK_ENV和MQTT_SK_ENV。
package com.aliyun.openservices.lmq.example; import com.alibaba.fastjson.JSONObject; import com.alibaba.mqtt.server.ServerConsumer; import com.alibaba.mqtt.server.callback.StatusListener; import com.alibaba.mqtt.server.config.ChannelConfig; import com.alibaba.mqtt.server.config.ConsumerConfig; import com.alibaba.mqtt.server.model.StatusNotice; public class MQTTClientStatusNoticeProcessDemo { public static void main(String[] args) throws Exception { /** * 您創建的云消息隊列 MQTT 版的實例接入點。 * 獲取云端SDK的接入點,請聯系云消息隊列 MQTT 版技術支持,釘釘群號:35228338。。 * 接入點地址必須填寫分配的域名,不得使用IP地址直接連接,否則可能會導致服務端異常。 */ String domain = "domain"; /** * 使用的協議和端口必須匹配,該參數值固定為5672。 */ int port = 5672; /** * 您創建的云消息隊列 MQTT 版的實例ID。 */ String instanceId = "instanceId"; /** * AccessKey ID,阿里云身份驗證,在阿里云RAM控制臺創建。 * 阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。 * 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。 * 本示例以將AccessKey 和 AccessKeySecret 保存在環境變量為例說明。 */ String accessKey = System.getenv("MQTT_AK_ENV"); /** * AccessKey Secret,阿里云身份驗證,在阿里云RAM控制臺創建。僅在簽名鑒權模式下需要設置。 */ String secretKey = System.getenv("MQTT_SK_ENV"); /** * 您在云消息隊列 MQTT 版控制臺創建的Group的ID。 * */ String mqttGroupId = "mqttGroupId"; ChannelConfig channelConfig = new ChannelConfig(); channelConfig.setDomain(domain); channelConfig.setPort(port); channelConfig.setInstanceId(instanceId); channelConfig.setAccessKey(accessKey); channelConfig.setSecretKey(secretKey); ServerConsumer serverConsumer = new ServerConsumer(channelConfig, new ConsumerConfig()); serverConsumer.start(); serverConsumer.subscribeStatus(mqttGroupId, new StatusListener() { @Override public void process(StatusNotice statusNotice) { System.out.println(JSONObject.toJSONString(statusNotice)); } }); } }