服務(wù)端檢測(cè)設(shè)備是否在線
本文提供通過(guò)使用RRPC來(lái)判定設(shè)備是否在線的原理、流程、實(shí)現(xiàn)方式。
背景信息
基于MQTT接入的設(shè)備靠心跳保活,但心跳具有周期性、自動(dòng)收發(fā)包和超時(shí)重連等特性,這些特性給主動(dòng)檢測(cè)設(shè)備端是否在線帶來(lái)了一定難度。服務(wù)端雖提供了GetDeviceStatus和BatchGetDeviceState接口查詢?cè)O(shè)備狀態(tài),但是調(diào)用API獲取設(shè)備狀態(tài)是基于會(huì)話實(shí)現(xiàn)的,會(huì)話保活是建立在心跳基礎(chǔ)上的。
原理
如果設(shè)備可以接收服務(wù)端下發(fā)的消息并作出響應(yīng),則該設(shè)備通信正常,設(shè)備一定在線。
消息收發(fā)是物聯(lián)網(wǎng)平臺(tái)的核心能力。因此,這種判定方法不會(huì)因?yàn)槲锫?lián)網(wǎng)平臺(tái)架構(gòu)升級(jí)或業(yè)務(wù)變動(dòng)而變化,也不會(huì)因?yàn)樵O(shè)備使用的客戶端不同而不同。這是服務(wù)端檢測(cè)設(shè)備是否在線最通用的一種原理。
物聯(lián)網(wǎng)平臺(tái)提供的RRPC能力就是該原理的一種特殊實(shí)現(xiàn)。
流程
設(shè)備端訂閱RRPC請(qǐng)求的Topic是
/sys/${yourProductKey}/${yourDeviceName}/rrpc/request/+
。服務(wù)端調(diào)用RRpc接口發(fā)送指令,例如
{"id":123,"version":"1.0","time":1234567890123}
。消息內(nèi)容可自定義,但建議使用此格式。
參數(shù)說(shuō)明如下表。
字段
類(lèi)型
說(shuō)明
id
Object
消息ID,用于驗(yàn)證收發(fā)的消息是否是同一條。請(qǐng)自行在業(yè)務(wù)層生成,并保證其唯一性。
version
String
版本號(hào)。目前版本號(hào)固定為1.0。
time
Long
發(fā)送消息的時(shí)間戳,可以用于計(jì)算消息來(lái)回的延時(shí),評(píng)估當(dāng)前的通信質(zhì)量。
設(shè)備端接收指令并響應(yīng)RRPC請(qǐng)求。接收的消息格式:
{"id":123,"version":"1.0","time":1234567890123}
離線判定邏輯如下。
嚴(yán)格:發(fā)送消息后,5秒內(nèi)沒(méi)有收到消息算失敗,出現(xiàn)1次失敗,判定為離線。
普通:發(fā)送消息后,5秒內(nèi)沒(méi)有收到消息算失敗,連續(xù)2次失敗,判定為離線。
寬松:發(fā)送消息后,5秒內(nèi)沒(méi)有收到消息算失敗,連續(xù)3次失敗,判定為離線。
說(shuō)明您可以根據(jù)自己的情況,自定義離線判定邏輯。
實(shí)現(xiàn)
為方便體驗(yàn),本例基于設(shè)備端Java SDK Demo和服務(wù)端Java SDK Demo開(kāi)發(fā)。
您可以根據(jù)實(shí)際情況,選擇不同的設(shè)備端SDK和服務(wù)端SDK進(jìn)行開(kāi)發(fā)。RRpc接口的參數(shù)說(shuō)明,請(qǐng)參見(jiàn)RRpc。
分別下載Demo工程,服務(wù)端添加CheckDeviceStatusOnServer類(lèi),設(shè)備端添加Device類(lèi),填寫(xiě)您的阿里云賬號(hào)AccessKey信息和設(shè)備證書(shū)信息。
設(shè)備端代碼如下:
import java.io.UnsupportedEncodingException;
import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttPublishRequest;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ARequest;
import com.aliyun.alink.linksdk.cmp.core.base.AResponse;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSendListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;
public class Device {
// ===================需要用戶填寫(xiě)的參數(shù),開(kāi)始===========================
// 產(chǎn)品productKey,設(shè)備證書(shū)參數(shù)之一
private static String productKey = "";
// 設(shè)備名字deviceName,設(shè)備證書(shū)參數(shù)之一
private static String deviceName = "";
// 設(shè)備密鑰deviceSecret,設(shè)備證書(shū)參數(shù)之一
private static String deviceSecret = "";
// 消息通信的Topic,無(wú)需創(chuàng)建和定義,直接使用即可
private static String rrpcTopic = "/sys/" + productKey + "/" + deviceName + "/rrpc/request/+";
// ===================需要用戶填寫(xiě)的參數(shù),結(jié)束===========================
public static void main(String[] args) throws InterruptedException {
Device device = new Device();
// 初始化
device.init(productKey, deviceName, deviceSecret);
// 下行數(shù)據(jù)監(jiān)聽(tīng)
device.registerNotifyListener();
// 訂閱Topic
device.subscribe(rrpcTopic);
}
/**
* 初始化
*
* @param pk productKey
* @param dn deviceName
* @param ds deviceSecret
* @throws InterruptedException
*/
public void init(String pk, String dn, String ds) throws InterruptedException {
LinkKitInitParams params = new LinkKitInitParams();
// 設(shè)置 MQTT 初始化參數(shù)
IoTMqttClientConfig config = new IoTMqttClientConfig();
config.productKey = pk;
config.deviceName = dn;
config.deviceSecret = ds;
params.mqttClientConfig = config;
// 設(shè)置初始化設(shè)備證書(shū)信息,用戶傳入
DeviceInfo deviceInfo = new DeviceInfo();
deviceInfo.productKey = pk;
deviceInfo.deviceName = dn;
deviceInfo.deviceSecret = ds;
params.deviceInfo = deviceInfo;
LinkKit.getInstance().init(params, new ILinkKitConnectListener() {
public void onError(AError aError) {
System.out.println("init failed !! code=" + aError.getCode() + ",msg=" + aError.getMsg() + ",subCode="
+ aError.getSubCode() + ",subMsg=" + aError.getSubMsg());
}
public void onInitDone(InitResult initResult) {
System.out.println("init success !!");
}
});
// 確保初始化成功后才執(zhí)行后面的步驟,可以根據(jù)實(shí)際情況適當(dāng)延長(zhǎng)這里的延時(shí)
Thread.sleep(2000);
}
/**
* 監(jiān)聽(tīng)下行數(shù)據(jù)
*/
public void registerNotifyListener() {
LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {
@Override
public boolean shouldHandle(String connectId, String topic) {
// 只處理特定Topic的消息
if (topic.contains("/rrpc/request/")) {
return true;
} else {
return false;
}
}
@Override
public void onNotify(String connectId, String topic, AMessage aMessage) {
// 接收RRPC請(qǐng)求并回復(fù)RRPC響應(yīng)
try {
String response = topic.replace("/request/", "/response/");
publish(response, new String((byte[]) aMessage.getData(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void onConnectStateChange(String connectId, ConnectState connectState) {
}
});
}
/**
* 發(fā)布消息
*
* @param topic 發(fā)送消息的Topic
* @param payload 發(fā)送的消息內(nèi)容
*/
public void publish(String topic, String payload) {
MqttPublishRequest request = new MqttPublishRequest();
request.topic = topic;
request.payloadObj = payload;
request.qos = 0;
LinkKit.getInstance().getMqttClient().publish(request, new IConnectSendListener() {
@Override
public void onResponse(ARequest aRequest, AResponse aResponse) {
}
@Override
public void onFailure(ARequest aRequest, AError aError) {
}
});
}
/**
* 訂閱消息
*
* @param topic 訂閱消息的Topic
*/
public void subscribe(String topic) {
MqttSubscribeRequest request = new MqttSubscribeRequest();
request.topic = topic;
LinkKit.getInstance().getMqttClient().subscribe(request, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
}
@Override
public void onFailure(AError aError) {
}
});
}
}
服務(wù)端代碼如下:
import java.io.UnsupportedEncodingException;
import org.apache.commons.codec.binary.Base64;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.exceptions.ServerException;
import com.aliyuncs.iot.model.v20180120.RRpcRequest;
import com.aliyuncs.iot.model.v20180120.RRpcResponse;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
public class CheckDeviceStatusOnServer {
// ===================需要用戶填寫(xiě)的參數(shù),開(kāi)始===========================
// 用戶賬號(hào)AccessKey
private static String accessKeyID = "";
// 用戶賬號(hào)AccesseKeySecret
private static String accessKeySecret = "";
// 產(chǎn)品productKey,設(shè)備證書(shū)參數(shù)之一
private static String productKey = "";
// 設(shè)備名字deviceName,設(shè)備證書(shū)參數(shù)之一
private static String deviceName = "";
// 實(shí)例ID。
private static String instanceId = "iot-******02";
// ===================需要用戶填寫(xiě)的參數(shù),結(jié)束===========================
public static void main(String[] args) throws ServerException, ClientException, UnsupportedEncodingException {
// -------------------------------------------------------------------
// 要發(fā)送的消息,可以自定義,建議使用當(dāng)前格式
// -------------------------------------------------------------------
// Field | Tyep | Desc
// -------------------------------------------------------------------
// id | Object | 用于驗(yàn)證收發(fā)的消息是否是同一個(gè),請(qǐng)自行業(yè)務(wù)層保證唯一
// -------------------------------------------------------------------
// version | String | 版本號(hào)固定1.0
// -------------------------------------------------------------------
// time | Long | 發(fā)送消息的時(shí)間戳,可以計(jì)算消息來(lái)回的延時(shí),評(píng)估當(dāng)前的通信質(zhì)量
// -------------------------------------------------------------------
String payload = "{\"id\":123, \"version\":\"1.0\",\"time\":" + System.currentTimeMillis() + "}";
// 構(gòu)建RRPC請(qǐng)求
RRpcRequest request = new RRpcRequest();
request.setProductKey(productKey);
request.setDeviceName(deviceName);
request.setIotInstanceId(instanceId);
request.setRequestBase64Byte(Base64.encodeBase64String(payload.getBytes()));
request.setTimeout(5000);
// 獲取服務(wù)端請(qǐng)求客戶端
DefaultAcsClient client = getClient();
// 發(fā)起RRPC請(qǐng)求
RRpcResponse response = (RRpcResponse) client.getAcsResponse(request);
// RRPC響應(yīng)處理
// 這個(gè)不能看response.getSuccess(),這個(gè)僅表明RRPC請(qǐng)求發(fā)送成功,不代表設(shè)備接收成功和響應(yīng)成功
// 需要根據(jù)RrpcCode來(lái)判定,參考文檔http://bestwisewords.com/document_detail/69797.html
if (response != null && "SUCCESS".equals(response.getRrpcCode())) {
if (payload.equals(new String(Base64.decodeBase64(response.getPayloadBase64Byte()), "UTF-8"))) {
System.out.println("Device is online");
} else {
System.out.println("Device is offline1");
}
} else {
System.out.println("Device is offline");
}
}
public static DefaultAcsClient getClient() {
DefaultAcsClient client = null;
try {
// 以下代碼中cn-shanghai僅為示例,需替換為您物聯(lián)網(wǎng)平臺(tái)服務(wù)的地域ID。
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", accessKeyID, accessKeySecret);
DefaultProfile.addEndpoint("cn-shanghai", "cn-shanghai", "Iot", "iot.cn-shanghai.aliyuncs.com");
client = new DefaultAcsClient(profile);
} catch (Exception e) {
System.out.println("init client failed !! exception:" + e.getMessage());
}
return client;
}
}
需服務(wù)端主動(dòng)觸發(fā)RRPC調(diào)用,檢測(cè)是否能及時(shí)收到設(shè)備端響應(yīng)結(jié)果。