如果需要將設備消息轉發至AMQP客戶端進行消費,您需在業務服務器中使用AMQP SDK開啟客戶端連接物聯網平臺。AMQP客戶端接入成功并在業務服務器運行,才能接收物聯網平臺的設備消息。本文介紹AMQP客戶端接入物聯網平臺的方法。
前提條件
已創建消費組:使用AMQP SDK開啟AMQP客戶端消費消費組內訂閱的消息,需要指定消費組ID。
背景信息
服務端訂閱和云產品流轉功能就可將設備消息流轉至AMQP客戶端消費。您可對比流轉方案、應用場景及功能優勢,根據業務需求選擇合適的流轉方案。具體內容,請參見:
使用限制
阿里云物聯網平臺服務端訂閱僅支持AMQP 1.0版的協議標準。AMQP協議標準的詳細介紹,請參見AMQP協議標準。
一個AMQP客戶端的連接數最大為128個。最多可開啟64個AMQP客戶端同時消費同一個消費組。
AMQP SDK說明
建議您使用阿里云物聯網平臺提供的AMQP SDK接入示例。對于您自研的AMQP SDK,阿里云不提供后續技術支持服務。
阿里云物聯網平臺提供以下語言的AMQP SDK示例代碼供您使用。AMQP客戶端認證過程、認證參數配置和消息處理邏輯,請參見本文的連接認證過程、連接配置說明和客戶端接收消息邏輯。
接入過程中,您可能會遇到消息相關的錯誤碼。更多信息,請參見消息相關錯誤碼。
連接認證過程
AMQP客戶端與物聯網平臺經過三次握手建立TCP連接,然后進行TLS握手校驗。
說明為了保障安全,接收方必須使用TLS加密,不支持非加密的TCP傳輸。
AMQP客戶端請求建立
Connection
。連接認證方式為
PLAIN-SASL
,可以理解為用戶名(userName)和密碼(password)認證。物聯網平臺的云端認證userName和password通過后,建立Connection
。此外,根據AMQP協議,AMQP客戶端建連時,還需在Open幀中攜帶心跳時間,即AMQP協議的idle-time-out參數。心跳時間單位為毫秒,取值范圍為30,000~300,000。如果超過心跳時間,
Connection
上沒有任何幀通信,物聯網平臺將關閉連接。SDK不同,idle-time-out參數設置方法不同。具體設置方法,請參見各語言SDK示例文檔。AMQP客戶端向物聯網平臺的云端發起請求,建立
Receiver Link
(即云端向客戶端推送數據的單向通道)。客戶端建立
Connection
成功后,需在15秒內完成Receiver Link
的建立,否則物聯網平臺會關閉連接。建立
Receiver Link
后,客戶端成功接入物聯網平臺。說明一個
Connection
上只能創建一個Receiver Link,不支持創建Sender Link
,即只能由物聯網平臺的云端向客戶端推送消息,客戶端不能向云端發送消息。Receiver Link
在不同SDK中名稱不同,例如在有的SDK上稱為MessageConsumer,請根據具體SDK設置。
連接配置說明
AMQP客戶端接入物聯網平臺的連接地址和連接認證參數說明如下:
接入域名和端口
公共實例和企業版實例中,AMQP的接入域名,請參見管理實例終端節點。
接入域名為AMQP SDK示例代碼中的${YourHost}
。
對于Java、.NET、Python 2.7、Node.js、Go客戶端:端口號為5671。
對于Python3、PHP客戶端:端口號為61614。
客戶端身份認證參數
不同身份賬號使用AMQP SDK將AMQP客戶端接入物聯網平臺,配置的認證參數有區別。
如果是當前物聯網平臺所屬阿里云主賬號或其下直接授權的RAM用戶,認證參數如下:
說明對于直接授權的RAM用戶,需要給該RAM用戶授予操作AMQP服務端訂閱功能的權限(iot:sub),否則將會連接失敗。授權方法,請參見IoT授權映射表。
為提升物聯網平臺數據安全,推薦通過RAM角色授予RAM用戶指定的操作權限。具體說明,請參見下文。
userName = clientId|iotInstanceId=${iotInstanceId},authMode=aksign,signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171| password = signMethod(stringToSign, accessSecret)
如果是通過RAM角色授權的RAM用戶,認證參數如下:
說明通過RAM角色授權的RAM用戶除了當前物聯網平臺所屬阿里云主賬號下的RAM用戶,還支持跨賬號(其他阿里云主賬號)下的RAM用戶。關于如何通過RAM角色授權RAM用戶操作物聯網平臺服務端訂閱功能,請參見本賬號RAM用戶授權服務端訂閱和跨賬號RAM用戶授權服務端訂閱。
userName = clientId|iotInstanceId=${iotInstanceId},authMode=ststoken,securityToken=${SecurityToken},signMethod=hmacsha1,consumerGroupId=${consumerGroupId},authId=${accessKey},timestamp=1573489088171| password = signMethod(stringToSign, accessSecret)
表 1. userName參數說明
參數
是否必傳
說明
clientId
是
表示客戶端ID,需您自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。
AMQP客戶端接入并啟動成功后,登錄物聯網平臺控制臺,在對應實例的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的客戶端。
iotInstanceId
否
當前物聯網平臺實例的ID。您可在物聯網平臺控制臺的實例概覽頁簽,查看當前實例的ID。
若有ID值,必須傳入該ID值。
若無實例概覽頁簽或ID值,則無需傳入。
authMode
是
鑒權模式。
當前物聯網平臺所屬阿里云主賬號或其下直接授權的RAM用戶:使用
aksign
模式。通過RAM角色授權的RAM用戶:使用
ststoken
模式。
securityToken
否
重要僅通過RAM角色授權的RAM用戶接入AMQP客戶端時,需配置此參數。
RAM用戶扮演RAM角色的臨時身份憑證((STS Token)),可以通過調用AssumeRole接口獲取,具體內容,請參見AssumeRole。
signMethod
是
簽名算法。可選:
hmacmd5
、hmacsha1
和hmacsha256
。consumerGroupId
是
當前物聯網平臺對應實例中的消費組ID。
登錄物聯網平臺控制臺,在對應實例的
查看您的消費組ID。authId
是
認證信息。
對于當前物聯網平臺所屬阿里云主賬號或其下直接授權的RAM用戶
分別對應取值為阿里云主賬號的AccessKey ID,或RAM用戶的AccessKey ID。
登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey。
對于通過RAM角色授權的RAM用戶
取值為扮演RAM角色的RAM用戶的AccessKey ID。
timestamp
是
當前時間。Long類型的毫秒值時間戳。
表 2. password參數說明 參數
是否必傳
說明
signMethod
是
簽名算法。請使用userName中指定的簽名算法計算簽名值,并轉為base64字符串。
stringToSign
是
待簽名的字符串。
將需要簽名的參數的鍵值對按照首字母字典排序,并在鍵值間添加等號(=);參數間添加與號(&),拼接成待簽名的字符串。
對于當前物聯網平臺所屬阿里云主賬號或其下直接授權的RAM用戶
需要簽名的參數為:
authId
和timestamp
。待簽名的字符串為:
stringToSign = authId=${accessKey}×tamp=1573489088171
。對于通過RAM角色授權的RAM用戶
需要簽名的參數為:
securityToken
、authId
和timestamp
。待簽名的字符串為:
stringToSign = authId=${accessKey}&securityToken=${SecurityToken}×tamp=1573489088171
。
accessSecret
是
對于當前物聯網平臺所屬阿里云主賬號或其下直接授權的RAM用戶
分別對應取值為阿里云主賬號的AccessKey Secret,或RAM用戶的AccessKey Secret。
登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey。
對于通過RAM角色授權的RAM用戶
取值為扮演RAM角色的RAM用戶的AccessKey Secret。
客戶端接收消息邏輯
客戶端和物聯網平臺云端之間的Receiver Link建連成功后,云端就可以在這條Link上向AMQP客戶端推送消息。
客戶端僅支持接收物聯網平臺已經訂閱的消息,要向設備發送消息或指令,可根據需要,調用對應的API。更多信息,請參見API列表。
消息推送
物聯網平臺推送的消息:
消息體:消息的payload為二進制格式。
消息的業務屬性:如消息Topic和Message ID等,需要從AMQP協議的Application Properties中獲取。格式為
key:value
。Key
含義
topic
消息Topic。
messageId
消息ID。
generateTime
消息生成時間。
說明消息生成時間generateTime不能作為判斷消息順序的依據。
消息回執
按照AMQP協議的定義,客戶端需要給物聯網平臺的云端回執(AMQP協議上一般稱為settle),通知云端消息已經被成功接收。AMQP客戶端通常會提供自動回執模式(推薦)和手動回執模式。具體請參考相應的客戶端的使用說明。
消息處理
AMQP客戶端接收到消息后,業務層處理該消息的邏輯和方法,由您自行完成開發。
消息策略
實時消息直接推送。
進入堆積列隊的消息:
由于消費客戶端離線、消息消費慢等原因,消息不能實時消費,而進入堆積隊列。
消費客戶端重新上線并恢復穩定消費能力后,物聯網平臺重試推送堆積消息。
如果客戶端對重試推送的消息消費失敗,可能導致堆積隊列阻塞。按大約一分鐘間隔,物聯網平臺向客戶端再次重試推送。
消息時序
消息不保序,即接收到消息的時間順序不一定是消息實際產生的時間順序。
設備上下線消息:
收到消息的順序不是實際設備上下線時間排序。設備上下線順序需按照time具體值排序。
例如,您依次收到3條消息:
上線:
2018-08-31 10:02:28.195
。下線:
2018-08-31 10:01:28.195
。下線:
2018-08-31 10:03:28.195
。
這3條消息展示了,設備先下線,再上線,最后下線的過程。
關于消息中參數的更多信息,請參見數據格式。
其他類型的消息:
您需要在業務層,給消息增加序列號。根據接收到消息中的序列號,冪等判斷消息是否需要處理。