基于MQTT Topic通信
Java Link SDK提供與阿里云物聯(lián)網(wǎng)平臺(tái)通信的基礎(chǔ)能力接口,本文介紹通過(guò)自定義Topic實(shí)現(xiàn)發(fā)布、訂閱、取消訂閱的基礎(chǔ)能力。
具體代碼實(shí)現(xiàn)請(qǐng)參見(jiàn)Demo中的MqttSample.java。
發(fā)布消息
更多發(fā)布消息內(nèi)容,請(qǐng)參見(jiàn)MqttPublishRequest。
不返回應(yīng)答
適用于設(shè)備上行消息,物聯(lián)網(wǎng)平臺(tái)不回復(fù),或者物聯(lián)網(wǎng)平臺(tái)進(jìn)行了回復(fù)但設(shè)備端不需要做處理的場(chǎng)景。
// 發(fā)布 MqttPublishRequest request = new MqttPublishRequest(); // 設(shè)置是否需要應(yīng)答。 request.isRPC = false; // 設(shè)置topic,設(shè)備通過(guò)該Topic向物聯(lián)網(wǎng)平臺(tái)發(fā)送消息。以下Topic為示例,需替換為用戶自己設(shè)備的Topic。 request.topic = "/a18wP******/LightSwitch/user/update"; // 設(shè)置qos request.qos = 0; String data = "hello world"; //TODO:data 設(shè)置需要發(fā)布的數(shù)據(jù) request.payloadObj = data; LinkKit.getInstance().publish(request, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { // 消息成功提交給操作系統(tǒng)的發(fā)送緩沖區(qū)。 // 在網(wǎng)絡(luò)波動(dòng)等異常情況下,消息可能無(wú)法到達(dá)云端。 // 如果上行的消息有對(duì)應(yīng)的下行的reply, 建議通過(guò)reply報(bào)文來(lái)確認(rèn)上行消息的到達(dá)。 } @Override public void onFailure(ARequest aRequest, AError aError) { // 發(fā)布失敗 } });
參數(shù)
示例
說(shuō)明
isRPC
false
是否為RPC請(qǐng)求,如果是,則需要等待 replyTopic消息后才Rsp。
默認(rèn)為false,表示不需應(yīng)答。
topic
/a18wP******/LightSwitch/user/update
擁有發(fā)布權(quán)限的Topic。設(shè)備通過(guò)該Topic向物聯(lián)網(wǎng)平臺(tái)發(fā)送消息。
qos
0
設(shè)置MQTT請(qǐng)求中QOS的值,默認(rèn)為0。
payloadObj
{"id":"160865432","method":"thing.event.property.post","params":{"LightSwitch":1},"version":"1.0"}
需要發(fā)布的數(shù)據(jù),可以為任意格式數(shù)據(jù)。如果格式為JSON String,其中
id
字段需要保持每次唯一,不可重復(fù),請(qǐng)使用自增的方式進(jìn)行設(shè)置ID字段。示例中id
字段為160865432,則下次id
字段應(yīng)為160865433。返回應(yīng)答
適用于用戶將設(shè)備上行的消息流轉(zhuǎn)到自己的業(yè)務(wù)服務(wù)器,業(yè)務(wù)服務(wù)器再發(fā)送下行消息給設(shè)備的場(chǎng)景。
適用于用戶使用Alink協(xié)議的Topic,并且協(xié)議規(guī)定服務(wù)端會(huì)回復(fù)reply報(bào)文的場(chǎng)景。
// 發(fā)布 MqttPublishRequest request = new MqttPublishRequest(); // 設(shè)置是否需要應(yīng)答。設(shè)置為true,表示期望收到物聯(lián)網(wǎng)平臺(tái)的下行回復(fù)。 request.isRPC = true; // 設(shè)置qos request.qos = 0; // 設(shè)置topic,設(shè)備通過(guò)該Topic向物聯(lián)網(wǎng)平臺(tái)發(fā)送消息。以下Topic為示例,需替換為用戶自己設(shè)備的Topic。 request.topic = "/a18wP******/LightSwitch/user/update"; //設(shè)置物聯(lián)網(wǎng)平臺(tái)答復(fù)的topic,若不設(shè)置,則默認(rèn)為 topic+“_reply”。 request.replyTopic = "/a18wP******/LightSwitch/user/update_reply"; String data = "hello world"; //TODO:data 設(shè)置需要發(fā) request.payloadObj = data; LinkKit.getInstance().publish(request, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { // 發(fā)布成功 } @Override public void onFailure(ARequest aRequest, AError aError) { // 發(fā)布失敗 } });
參數(shù)
示例
說(shuō)明
isRPC
true
是否為RPC請(qǐng)求,如果是,則需要等待 replyTopic消息后才Rsp。
設(shè)置為true,表示期望收到物聯(lián)網(wǎng)平臺(tái)的下行回復(fù)。
qos
0
設(shè)置MQTT請(qǐng)求中QoS的值,默認(rèn)為0。
topic
/a18wP******/LightSwitch/user/update
擁有發(fā)布權(quán)限的Topic。設(shè)備通過(guò)該Topic向物聯(lián)網(wǎng)平臺(tái)發(fā)送消息。
replyTopic
/a18wP******/LightSwitch/user/update_reply
設(shè)置物聯(lián)網(wǎng)平臺(tái)答復(fù)的topic,若不設(shè)置,則默認(rèn)為 topic+“_reply”。
payloadObj
{"id":"160865432","method":"thing.event.property.post","params":{"LightSwitch":1},"version":"1.0"}
需要發(fā)布的數(shù)據(jù),格式必須為JSON String,其中
id
字段需要保持每次唯一,不可重復(fù),請(qǐng)使用自增的方式進(jìn)行設(shè)置ID字段。示例中id
字段為160865432,則下次id
字段應(yīng)為160865433。
訂閱消息
您可以訂閱指定Topic的消息,訂閱關(guān)系將保存在物聯(lián)網(wǎng)平臺(tái)。物聯(lián)網(wǎng)平臺(tái)收到指定Topic發(fā)布的消息后,會(huì)將消息轉(zhuǎn)發(fā)給設(shè)備。更多內(nèi)容,請(qǐng)參見(jiàn)MqttSubscribeRequest。
訂閱成功后,相應(yīng)的下行消息,會(huì)通過(guò)認(rèn)證與連接的連接狀態(tài)與下行消息監(jiān)聽(tīng)中的IConnectNotifyListener
對(duì)象中透出。
示例代碼:
// 訂閱
MqttSubscribeRequest subscribeRequest = new MqttSubscribeRequest();
// subTopic 替換成您需要訂閱的 topic
subscribeRequest.topic = subTopic;
subscribeRequest.isSubscribe = true;
LinkKit.getInstance().subscribe(subscribeRequest, new IConnectSubscribeListener() {
@Override
public void onSuccess() {
// 訂閱成功
}
@Override
public void onFailure(AError aError) {
// 訂閱失敗
}
});
如果設(shè)備不再需要訂閱指定Topic,需要主動(dòng)取消訂閱。否則訂閱關(guān)系一直存在,您將持續(xù)收到訂閱Topic發(fā)送的消息。更多內(nèi)容,請(qǐng)參見(jiàn)下文取消訂閱。
取消訂閱
// 取消訂閱
MqttSubscribeRequest unsubRequest = new MqttSubscribeRequest();
// unSubTopic 替換成您需要取消訂閱的topic
unsubRequest.topic = unSubTopic;
unsubRequest.isSubscribe = false;
LinkKit.getInstance().unsubscribe(unsubRequest, new IConnectUnscribeListener() {
@Override
public void onSuccess() {
// 取消訂閱成功
}
@Override
public void onFailure(AError aError) {
// 取消訂閱失敗
}
});