使用MQTT協議接入的設備和物聯網平臺,通過訂閱Topic或向Topic發布消息的方式進行通信。Topic分為系統Topic、物模型Topic和自定義Topic,其中自定義Topic需要用戶在控制臺定義。 本文為您介紹設備使用自定義Topic與物聯網平臺進行上下行通信,以及物聯網平臺和業務服務器之間通信的步驟。
背景信息
本示例中,電子溫度計通過訂閱自定義Topic接收指令,通過向自定義Topic發布消息從而上報溫度,物聯網平臺將收到的溫度信息通過AMQP服務端轉發到用戶服務器,用戶服務器調用Pub接口向自定義Topic發布消息從而實現對設備的遠程精度設置。
準備開發環境
本示例中,設備端和云端均使用Java語言的SDK,需先準備Java開發環境。您可從Java官方網站下載并安裝Java開發環境。
本示例使用環境如下:
操作系統:Windows 10 64位
JDK版本:JDK8
集成開發環境:IntelliJ IDEA社區版
創建產品和設備
登錄物聯網平臺控制臺。
在實例概覽頁簽的全部環境下,找到對應的實例,單擊實例卡片。
在左側導航欄,單擊 。
單擊創建產品,創建溫度計產品,獲取productKey,例如
a1uzcH0****
。詳細操作指導,請參見創建產品。
創建產品成功后,單擊該產品對應的查看。
在產品詳情頁面的Topic類列表頁簽下,單擊自定義Topic,增加自定義Topic類。
詳細操作指導,請參見使用自定義Topic通信。
本示例中,定義了以下兩個Topic類:
設備發布消息Topic:/a1uzcH0****/${deviceName}/user/devmsg,權限為發布。
設備訂閱消息Topic:/a1uzcH0****/${deviceName}/user/cloudmsg,權限為訂閱。
在服務端訂閱頁簽下,單擊創建訂閱,設置AMQP服務端訂閱,訂閱設備上報消息到默認消費組。
設備上報消息包含自定義Topic消息和物模型消息。詳細操作和說明,請參見配置AMQP服務端訂閱。
在左側導航欄,選擇 ,然后在剛創建的溫度計產品下,添加設備device1,獲取設備證書ProductKey、DeviceName和DeviceSecret。
詳細操作指導,請參見單個創建設備。
設備發送消息給服務器
流程圖:
在整個流程中:
服務器通過AMQP客戶端接收消息,需配置AMQP客戶端接入物聯網平臺,監聽設備消息。具體操作,請參見Java SDK接入示例。
重要AMQP訂閱消息的消費組,必須與設備在相同的物聯網平臺實例下。
配置設備端SDK接入物聯網平臺,并發送消息。
配置設備認證信息。
final String productKey = "a1uzcH0****"; final String deviceName = "device1"; final String deviceSecret = "uwMTmVAMnxB****"; final String region = "cn-shanghai"; final String iotInstanceId = "iot-2w****";
實際業務場景中,您需修改以下參數值。
參數
示例
說明
productKey
a1uzcH0****
設備證書信息。您可在物聯網平臺控制臺的設備詳情頁面查看。具體操作,請參見查看具體設備信息。
deviceName
device1
deviceSecret
uwMTmVAMnxB****
region
cn-shanghai
您物聯網平臺設備所在地域的Region ID。Region ID表達方法,請參見地域列表。
iotInstanceId
iot-2w****
設備所屬實例的ID。
您可在控制臺的實例概覽頁面查看。
若有ID值,必須傳入該ID值。
若無實例概覽頁面或ID值,傳入空值,即
iotInstanceId = ""
。
設置初始化連接參數,包括MQTT連接配置、設備信息和初始物模型屬性。
LinkKitInitParams params = new LinkKitInitParams(); //LinkKit底層是MQTT協議,設置MQTT的配置。 IoTMqttClientConfig config = new IoTMqttClientConfig(); config.productKey = productKey; config.deviceName = deviceName; config.deviceSecret = deviceSecret; config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883"; //設備的信息。 DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.productKey = productKey; deviceInfo.deviceName = deviceName; deviceInfo.deviceSecret = deviceSecret; //報備的設備初始狀態。 Map<String, ValueWrapper> propertyValues = new HashMap<String, ValueWrapper>(); params.mqttClientConfig = config; params.deviceInfo = deviceInfo; params.propertyValues = propertyValues;
實際業務場景中,您需修改以下參數值。
參數
示例
說明
config.channelHost
config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
MQTT設備接入域名。
舊版公共實例:
config.channelHost = productKey + ".iot-as-mqtt." + region + ".aliyuncs.com:1883";
。新版公共實例和企業版實例:
config.channelHost = iotInstanceId + ".mqtt.iothub.aliyuncs.com:1883";
。
初始化連接。
//連接并設置連接成功以后的回調函數。 LinkKit.getInstance().init(params, new ILinkKitConnectListener() { @Override public void onError(AError aError) { System.out.println("Init error:" + aError); } //初始化成功以后的回調。 @Override public void onInitDone(InitResult initResult) { System.out.println("Init done:" + initResult); } });
設備發送消息。
設備端連接物聯網平臺后,向自定義的Topic發送消息。您需將onInitDone函數內容替換為以下內容:
@Override public void onInitDone(InitResult initResult) { //設置Pub消息的Topic和內容。 MqttPublishRequest request = new MqttPublishRequest(); request.topic = "/" + productKey + "/" + deviceName + "/user/devmsg"; request.qos = 0; request.payloadObj = "{\"temperature\":35.0, \"time\":\"sometime\"}"; //發送消息并設置成功以后的回調。 LinkKit.getInstance().publish(request, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { System.out.println("onResponse:" + aResponse.getData()); } @Override public void onFailure(ARequest aRequest, AError aError) { System.out.println("onFailure:" + aError.getCode() + aError.getMsg()); } }); }
實際業務場景中,您需修改以下參數值。
參數
示例
說明
request.topic
"/" + productKey + "/" + deviceName + "/user/devmsg"
具有發布權限的自定義Topic。
request.payloadObj
"{\"temperature\":35.0, \"time\":\"sometime\"}"
自定義的消息內容。
服務器收到消息如下:
Message {payload={"temperature":35.0, "time":"sometime"}, topic='/a1uzcH0****/device1/user/devmsg', messageId='1131755639450642944', qos=0, generateTime=1558666546105}
服務器發送消息給設備
流程圖:
配置設備端SDK訂閱Topic。
配置設備認證信息、設置初始化連接參數、初始化連接,請參見設備發送消息給服務器中的相應示例代碼。
設備要接收服務器發送的消息,還需訂閱消息Topic。
配置設備端訂閱Topic示例如下:
//初始化成功以后的回調。 @Override public void onInitDone(InitResult initResult) { //設置訂閱的Topic。 MqttSubscribeRequest request = new MqttSubscribeRequest(); request.topic = "/" + productKey + "/" + deviceName + "/user/cloudmsg"; request.isSubscribe = true; //發出訂閱請求并設置訂閱成功或者失敗的回調函數。 LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() { @Override public void onSuccess() { System.out.println(""); } @Override public void onFailure(AError aError) { } }); //設置訂閱的下行消息到來時的回調函數。 IConnectNotifyListener notifyListener = new IConnectNotifyListener() { //此處定義收到下行消息以后的回調函數。 @Override public void onNotify(String connectId, String topic, AMessage aMessage) { System.out.println( "received message from " + topic + ":" + new String((byte[])aMessage.getData())); } @Override public boolean shouldHandle(String s, String s1) { return false; } @Override public void onConnectStateChange(String s, ConnectState connectState) { } }; LinkKit.getInstance().registerOnNotifyListener(notifyListener); }
其中,
request.topic
值需修改為具有訂閱權限的自定義Topic。配置云端SDK調用物聯網平臺接口Pub發布消息。參數說明,請參見Pub,使用說明,請參見Java SDK使用說明。
設置身份認證信息。
String regionId = "cn-shanghai"; String accessKey = "LTAI4GFGQvKuqHJhFaj****"; String accessSecret = "iMS8ZhCDdfJbCMeA005sieKe****"; final String productKey = "a1uzcH0****"; final String deviceName = "device1"; final String iotInstanceId = "iot-2w****";
實際業務場景中,您需修改以下參數值。
參數
示例
說明
accessKey
LTAI4GFGQvKuqHJhFaj****
您的阿里云賬號的AccessKey ID和AccessKey Secret。
登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。
說明如果使用RAM用戶,您需授予該RAM用戶管理物聯網平臺的權限(AliyunIOTFullAccess),否則將連接失敗。授權方法請參見授權RAM用戶訪問物聯網平臺。
accessSecret
iMS8ZhCDdfJbCMeA005sieKe****
productKey
a1uzcH0****
設備證書信息。您可在物聯網平臺控制臺的設備詳情頁面查看。具體操作,請參見查看具體設備信息。
deviceName
device1
region
cn-shanghai
您物聯網平臺設備所在地域的Region ID。Region ID表達方法,請參見地域列表。
iotInstanceId
iot-2w****
設備所屬實例的ID。
您可在控制臺的實例概覽頁面查看。
若有ID值,必須傳入該ID值。
若無實例概覽頁面或ID值,傳入空值,即
iotInstanceId = ""
。
設置連接參數。
//設置client的參數。 DefaultProfile profile = DefaultProfile.getProfile(regionId, accessKey, accessSecret); IAcsClient client = new DefaultAcsClient(profile);
設置消息發布參數。
PubRequest request = new PubRequest(); request.setIotInstanceId(iotInstanceId); request.setQos(0); //設置發布消息的Topic。 request.setTopicFullName("/" + productKey + "/" + deviceName + "/user/cloudmsg"); request.setProductKey(productKey); //設置消息的內容,一定要用Base64編碼,否則亂碼。 request.setMessageContent(Base64.encode("{\"accuracy\":0.001,\"time\":now}"));
實際業務場景中,您需修改調用接口的請求參數。詳細說明,請參見Pub。
發送消息。
try { PubResponse response = client.getAcsResponse(request); System.out.println("pub success?:" + response.getSuccess()); } catch (Exception e) { System.out.println(e); }
設備端接收到的消息如下:
msg = [{"accuracy":0.001,"time":now}]
附錄:代碼示例
實際業務場景中,請按照上文描述,修改相關代碼及相關參數的值。
下載Pub/Sub demo,包含本示例的云端SDK和設備端SDK配置代碼Demo。
AMQP客戶端接入物聯網平臺示例,請參見: