直接使用AMQP服務端訂閱實時獲取設備數據,只能從產品維度獲取所有設備的數據,使用消息轉發的云產品流轉功能,可以將全部設備或指定設備發送到物聯網平臺的消息,先經過SQL表達式處理和過濾,再轉發到AMQP服務端訂閱消費組,通過AMQP客戶端消費。
工作原理
云產品流轉可將同一產品所有設備或指定設備的指定Topic消息,實時轉發到一個或多個消費組中,每個消費組中包括多個消費者即AMQP客戶端。每條消息轉發到消費組時,消費組中隨機一個消費者收到消息,不同消費組通過消費組ID區分。在上圖中:
消費者:使用AMQP SDK注冊消費組的消費者,用于接收物聯網平臺轉發到AMQP消費組中的消息。
物聯網平臺:配置云產品流轉規則的數據源Topic、SQL表達式和數據目的,并啟動規則,將設備消息轉發到AMQP服務端訂閱的消費組。
AMQP客戶端:啟動規則后,物聯網平臺會自動將設備消息轉發到AMQP客戶端,不需要在AMQP客戶端代碼中訂閱Topic,只要AMQP客戶端在線就可以接收消息。
云產品流轉不支持從AMQP客戶端下發消息給設備,如果需要下發指令給設備,請調用消息通信API。
應用場景
業務服務器接收設備消息:云產品流轉可以靈活地轉發設備消息到AMQP服務端訂閱的消息組。
轉發指定設備的消息。
轉發指定Topic的消息。
消息過濾或處理后再轉發。
AMQP客戶端直接實時獲取指定產品下所有設備的消息,可以直接配置AMQP服務端訂閱。具體內容,請參見配置AMQP服務端訂閱。
使用限制
前提條件
已創建消費組,作為數據轉發目的地。您可使用物聯網平臺默認消費組(DEFAULT_GROUP)或創建消費組。
步驟一:配置數據轉發目的
登錄物聯網平臺控制臺。
在實例概覽頁簽的全部環境下,找到對應的實例,單擊實例卡片。
在左側導航欄,選擇 。
單擊規則對應的查看,進入數據流轉規則頁面。
重要若當前頁面為云產品流轉新版頁面,需先單擊右上角返回舊版,再單擊目標規則對應的查看。
單擊轉發數據一欄對應的添加操作。
在添加操作對話框中,選擇操作為發布到AMQP服務端訂閱消費組。按照界面提示,設置其他信息,單擊確認。
參數
描述
選擇操作
選擇發布到AMQP服務端訂閱消費組。
消費組
選擇一個已創建的消費組作為數據轉發目標。單擊創建消費組可以進行消費組創建。
Tag
設置tag后,所有通過該操作流轉到AMQP服務端訂閱消費組里的消息都會攜帶該tag。
tag長度為1~128個字符,可以輸入常量或變量。
常量支持輸入中文漢字、英文字母、數字。
變量格式為
${key}
,代表SQL處理后的JSON數據中key對應的value值。如果取不到value值,則消息不攜帶tag。
回到云產品流轉頁,單擊規則對應的啟動按鈕啟動規則。
步驟二:運行AMQP客戶端
建議使用阿里云物聯網平臺提供的AMQP SDK接入示例。對于您自研的AMQP SDK,阿里云不提供后續技術支持服務。
本示例使用Java語言,其他語言的示例請參見AMQP客戶端接入說明。
本示例購買Alibaba Cloud Linux操作系統的ECS實例,作為AMQP客戶端的開發環境:
登錄ECS實例。登錄方式,請參見連接方式概述。
執行以下命令,下載Demo文件。
wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
執行以下命令,解壓demo文件。
unzip amqp-demo.zip
在
src/main/java/com.aliyun.iotx.demo
目錄下AmqpClient.java
文件中,參照下表修改AMQP的接入信息。重要本示例Demo代碼中,添加了結束程序的代碼(
Thread.sleep(60 * 1000);
),即程序啟動成功,運行一分鐘后會結束。實際場景中,您可根據需要自行設置運行時間。參數
說明
accessKey
阿里云主賬號或RAM用戶的AccessKey ID和AccessKey Secret。
登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。
重要為避免將AccessKey硬編碼到業務代碼中帶來的安全風險,可采用配置環境變量的方法管理AccessKey。
您需在本地操作系統中添加環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分別寫入已準備好的AccessKey ID和AccessKey Secret。
在示例代碼中可通過以下方法獲取:
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
accessSecret
consumerGroupId
當前物聯網平臺對應實例中的消費組ID。
登錄物聯網平臺控制臺,在對應實例的
查看您的消費組ID。iotInstanceId
實例ID。您可在物聯網平臺控制臺的實例概覽頁面,查看當前實例的ID。
若有ID值,必須傳入該ID值。
若無實例概覽頁面或ID值,傳入空值,即
iotInstanceId = ""
。
clientId
表示客戶端ID,用戶自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。
AMQP客戶端接入并啟動成功后,登錄物聯網平臺控制臺,在對應實例的 頁簽,單擊消費組對應的查看,消費組詳情頁面將顯示該參數,方便您識別區分不同的客戶端。
connectionCount
啟動AMQP客戶端的連接數,最大不超過128個。用于實時消息推送的擴容。
消費組詳情頁面會以
${clientId}+"-"+數字
形式,顯示連接的客戶端。其中數字最小值為0。host
AMQP接入域名。
${YourHost}
對應的AMQP接入域名信息,請參見查看和配置實例終端節點信息(Endpoint)。在
pom.xml
文件中,已添加相關Maven依賴。在amqp-demo
根目錄執行以下命令,重新加載Maven變更,構建項目。mvn clean package
在
amqp-demo/target
目錄執行以下命令,運行生成的JAR包。java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
運行示例代碼后返回如下信息,表示AMQP客戶端已接入物聯網平臺并成功接收消息。
重要只有當AMQP客戶端在線時,才能在服務器上收到設備消息。
10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5) 10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6) 10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } 10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message, topic = /g18******/device01/thing/event/property/post, messageId = 1731508564705******, content = {"temperature":10,"humidity":56}
在相應消費組顯示在線的AMQP客戶端。
amqp-demo
中connectionCount = 4
代表4個客戶端。