數(shù)據(jù)AMQP方式推送
AMQP(Advanced Message Queuing Protocol,高級消息隊列協(xié)議)轉(zhuǎn)儲功能適用于生活物聯(lián)網(wǎng)平臺與企業(yè)服務(wù)器之間的消息流轉(zhuǎn)。通過集成和使用AMQP SDK,即可實現(xiàn)身份認證、消息接收的能力。我們推薦使用AMQP的方式推送設(shè)備數(shù)據(jù)(如設(shè)備狀態(tài)數(shù)據(jù)、設(shè)備控制記錄等),用戶信息數(shù)據(jù)等。
前提條件
開啟設(shè)備數(shù)據(jù)同步,并配置要同步數(shù)據(jù)的產(chǎn)品。詳細參見設(shè)置數(shù)據(jù)同步。
當(dāng)開啟數(shù)據(jù)同步后,集成AMQP客戶端SDK來訂閱數(shù)據(jù)。如果通過控制臺關(guān)閉數(shù)據(jù)同步再開啟時,客戶端SDK需要重新進行連接流程,否則無法正常接收數(shù)據(jù)。如果切換不同的AMQP客戶端,需要把之前的客戶端斷開,再連接新的客戶端。否則新客戶端無法正常接收數(shù)據(jù)。
AMQP SDK使用
引入依賴。
AMQP SDK為開源SDK。如果您使用Java開發(fā)語言,推薦使用Apache Qpid JMS客戶端。在項目中添加Maven依賴,Maven信息如下。
說明目前生活物聯(lián)網(wǎng)平臺僅支持Java語言開發(fā),暫未提供其他語言版本,請以生活物聯(lián)網(wǎng)平臺官網(wǎng)為主。
<!-- amqp 1.0 qpid client --> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.47.0</version> </dependency> <!-- util for base64--> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.10</version> </dependency>
認證身份信息。
身份認證需要使用AppKey和AppSecret,該信息可以從控制臺中獲取。
認證身份信息需要使用EndPoint、AppKey和AppSecret用于鑒權(quán)。
其中,EndPoint是連接節(jié)點,具體取值如下表所示。
區(qū)域
End Point
中國內(nèi)地
amqps://ilop.iot-amqp.cn-shanghai.aliyuncs.com:5671
新加坡
amqps://ilop.iot-amqp.ap-southeast-1.aliyuncs.com:5671
美國(弗吉尼亞)
amqps://ilop.iot-amqp.us-east-1.aliyuncs.com:5671
德國(法蘭克福)
amqps://ilop.iot-amqp.eu-central-1.aliyuncs.com:5671
接收云端消息。
首先需要創(chuàng)建消息接收的客戶端對象client,并傳入上面身份認證的profile信息。當(dāng)消息接收的客戶端和服務(wù)端建立連接后,服務(wù)端會立即向消息接收的客戶端推送已訂閱的消息,因此建立連接時需要提供默認消息接收的回調(diào)接口,用于處理云端推送的消息。
完整代碼示例如下。
import java.net.URI; import java.util.Hashtable; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import org.apache.commons.codec.binary.Base64; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionListener; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class AmqpJavaClientDemo { private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClientDemo.class); //業(yè)務(wù)處理異步線程池,線程池參數(shù)可以根據(jù)您的業(yè)務(wù)特點調(diào)整,或者您也可以用其他異步方式處理接收到的消息。 private final static ExecutorService executorService = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(50000)); public static void main(String[] args) throws Exception { String appKey = "${YourAppkey}"; //在控制臺自有App,單擊密鑰對應(yīng)的查看,可顯示App的App Key和App Secret 創(chuàng)建App時系統(tǒng) String appSecret = "${YourAppSecret}"; String consumerGroupId = "${YourAppkey}"; long random = xxxxx; //建議使用機器UUID、MAC地址、IP等唯一標(biāo)識等作為clientId。便于您區(qū)分識別不同的客戶端。 String clientId = "${YourClientId}"; String userName = clientId + "|authMode=appkey" + ",signMethod=" + "SHA256" + ",random=" + random + ",appKey=" + appKey + ",groupId=" + consumerGroupId + "|"; String signContent = "random=" + random; String password = doSign(signContent, appSecret, "HmacSHA256"); String connectionUrlTemplate = "failover:(${AMQPEndPointUrl}?amqp.idleTimeout=80000)" + "?failover.maxReconnectAttempts=10&failover.reconnectDelay=30"; Hashtable<String, String> hashtable = new Hashtable<>(); hashtable.put("connectionfactory.SBCF",connectionUrlTemplate); hashtable.put("queue.QUEUE", "default"); hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); Context context = new InitialContext(hashtable); ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); Destination queue = (Destination)context.lookup("QUEUE"); // 創(chuàng)建連接。 Connection connection = cf.createConnection(userName, password); ((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); // 創(chuàng)建會話。 // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手動調(diào)用message.acknowledge()。 // Session.AUTO_ACKNOWLEDGE: SDK自動ACK(推薦)。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); // 創(chuàng)建Receiver連接。 MessageConsumer consumer = session.createConsumer(queue); consumer.setMessageListener(messageListener); } private static MessageListener messageListener = new MessageListener() { @Override public void onMessage(Message message) { try { //1.收到消息之后一定要ACK。 // 推薦做法:創(chuàng)建Session選擇Session.AUTO_ACKNOWLEDGE,這里會自動ACK。 // 其他做法:創(chuàng)建Session選擇Session.CLIENT_ACKNOWLEDGE,這里一定要調(diào)message.acknowledge()來ACK。 // message.acknowledge(); //2.建議異步處理收到的消息,確保onMessage函數(shù)里沒有耗時邏輯。 // 如果業(yè)務(wù)處理耗時過程過長阻塞住線程,可能會影響SDK收到消息后的正常回調(diào)。 executorService.submit(() -> processMessage(message)); } catch (Exception e) { logger.error("submit task occurs exception ", e); } } }; /** * 在這里處理您收到消息后的具體業(yè)務(wù)邏輯。 */ private static void processMessage(Message message) { try { byte[] body = message.getBody(byte[].class); String content = new String(body); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); logger.info("receive message" + ", topic = " + topic + ", messageId = " + messageId + ", content = " + content); } catch (Exception e) { logger.error("processMessage occurs error ", e); } } private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() { /** * 連接成功建立。 */ @Override public void onConnectionEstablished(URI remoteURI) { logger.info("onConnectionEstablished, remoteUri:{}", remoteURI); } /** * 嘗試過最大重試次數(shù)之后,最終連接失敗。 */ @Override public void onConnectionFailure(Throwable error) { logger.error("onConnectionFailure, {}", error.getMessage()); } /** * 連接中斷。 */ @Override public void onConnectionInterrupted(URI remoteURI) { logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI); } /** * 連接中斷后又自動重連上。 */ @Override public void onConnectionRestored(URI remoteURI) { logger.info("onConnectionRestored, remoteUri:{}", remoteURI); } @Override public void onInboundMessage(JmsInboundMessageDispatch envelope) {} @Override public void onSessionClosed(Session session, Throwable cause) {} @Override public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} @Override public void onProducerClosed(MessageProducer producer, Throwable cause) {} }; /** * 計算簽名,password組裝方法,請參見AMQP客戶端接入說明文檔。 */ private static String doSign(String toSignString, String secret, String signMethod) throws Exception { SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); Mac mac = Mac.getInstance(signMethod); mac.init(signingKey); byte[] rawHmac = mac.doFinal(toSignString.getBytes()); return Hex.encodeHexString(rawHmac); } }
說明消息推送失敗時,平臺會重新推送,重試策略如下。
如果對端不在線或未回復(fù)ack消息,則會造成消息堆積,堆積的消息轉(zhuǎn)為離線消息。
離線消息每隔1min重試推送一次(每次推送10條)。對端如果成功接收了消息,則重試策略會繼續(xù)推送剩余的離線消息(推送失敗的消息,下一次繼續(xù)推送)。
離線消息最多會保存 1 天,如果 1 天后仍然無法推送成功,則會被刪除。
離線消息會進入單獨的隊列,不會影響后續(xù)消息的實時推送。
消息格式
物的屬性變更消息
topic:
/${productKey}/${deviceName}/thing/event/property/post
消息字段說明如下。
參數(shù)
子參數(shù)
子參數(shù)
類型
含義
deviceType
String
設(shè)備所屬品類
gmtCreate
Long
數(shù)據(jù)流轉(zhuǎn)消息產(chǎn)生時間,自1970-1-1起流逝的毫秒值
iotId
String
設(shè)備的唯一ID
productKey
String
設(shè)備所屬產(chǎn)品的唯一標(biāo)識符
deviceName
String
設(shè)備名稱
items
JSON
變更的狀態(tài)列表
attribute
String
發(fā)生變更的屬性,具體取值由具體情況確定
value
具體數(shù)據(jù)類型由具體情況確定
變更值
time
Long
設(shè)備屬性發(fā)生變化的時間,自1970-1-1起流逝的毫秒值
消息示例如下。
{ "deviceType": "SmartDoor", "iotId": "Xzf15db9xxxxxxxxWR001046b400", "productKey": "a17xxxxTYNA", "gmtCreate": 153xxxx145304, "deviceName": "Xzf15xxxxucTHBgUo6WR", "items": { "WIFI_Rx_Rate": { "value": 74274, "time": 1534299145344 } } }
物的事件變更消息
topic:
/${productKey}/${deviceName}/thing/event/{tsl.event.identifier}/post
消息字段說明如下。
參數(shù)
子參數(shù)
類型
含義
deviceType
String
設(shè)備所屬品類
iotId
String
設(shè)備的唯一ID
productKey
String
設(shè)備所屬產(chǎn)品的唯一標(biāo)識符
deviceName
String
設(shè)備名稱
identifier
String
事件標(biāo)識符,對應(yīng)事件的identifier
name
String
事件名稱
type
String
事件類型
time
Long
設(shè)備上報value對應(yīng)的時間,自1970-1-1起流逝的毫秒值
value
JSON
變更的事件屬性列表:key-value鍵值對
key
String
屬性key
value
具體數(shù)據(jù)類型由具體情況確定
屬性取值
消息示例如下。
{ "deviceType": "SmartDoor", "identifier": "Doorxxxxication", "iotId": "Xzf15db9xxxxxxxxx01046b400", "name": "開門通知", "time": 1534319108982, "type": "info", "productKey": "a17xxxxTYNA", "deviceName": "Xzf15xxxxucTHBgUo6WR", "value": { "KeyID": "x8xxxxxkDY", "LockType": 3 } }
設(shè)備服務(wù)返回消息
topic:
/${productKey}/${deviceName}/thing/downlink/reply/message
消息字段說明如下。
參數(shù)
類型
含義
gmtCreate
Long
數(shù)據(jù)流轉(zhuǎn)消息產(chǎn)生時間,自1970-1-1起流逝的毫秒值
iotId
String
設(shè)備的唯一ID
productKey
String
設(shè)備所屬產(chǎn)品的唯一標(biāo)識符
deviceName
String
設(shè)備名稱
requestId
String
阿里云產(chǎn)生和設(shè)備通信的信息ID
code
Integer
調(diào)用的結(jié)果信息
message
String
結(jié)果信息說明
topic
String
服務(wù)調(diào)用下行時使用的topic
data
Object
設(shè)備返回的結(jié)果,非透傳之間返回設(shè)備結(jié)果,透傳則需要經(jīng)過腳本轉(zhuǎn)換
消息示例如下。
{ "gmtCreate": 151xxxx39881, "iotId": "4z819VQHxxxxxxxxxxxx7ee200", "productKey": "p1gxxxxBd", "deviceName": "xxxxxxxxxx", "requestId": "1234", "code": 200, "message": "success", "topic": "/sys/p1gxxxxeUBd/xxxxxxxxxx/thing/service/property/set", "data": {} }
物的狀態(tài)變更消息
為了提高消息有效性,設(shè)備上下線過于頻繁時,會對消息進行篩檢。
topic:
/as/mqtt/status/{pk}/{dn}
消息字段說明如下。
參數(shù)
類型
含義
status
String
設(shè)備狀態(tài)。online:上線。offline:離線。
iotId
String
設(shè)備在平臺內(nèi)的唯一標(biāo)識。
offlineReasonCode
Integer
設(shè)備下線時,返回的錯誤碼。詳細說明,請參見設(shè)備行為錯誤碼。
productKey
String
設(shè)備所屬產(chǎn)品的唯一標(biāo)識。
deviceName
String
設(shè)備名稱。
lastTime
String
該參數(shù)為歷史存量字段,已無實際意義。
utcLastTime
String
time
String
設(shè)備上、下線的時間。
utcTime
String
設(shè)備上、下線的UTC時間。
clientIp
String
設(shè)備公網(wǎng)出口IP。
消息示例如下。
{ "status":"offline", "iotId":"4z819VQHk6VSLmmBJfrf00107e****", "offlineReasonCode":427, "productKey":"al12345****", "deviceName":"deviceName1234", "time":"2018-08-31 15:32:28.205", "utcTime":"2018-08-31T07:32:28.205Z", "lastTime":"2018-08-31 15:32:28.195", "utcLastTime":"2018-08-31T07:32:28.195Z", "clientIp":"192.0.2.1" }
用戶綁定變更消息
用戶綁定/解綁設(shè)備產(chǎn)生的回流消息,用于同步用戶與設(shè)備的綁定、解綁。
topic:
/${productKey}/${deviceName}/thing/awss/enrollee/user
消息字段說明如下。
參數(shù)
子參數(shù)
類型
含義
bind
bool
true-綁定;false-解綁
productKey
String
設(shè)備所屬產(chǎn)品的唯一標(biāo)識符
deviceName
String
設(shè)備名稱
iotId
String
設(shè)備的唯一ID
messageCreateTime
Long
消息創(chuàng)建時間
identityInfos
list
用戶信息列表
identityId
String
用戶身份ID
scopeId
String
隔離ID
tenantId
String
租戶ID
owned
Integer
擁有標(biāo)記
0:分享者
1:擁有者
params
Map
擴展參數(shù)(暫未使用)
{ "bind":true, "productKey": "123xxxx569", "deviceName": "deviceNamexxxx34", "iotId": "", "messageCreateTime": 151xxxx9881, "identityInfos":[ { "identityId":"50xxxxxxxxxxxx62060259", "scopeId":"", "tenantId":"1D89B5xxxxxxxxxxxxxxxx861678FF", "owned":1 } ], "params":{ } }