設(shè)備數(shù)據(jù)訂閱
背景介紹:
該文檔適用于多租戶和單租戶兩種模式,發(fā)布和訂閱已授權(quán)給應(yīng)用的設(shè)備端的消息。物聯(lián)網(wǎng)平臺(tái)提供HTTPS/2 (Java)設(shè)備SDK進(jìn)行建聯(lián),用于建立設(shè)備端與物聯(lián)網(wǎng)平臺(tái)的通信。參考SDK鏈接 :http://bestwisewords.com/document_detail/143601.html?spm=a2c4g.11186623.4.5.367d4f38ycUoZZ 此處提供了SDK Demo,您可以參考此Demo,開發(fā)SDK,進(jìn)行訂閱設(shè)備端的消息。
前提條件
這種訂閱方式需要您使用托管平臺(tái)的2個(gè)模塊:分別是 [設(shè)備管理—設(shè)備—Topic列表] 和 [應(yīng)用托管—應(yīng)用集成—授權(quán)設(shè)備給應(yīng)用] 配合使用。前者定義設(shè)備端Topic的自定義名稱以及Topic的權(quán)限(發(fā)布和訂閱),后者是在托管平臺(tái)-設(shè)備集成把設(shè)備授權(quán)給應(yīng)用。
操作步驟
下載HTTP/2 SDK(Java) Demo。下載地址:設(shè)備訂閱Demo Code
使用IDEA或者Eclipse,將該Demo導(dǎo)入到工程里面。
從控制臺(tái)獲取已經(jīng)授權(quán)的appkey和appsecret信息。
完成身份認(rèn)證和對(duì)接進(jìn)行設(shè)備的消息訂閱。
Demo示例
a. 配置參數(shù)
//System.getenv("iot.hosting.appKey")方法可以從環(huán)境變量中獲取appkey和appsecret
String appKey = System.getenv("iot.hosting.appKey");
String appSecret=System.getenv("iot.hosting.appSecret");
//一般在配置文件中固定為:iot.http2.host=https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443
@Value("${iot.http2.host}")
private String httpHost;
b. 連接HTTP/2服務(wù)器,并接收數(shù)據(jù)
// 連接配置
Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
// 如果是true 那么清理所有離線消息,即qos0 或者 1的所有未接收內(nèi)容
profile.setCleanSession(false);
// 構(gòu)造客戶端
MessageClient messageClient = MessageClientFactory.messageClient(profile);
try {
// 數(shù)據(jù)接收
messageClient.connect(messageToken -> {
System.out.println(messageToken.getMessage());
return MessageCallback.Action.CommitSuccess;
});
} catch (Throwable ex) {
System.out.println(ex);
}
c. 訂閱Topic
// topic訂閱。訂閱成功后,即可在建連時(shí)的回調(diào)接口中收到消息
MessageCallback messageCallback = new MessageCallback() {
@Override
//消息消費(fèi)(接收消息)
public Action consume(MessageToken messageToken) {
//將消息實(shí)體化,進(jìn)行消息體的操作
ThingMsgRecordDO recordDO = new ThingMsgRecordDO();
recordDO.setTopic(messageToken.getMessage().getTopic());
recordDO.setPayload(new String(messageToken.getMessage().getPayload()));
System.out.println(recordDO);
return Action.CommitSuccess;
}
};
d.設(shè)置授權(quán)appkey的自定義topic
//消息訂閱Topic的設(shè)置,需在設(shè)備端設(shè)置自定義的Topic。(這里以自定義Topic為例)
//也可也使用設(shè)備端的統(tǒng)一的Topic
String topic = String.format("/a13TLBPrC0d/wkzijitianjiashebei/user/wkget", appKey);
messageClient.setMessageListener(topic, messageCallback);
多租戶和單租戶消息處理
a.消息返回示例
Message{payload={"deviceType":"CustomCategory",
"iotId":"69JL4ECGiPd28vqfdBM8000100",
"requestId":"123",
"productKey":"a13TLBPrC0d",
"gmtCreate":1565147849691,
"deviceName":"wkzijitianjiashebei",
"items":{"electric_pfa":{"value":23,"time":1565147849705},
"COSa":{"value":1,"time":1565147849705},
"Uab":{"value":121212,"time":1565147849705}}},
topic='/a13TLBPrC0d/wkzijitianjiashebei/thing/event/property/post',
messageId='1158940197065362432',
qos=0,
generateTime=1565147849705}
b.返回消息體參數(shù)簡要說明
名稱 | 描述 |
---|---|
deviceType | 設(shè)備類型 |
iotId | 阿里云物聯(lián)網(wǎng)平臺(tái)為設(shè)備頒發(fā)全局唯一的設(shè)備ID |
requestId | request請(qǐng)求id |
productKey | 設(shè)備隸屬的產(chǎn)品Key。 |
gmtCreate | 時(shí)間戳 |
deviceName | 設(shè)備名稱 |
items | 消息內(nèi)容 |
topic | 消息來源的Topic |
messageId | 消息ID |
qos | 清除離線消息 |
generateTime | 時(shí)間戳 |
c.消息處理
多租戶的模式:需要根據(jù)返回的消息體的字段進(jìn)行消息數(shù)據(jù)的處理。單租戶的模式:無需處理,獲取的消息為已授權(quán)給應(yīng)用的設(shè)備消息。
接口說明
身份認(rèn)證
設(shè)備連接物聯(lián)網(wǎng)平臺(tái)時(shí),需要使用Profile配置設(shè)備身份及相關(guān)參數(shù)。具體接口參數(shù)如下:
Profile profile = Profile.getAppKeyProfile(httpHost, appKey, appSecret);
MessageClient messageClient = MessageClientFactory.messageClient(profile);
profile.setCleanSession(false);
try {
messageClient.connect(messageToken -> {
return MessageCallback.Action.CommitSuccess;
});
} catch (Throwable ex) {
logger.info("init h2 client exception", ex);
System.out.println(ex);
}
Profile參數(shù)說明:
名稱 | 類型 | 是否必須 | 描述 |
---|---|---|---|
appKey | String | 是 | 設(shè)備授權(quán)應(yīng)用的appkey可以通過環(huán)境變量獲取system.getev(“iot.hosting.appkey”) |
appSecret | String | 是 | 設(shè)備授權(quán)應(yīng)用appSecret可以通過環(huán)境變量獲取system.getev(“iot.hosting.appSecret”) |
httpHost | String | 是 | 請(qǐng)求路由,目前為上海區(qū)域:”https://ah.iot-as-http2.cn-shanghai.aliyuncs.com:443“ |
cleanSession | Boolean | 否 | 是否清除緩存的離線消息。 |
heartBeatInterval | Long | 否 | 心跳間隔,單位為毫秒。 |
heartBeatTimeOut | Long | 否 | 心跳超時(shí)時(shí)間,單位為毫秒。 |
multiConnection | Boolean | 否 | 是否使用多連接。若使用設(shè)備的productKey和deivceName接入時(shí),請(qǐng)將此參數(shù)值設(shè)置為false。 |
callbackThreadCorePoolSize | Integer | 否 | 回調(diào)線程的corePoolSize(核心池的大小)。 |
callbackThreadMaximumPoolSize | Integer | 否 | 回調(diào)線程池的maximumPoolSize(最大線程數(shù))。 |
callbackThreadBlockingQueueSize | Integer | 否 | 回調(diào)線程池的BlockingQueueSize(阻塞隊(duì)列大小)。 |
authParams | Map | 否 | 自定義認(rèn)證參數(shù)。 |