MQTT上下線通知規(guī)則的實(shí)現(xiàn)
云消息隊(duì)列 MQTT 版支持通過創(chuàng)建上下線通知規(guī)則,將在線狀態(tài)通知消息推送至云消息隊(duì)列 RocketMQ 版,以獲取客戶端的在線狀態(tài)。本文為您介紹如何通過創(chuàng)建上下線通知規(guī)則來獲取客戶端的在線狀態(tài)。
背景信息
在實(shí)際業(yè)務(wù)中服務(wù)端需要對客戶端的上下線數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析,并根據(jù)客戶端的在線狀態(tài)推送消息。云消息隊(duì)列 MQTT 版提供異步上下線事件通知獲取客戶端在線狀態(tài),MQTT客戶端的上下線事件將會觸發(fā)MQTT服務(wù)端生成一條通知消息,獲取通知消息有以下方式:
通過云端SDK接入云消息隊(duì)列 MQTT 版服務(wù)端獲取客戶端的在線狀態(tài)。更多信息,請參見獲取MQTT客戶端在線狀態(tài)。
通過創(chuàng)建上下線通知規(guī)則,將在線狀態(tài)通知消息推送至云消息隊(duì)列 RocketMQ 版,然后訂閱云消息隊(duì)列 RocketMQ 版獲取客戶端的在線狀態(tài)。
本文以創(chuàng)建上下線通知規(guī)則為例,介紹后端應(yīng)用如何獲取客戶端在線狀態(tài)。
網(wǎng)絡(luò)訪問
- 公網(wǎng)接入點(diǎn)為本地公網(wǎng)環(huán)境訪問的IP地址,一般用于物聯(lián)網(wǎng)和移動(dòng)互聯(lián)網(wǎng)場景中;
- VPC 接入點(diǎn)為云上私網(wǎng)訪問的IP地址,一般用于云端應(yīng)用接入云消息隊(duì)列 MQTT 版。
- 客戶端不使用域名接入而是使用IP地址接入,產(chǎn)品方更新了域名解析導(dǎo)致原有IP地址失效。
- 客戶端網(wǎng)絡(luò)對IP地址設(shè)置網(wǎng)絡(luò)防火墻策略,產(chǎn)品方更新了域名解析后新IP地址被您的防火墻策略攔截。
前提條件
- 安裝IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA為例。
- 下載安裝JDK。
已創(chuàng)建云消息隊(duì)列 MQTT 版實(shí)例、Topic和Group ID,具體操作,請參見創(chuàng)建資源。
已創(chuàng)建云消息隊(duì)列 RocketMQ 版實(shí)例、Topic和Group ID,具體操作,請參見步驟二:創(chuàng)建資源。
云消息隊(duì)列 MQTT 版客戶端上下線通知規(guī)則僅支持云消息隊(duì)列 RocketMQ 版4.x系列實(shí)例。
云消息隊(duì)列 MQTT 版客戶端上下線通知規(guī)則不能跨地域使用,因此,云消息隊(duì)列 MQTT 版和云消息隊(duì)列 RocketMQ 版的資源都必須創(chuàng)建在同一地域。
1.創(chuàng)建上下線通知規(guī)則
登錄云消息隊(duì)列 MQTT 版控制臺,并在左側(cè)導(dǎo)航欄單擊實(shí)例列表。
在頂部菜單欄選擇目標(biāo)地域,然后在實(shí)例列表中單擊實(shí)例名稱進(jìn)入實(shí)例詳情頁面。
在左側(cè)導(dǎo)航欄單擊規(guī)則管理,然后在頁面左上角,單擊創(chuàng)建規(guī)則。
在創(chuàng)建規(guī)則頁面完成以下操作。
配置基本信息。輸入規(guī)則ID,選擇上下線通知的規(guī)則類型。
配置規(guī)則源。選擇已經(jīng)創(chuàng)建好的云消息隊(duì)列 MQTT 版的Group ID。
配置規(guī)則目標(biāo)。選擇已經(jīng)創(chuàng)建好的云消息隊(duì)列 RocketMQ 版的實(shí)例和Topic。
2.準(zhǔn)備測試代碼
客戶端的上下線操作,上下線通知的消息處理都需要代碼完成,本文以Java的Demo示例作為您代碼開發(fā)的參考。
2.1下載示例代碼
下載mqtt-java-demo,并解壓該Demo工程包至您指定的文件夾。
在解壓的Demo工程中找到lmq-java-demo文件夾,將此文件夾導(dǎo)入IntelliJ IDEA,并確認(rèn)pom.xml中已包含以下依賴。
<dependencies> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcprov-jdk15on</artifactId> <version>1.70</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.2</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.5.Final</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-onsmqtt</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>com.aliyun</groupId> <artifactId>aliyun-java-sdk-core</artifactId> <version>4.5.0</version> </dependency> </dependencies>
配置訪問憑證。
獲取AccessKey信息。獲取方式,請參見創(chuàng)建AccessKey。
配置環(huán)境變量。云消息隊(duì)列 MQTT 版的AccessKey ID和AccessKey Secret的環(huán)境變量名稱分別為MQTT_AK_ENV和MQTT_SK_ENV。關(guān)于配置環(huán)境變量的方法,請參見配置訪問憑證。
2.2客戶端上下線代碼
在MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java
類中,按代碼注釋說明填寫云消息隊(duì)列 MQTT 版資源的參數(shù)。
測試中僅模擬上下線的操作不需要發(fā)送消息,可以將其中發(fā)送消息的相關(guān)代碼去掉,示例代碼如下。
import com.aliyun.openservices.lmq.example.util.ConnectionOptionWrapper;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {
public static void main(String[] args) throws Exception {
/**
* MQ4IOT 實(shí)例 ID,購買后控制臺獲取
*/
String instanceId = "XXXXX";
/**
* 接入點(diǎn)地址,購買 MQ4IOT 實(shí)例,且配置完成后即可獲取,接入點(diǎn)地址必須填寫分配的域名,不得使用 IP 地址直接連接,否則可能會導(dǎo)致客戶端異常。
*/
String endPoint = "XXXXX.mqtt.aliyuncs.com";
/**
* 賬號 AccessKey,從賬號系統(tǒng)控制臺獲取
* 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維。
* 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
* 本示例以把AccessKey ID和AccessKey Secret保存在環(huán)境變量為例說明。運(yùn)行本代碼示例之前,請先配置環(huán)境變量MQTT_AK_ENV和MQTT_SK_ENV
* 例如:export MQTT_AK_ENV=<access_key_id>
* export MQTT_SK_ENV=<access_key_secret>
* 需要將<access_key_id>替換為已準(zhǔn)備好的AccessKey ID,<access_key_secret>替換為AccessKey Secret。
*/
String accessKey = System.getenv("MQTT_AK_ENV");
/**
* 賬號 secretKey,從賬號系統(tǒng)控制臺獲取,僅在Signature鑒權(quán)模式下需要設(shè)置
*/
String secretKey = System.getenv("MQTT_SK_ENV");
/**
* MQ4IOT clientId,由業(yè)務(wù)系統(tǒng)分配,需要保證每個(gè) tcp 連接都不一樣,保證全局唯一,如果不同的客戶端對象(tcp 連接)使用了相同的 clientId 會導(dǎo)致連接異常斷開。
* clientId 由兩部分組成,格式為 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制臺申請,DeviceId 由業(yè)務(wù)方自己設(shè)置,clientId 總長度不得超過64個(gè)字符。
*/
String clientId = "GID_XXXXX@@@XXXXX";
ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
final MemoryPersistence memoryPersistence = new MemoryPersistence();
/**
* 客戶端使用的協(xié)議和端口必須匹配,具體參考文檔 http://bestwisewords.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB
* 如果是 SSL 加密則設(shè)置ssl://endpoint:8883
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
/**
* 客戶端設(shè)置好發(fā)送超時(shí)時(shí)間,防止無限阻塞
*/
mqttClient.setTimeToWait(5000);
final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
mqttClient.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("connect success");
}
@Override
public void connectionLost(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
}
});
mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
Thread.sleep(Long.MAX_VALUE);
}
}
2.3上下線通知消息的處理代碼
上下線通知消息被推送到云消息隊(duì)列 RocketMQ 版,消費(fèi)者訂閱消息后需要根據(jù)業(yè)務(wù)需求進(jìn)行處理。
在MQTTClientStatusNoticeProcessDemo.java
類中按代碼注釋說明填寫云消息隊(duì)列 RocketMQ 版資源的參數(shù),示例代碼如下。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
public class MQTTClientStatusNoticeProcessDemo {
public static void main(String[] args) {
/**
* 初始化消息隊(duì)列RocketMQ版接收客戶端,實(shí)際業(yè)務(wù)中一般部署在服務(wù)端應(yīng)用中。
*/
Properties properties = new Properties();
/**
* 設(shè)置消息隊(duì)列RocketMQ版Group ID,在消息隊(duì)列RocketMQ版控制臺創(chuàng)建。
*/
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_XXXX");
/**
* AccessKey ID,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。
* 阿里云賬號AccessKey擁有所有API的訪問權(quán)限,建議您使用RAM用戶進(jìn)行API訪問或日常運(yùn)維。
* 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。
* 本示例以將AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
*/
properties.put(PropertyKeyConst.AccessKey, System.getenv("MQTT_AK_ENV"));
/**
* AccessKey Secret,阿里云身份驗(yàn)證,在阿里云RAM控制臺創(chuàng)建。僅在簽名鑒權(quán)模式下需要設(shè)置。
*/
properties.put(PropertyKeyConst.SecretKey, System.getenv("MQTT_SK_ENV"));
/**
* 設(shè)置TCP接入點(diǎn),該接入點(diǎn)為消息隊(duì)列RocketMQ版實(shí)例的接入點(diǎn)。進(jìn)入消息隊(duì)列RocketMQ版控制臺實(shí)例詳情頁面獲取。
*/
properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXXX");
/**
* 使用消息隊(duì)列RocketMQ版消費(fèi)端來處理MQTT客戶端的上下線通知時(shí),訂閱的Topic為上下線通知Topic。
*/
final String parentTopic = "GID_XXXX_MQTT";
/**
* 客戶端狀態(tài)數(shù)據(jù),實(shí)際生產(chǎn)環(huán)境中建議使用數(shù)據(jù)庫或者Redis等外部持久化存儲來保存該信息,避免應(yīng)用重啟丟失狀態(tài),本示例以單機(jī)內(nèi)存版實(shí)現(xiàn)為例。
*/
MqttClientStatusStore mqttClientStatusStore = new MemoryHashMapStoreImpl();
Consumer consumer = ONSFactory.createConsumer(properties);
/**
* 此處僅處理客戶端是否在線,因此只需要關(guān)注connect事件和tcpclean事件即可。
*/
consumer.subscribe(parentTopic, "connect||tcpclean", new MqttClientStatusNoticeListener(mqttClientStatusStore));
consumer.start();
String clientId = "GID_XXXXX@@@XXXXX";
while (true) {
System.out.println("ClientStatus :" + checkClientOnline(clientId, mqttClientStatusStore));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 處理上下線通知的邏輯。
* 實(shí)際部署過程中,消費(fèi)上下線通知的應(yīng)用可能部署多臺機(jī)器,因此客戶端在線狀態(tài)的數(shù)據(jù)可以使用數(shù)據(jù)庫或者Redis等外部共享存儲來維護(hù)。
* 其次需要單獨(dú)做消息冪等處理,以免重復(fù)接收消息導(dǎo)致狀態(tài)機(jī)判斷錯(cuò)誤。
*/
static class MqttClientStatusNoticeListener implements MessageListener {
private MqttClientStatusStore mqttClientStatusStore;
public MqttClientStatusNoticeListener(
MqttClientStatusStore mqttClientStatusStore) {
this.mqttClientStatusStore = mqttClientStatusStore;
}
@Override
public Action consume(Message message, ConsumeContext context) {
try {
JSONObject msgBody = JSON.parseObject(new String(message.getBody()));
System.out.println(msgBody);
String eventType = msgBody.getString("eventType");
String clientId = msgBody.getString("clientId");
String channelId = msgBody.getString("channelId");
ClientStatusEvent event = new ClientStatusEvent();
event.setChannelId(channelId);
event.setClientIp(msgBody.getString("clientIp"));
event.setEventType(eventType);
event.setTime(msgBody.getLong("time"));
/**
* 首先存儲新的事件。
*/
mqttClientStatusStore.addEvent(clientId, channelId, eventType, event);
/**
* 讀取當(dāng)前channel的事件列表。
*/
Set<ClientStatusEvent> events = mqttClientStatusStore.getEvent(clientId, channelId);
if (events == null || events.isEmpty()) {
return Action.CommitMessage;
}
/**
* 如果事件列表里上線和下線事件都已經(jīng)收到,則當(dāng)前channel已經(jīng)掉線,可以清理掉這個(gè)channel的數(shù)據(jù)。
*/
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent clientStatusEvent : events) {
if (clientStatusEvent.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent && findOfflineEvent) {
mqttClientStatusStore.deleteEvent(clientId, channelId);
}
return Action.CommitMessage;
} catch (Throwable e) {
e.printStackTrace();
}
return Action.ReconsumeLater;
}
}
/**
* 根據(jù)狀態(tài)表判斷一個(gè)clientId是否有活躍的TCP連接。
* 1.如果沒有channel表,則客戶端一定不在線。
* 2.如果channel表非空,檢查一下channel數(shù)據(jù)中是否僅包含上線事件,如果有則代表有活躍連接,客戶端在線。
* 如果全部的channel都有掉線斷開事件則客戶端一定不在線。
*
* @param clientId
* @param mqttClientStatusStore
* @return
*/
public static boolean checkClientOnline(String clientId,
MqttClientStatusStore mqttClientStatusStore) {
Map<String, Set<ClientStatusEvent>> channelMap = mqttClientStatusStore.getEventsByClientId(clientId);
if (channelMap == null) {
return false;
}
for (Set<ClientStatusEvent> events : channelMap.values()) {
boolean findOnlineEvent = false;
boolean findOfflineEvent = false;
for (ClientStatusEvent event : events) {
if (event.isOnlineEvent()) {
findOnlineEvent = true;
} else {
findOfflineEvent = true;
}
}
if (findOnlineEvent & !findOfflineEvent) {
return true;
}
}
return false;
}
}
3.結(jié)果驗(yàn)證
執(zhí)行
MQ4IoTSendMessageToMQ4IoTUseSignatureMode.java
類中的Main函數(shù)運(yùn)行代碼模擬客戶端上線操作。可以根據(jù)下面操作查詢客戶端狀態(tài)和消息推送情況。說明可以通過終止Main函數(shù)的執(zhí)行來模擬客戶端的離線操作。
查詢客戶端狀態(tài)。在云消息隊(duì)列 MQTT 版控制臺設(shè)備狀態(tài)查詢頁面,根據(jù)Device ID查詢客戶端此時(shí)已是在線狀態(tài),如下圖所示。
查詢事件消息推送情況。在云消息隊(duì)列 RocketMQ 版控制臺消息查詢頁面,根據(jù)Topic查詢到上線事件消息已經(jīng)被推送過來,如下圖所示。
執(zhí)行
MQTTClientStatusNoticeProcessDemo.java
類中的Main函數(shù)運(yùn)行代碼。此時(shí)已經(jīng)收到上線的事件消息,ClientStatus
狀態(tài)也從false
變成true
,如下圖所示。