基于MQTT接入的設備靠心跳保活,但心跳是周期性的、且自動收發和超時重連,這些特性給主動檢測設備端是否在線帶來了一定難度。本文提供通過消息收發是否正常判定設備是否在線的原理、流程、實現方式。
原理
如果設備可以發送、接收消息,那么該設備的通信是沒問題的,并且一定在線。
消息收發是物聯網平臺的核心能力。因此,這種判定方法不會因為物聯網平臺架構升級或業務變動而變化,也不會因為設備使用的客戶端不同而不同。是設備端檢測自己是否在線最通用的一種原理。
該原理的一種特殊實現就是設備端消息的自發自收。
流程
- 創建Topic =
/yourProductKey/yourDeviceName/user/checkstatus
。Topic可以自定義,但權限必須為發布和訂閱。
- 設備端訂閱上一步創建的Topic 。
- 設備端發送消息
{"id":123,"version":"1.0","time":1234567890123}
,請一定使用QoS=0 。消息內容可自定義,但建議使用此格式。
參數說明:
字段 類型 說明 id Object 用于驗證收發的消息是否是同一個,請自行業務層保證唯一 version String 版本號固定1.0 time Long 發送消息的時間戳,可以計算消息來回的延時,評估當前的通信質量 - 設備端收到消息上一步發送的消息。
離線判定邏輯
- 嚴格的:發送消息后,5秒內沒有收到消息算失敗,出現1次失敗,判定為離線
- 普通的:發送消息后,5秒內沒有收到消息算失敗,連續2次失敗,判定為離線
- 寬松的:發送消息后,5秒內沒有收到消息算失敗,連續3次失敗,判定為離線
說明 您可以根據自己的情況,自定義離線判定邏輯。
實現
為方便體驗,本例基于Java SDK Demo開發,實現設備端檢測自己是否在線的嚴格判定邏輯。
首先,下載Demo工程,添加本類,并填寫設備證書信息。設備端代碼如下:
import java.io.UnsupportedEncodingException;
import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;
public class CheckDeviceStatusOnDevice {
// ===================需要用戶填寫的參數,開始===========================
// 產品productKey,設備證書參數之一
private static String productKey = "";
// 設備名字deviceName,設備證書參數之一
private static String deviceName = "";
// 設備密鑰deviceSecret,設備證書參數之一
private static String deviceSecret = "";
// 消息通信的Topic,需要在控制臺定義,權限必須為發布和訂閱
private static String checkStatusTopic = "/" + productKey + "/" + deviceName + "/user/checkstatus";
// ===================需要用戶填寫的參數結束===========================
// 接收到的消息
private static String subInfo = "";
public static void main(String[] args) throws InterruptedException {
CheckDeviceStatusOnDevice device = new CheckDeviceStatusOnDevice();
// 初始化
device.init(productKey, deviceName, deviceSecret);
// 下行數據監聽
device.registerNotifyListener();
// 訂閱Topic
device.subscribe(checkStatusTopic);
// 測試設備狀態
System.out.println("we will check device online status now.");
device.checkStatus();
// 準備測試設備離線狀態,請拔掉網線
System.out.println("pls close network,we will check device offline status after 60 seconds.");
for (int i = 0; i < 6; i++) {
Thread.sleep(10000);
}
device.checkStatus();
}
/**
* 測試設備狀態
*
* @throws InterruptedException
*/
public void checkStatus() throws InterruptedException {
// -------------------------------------------------------------------
// 要發送的消息,可以自定義,建議使用當前格式
// -------------------------------------------------------------------
// Field | Tyep | Desc
// -------------------------------------------------------------------
// id | Object | 用于驗證收發的消息是否是同一個,請自行業務層保證唯一
// -------------------------------------------------------------------
// version | String | 版本號固定1.0
// -------------------------------------------------------------------
// time | Long | 發送消息的時間戳,可以計算消息來回的延時,評估當前的通信質量
// -------------------------------------------------------------------
String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";
// 發送消息
publish(checkStatusTopic, payload);
// 嚴格的離線判定邏輯:發送消息后,5秒內沒有收到消息算失敗,出現1次失敗,判定為離線
boolean isTimeout = true;
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
if (!subInfo.isEmpty()) {
isTimeout = false;
break;
}
}
if (!isTimeout && payload.equals(subInfo)) {
System.out.println("Device is online !!");
} else {
System.out.println("Device is offline !!");
}
// 置空接收到的消息,方便下一次測試
subInfo = "";
}
/**
* 初始化
*
* @param pk productKey
* @param dn deviceName
* @param ds deviceSecret
* @throws InterruptedException
*/
public void init(String pk, String dn, String ds) throws InterruptedException {
LinkKitInitParams params = new LinkKitInitParams();
// 設置 MQTT 初始化參數
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = pk;
config.deviceName = dn;
config.deviceSecret = ds;
params.mqttClientConfig = config;
// 設置初始化設備證書信息,用戶傳入
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = pk;
deviceInfo.deviceName = dn;
deviceInfo.deviceSecret = ds;
params.deviceInfo = deviceInfo;
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
@Override
public void onInitDone(InitResult initResult) {
System.out.println("init success !!");
}
@Override
public void onError(AError aError) {
System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
+ aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
}
});
// 確保初始化成功后才執行后面的步驟,可以根據實際情況適當延長這里的延時
Thread.sleep(2000);
}
/**
* 監聽下行數據
*/
public void registerNotifyListener() {
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public boolean shouldHandle(String connectId, String topic) {
// 只處理特定Topic的消息
if (checkStatusTopic.equals(topic)) {
return true;
} else {
return false;
}
}
@Override
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 接收消息
try {
subInfo = new String((byte[]) aMessage.getData(), "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void onConnectStateChange(String connectId, ConnectState connectState) {
}
});
}
/**
* 發布消息
*
* @param topic 發送消息的Topic
* @param payload 發送的消息內容
*/
public void publish(String topic, String payload) {
MqttPublishRequest request = new MqttPublishRequest();
request.topic = topic;
request.payloadObj = payload;
request.qos = 0;
LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
@Override
public void onResponse(ARequest aRequest, AResponse aResponse) {
}
@Override
public void onFailure(ARequest aRequest, AError aError) {
}
});
}
/**
* 訂閱消息
*
* @param topic 訂閱消息的Topic
*/
public void subscribe(String topic) {
MqttSubscribeRequest request = new MqttSubscribeRequest();
request.topic = topic;
LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(AError aError) {
}
});
}
}
說明 檢測到設備離線后,盡量不要選擇主動重連。
物聯網平臺規定一個設備1分鐘只允許嘗試接入平臺5次,超過會觸發限流,限制設備接入。此時停止接入,等待1分鐘即可解除限制。
設備端應注意退避,切勿觸發限流。