日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數據轉發到AMQP服務端訂閱消費組消費

直接使用AMQP服務端訂閱實時獲取設備數據,只能從產品維度獲取所有設備的數據,使用消息轉發的云產品流轉功能,可以將全部設備或指定設備發送到物聯網平臺的消息,先經過解析腳本處理和過濾,再轉發到AMQP服務端訂閱消費組,通過AMQP客戶端消費。本文以物模型數據上報Topic為例,介紹流轉消息數據的完整流程。

工作原理

image

云產品流轉可將同一產品所有設備或指定設備的指定Topic消息,實時轉發到一個或多個消費組中,每個消費組中包括多個消費者即AMQP客戶端。每條消息轉發到消費組時,消費組中隨機一個消費者收到消息,不同消費組通過消費組ID區分。在上圖中:

  1. 消費者:使用AMQP SDK注冊消費組的消費者,用于接收物聯網平臺轉發到AMQP消費組中的消息。

  2. 物聯網平臺:配置云產品流轉數據源Topic數據目的解析器腳本,并啟動解析器,將設備消息轉發到AMQP服務端訂閱的消費組。

  3. AMQP客戶端:啟動解析器后,物聯網平臺會自動將設備消息轉發到AMQP客戶端,不需要在AMQP客戶端代碼中訂閱Topic,只要AMQP客戶端在線就可以接收消息。

說明

云產品流轉不支持從AMQP客戶端下發消息給設備,如果需要下發指令給設備,請調用消息通信API

應用場景

業務服務器接收設備消息:云產品流轉可以靈活地轉發設備消息到AMQP服務端訂閱的消息組。

  • 轉發指定設備的消息。

  • 轉發指定Topic的消息。

  • 消息過濾或處理后再轉發。

AMQP客戶端直接實時獲取指定產品下所有設備的消息,可以直接配置AMQP服務端訂閱。具體內容,請參見配置AMQP服務端訂閱

前提條件

使用限制

  • AMQP客戶端建立連接之后,需要立刻發送認證請求。如果15秒內沒有認證成功,服務器會主動關閉連接。

  • AMQP客戶端的一個連接限流1,000 TPS,消息轉發TPS限流由實例的消息轉發TPS規格決定,消息大小無限制。更多的AMQP服務端訂閱限制,請參見服務端訂閱使用限制

步驟一:在物聯網平臺配置數據目的和解析器

step1:配置數據目的

  1. 登錄物聯網平臺控制臺

  2. 實例概覽頁簽的全部環境下,找到對應的實例,單擊實例卡片。

  3. 在左側導航欄,選擇消息轉發 > 云產品流轉

  4. 云產品流轉頁面,單擊右上角體驗新版,進入新版功能頁面。

    說明

    如果您已執行過此操作,再次進入云產品流轉頁面,會直接進入新版功能頁面。

  5. 單擊數據目的頁簽,然后單擊創建數據目的

  6. 創建數據目的對話框,輸入數據目的名稱,例如DataPurpose,按照以下參數說明,完成配置,然后單擊確定

    選擇操作

    參數

    描述

    選擇操作

    選擇發布到AMQP服務端訂閱消費組

    消費組

    選擇一個已創建的消費組作為數據轉發目標。單擊創建消費組可以進行消費組創建。

step2:配置并啟動解析器

  1. 創建解析器,例如DataParser。具體操作,請參見創建解析器

  2. 解析器詳情頁面,關聯數據源。

    1. 在配置向導的數據源下,單擊關聯數據源

    2. 在彈出的對話框中,單擊數據源下拉列表,選擇已創建的數據源DataSource,單擊確定

  3. 解析器詳情頁面,關聯數據目的。

    1. 單擊配置向導的數據目的,然后單擊數據目的列表右上方的關聯數據目的

    2. 在彈出的對話框中,單擊數據目的下拉列表,選擇已創建的數據目的DataPurpose,單擊確定

    3. 在數據目的列表,查看并保存數據目的ID,例如為1000

      后續解析腳本中,需使用此處的數據目的ID

  4. 解析器詳情頁面,單擊解析器

  5. 在腳本輸入框,輸入解析腳本。

    解析腳本類似JavaScript語言,編輯腳本的語法參考JavaScript語法,詳細編輯方法,請參見腳本語法

    轉發數據到AMQP服務端訂閱消息組,需要使用函數writeAmqp(destinationId, payload, tag)函數參數說明,請參見函數列表

    • 無需指定設備,轉發產品下全部設備的數據:

      //通過payload函數,獲取設備上報的消息內容,并按照JSON格式轉換。
      var data = payload("json");
      //直接流轉物模型上報數據。
      writeAmqp(1000, data, "調試");
    • 指定某一個設備,僅轉發該設備的消息:

      //通過payload函數,獲取設備上報的消息內容,并按照JSON格式轉換。
      var data = payload("json");
      //獲取上報消息的設備名稱。
      var dn = deviceName();
      //流轉指定設備的物模型上報數據。
      if (dn == 'device01') { 
          writeAmqp(1000, data, "調試");  
      }
  6. 單擊調試,根據頁面提示,選擇產品和設備,輸入Topic和Payload數據,驗證腳本可執行。

    參數示例如下:調試示例

    運行結果如下,表示腳本執行成功。

    調試結果

  7. 單擊發布

  8. 回到云產品流轉頁面的解析器頁簽,單擊解析器DataParser對應的啟動按鈕,啟動解析器。

步驟二:運行AMQP客戶端

重要
  1. 登錄ECS實例。登錄方式,請參見連接方式概述

  2. 執行以下命令,下載Demo文件。

    wget https://linkkit-export.oss-cn-shanghai.aliyuncs.com/amqp/amqp-demo.zip
  3. 執行以下命令,解壓demo文件。

    unzip amqp-demo.zip
  4. src/main/java/com.aliyun.iotx.demo目錄下AmqpClient.java文件中,參照下表修改AMQP的接入信息。

    重要

    本示例Demo代碼中,添加了結束程序的代碼(Thread.sleep(60 * 1000);),即程序啟動成功,運行一分鐘后會結束。實際場景中,您可根據需要自行設置運行時間。

    參數

    說明

    accessKey

    阿里云主賬號或RAM用戶的AccessKey ID和AccessKey Secret。

    登錄物聯網平臺控制臺,將鼠標移至賬號頭像上,然后單擊AccessKey管理,獲取AccessKey ID和AccessKey Secret。

    重要

    為避免將AccessKey硬編碼到業務代碼中帶來的安全風險,可采用配置環境變量的方法管理AccessKey。

    您需在本地操作系統中添加環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并分別寫入已準備好的AccessKey ID和AccessKey Secret。

    在示例代碼中可通過以下方法獲取:

    • System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")

    • System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")

    accessSecret

    consumerGroupId

    當前物聯網平臺對應實例中的消費組ID。

    登錄物聯網平臺控制臺,在對應實例的消息轉發 > 服務端訂閱 > 消費組列表查看您的消費組ID。

    iotInstanceId

    實例ID。您可在物聯網平臺控制臺實例概覽頁面,查看當前實例的ID。

    • 若有ID值,必須傳入該ID值。

    • 若無實例概覽頁面或ID值,傳入空值,即iotInstanceId = ""

    clientId

    表示客戶端ID,用戶自定義,長度不可超過64個字符。建議使用您的AMQP客戶端所在服務器UUID、MAC地址、IP等唯一標識。

    AMQP客戶端接入并啟動成功后,登錄物聯網平臺控制臺,在對應實例的消息轉發 > 服務端訂閱 > 消費組列表頁簽,單擊消費組對應的查看消費組詳情頁面將顯示該參數,方便您識別區分不同的客戶端。

    connectionCount

    啟動AMQP客戶端的連接數,最大不超過128個。用于實時消息推送的擴容。

    消費組詳情頁面會以${clientId}+"-"+數字形式,顯示連接的客戶端。其中數字最小值為0。

    host

    AMQP接入域名。

    ${YourHost}對應的AMQP接入域名信息,請參見查看和配置實例終端節點信息(Endpoint)

  5. pom.xml文件中,已添加相關Maven依賴。在amqp-demo根目錄執行以下命令,重新加載Maven變更,構建項目。

    mvn clean package
  6. amqp-demo/target目錄執行以下命令,運行生成的JAR包。

    java -jar demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
  7. 運行示例代碼后返回如下信息,表示AMQP客戶端已接入物聯網平臺并成功接收消息。

    重要

    只有當AMQP客戶端在線時,才能在服務器上收到設備消息。

    10:42:43.254 [main] INFO com.aliyun.iotx.demo.AmqpClient - amqp demo is started successfully, and will exit after 60s 
    10:59:46.405 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Dispatching received message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.409 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1:1 } ackType: DELIVERED (5)
    10:59:46.432 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Delivered Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.441 [JmsSession [ID:4e6d6f2b-1423-4c44-91ca-37202******:1:1] delivery dispatcher] DEBUG org.apache.qpid.jms.provider.failover.FailoverProvider - Executing Failover Task: message acknowledge -> JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 } ackType: ACCEPTED (6)
    10:59:46.442 [AmqpProvider :(3):[amqps://iot-cn-6******.amqp.iothub.aliyuncs.com:5671]] DEBUG org.apache.qpid.jms.provider.amqp.AmqpConsumer - Accepted Ack of message: JmsInboundMessageDispatch { sequence = 1, messageId = 1731508564705******, consumerId = ID:4e6d6f2b-1423-4c44-91ca-37202a******:1:1:1 }
    10:59:46.452 [pool-1-thread-1] INFO com.aliyun.iotx.demo.AmqpClient - receive message,
     topic = /g18******/device01/thing/event/property/post,
     messageId = 1731508564705******,
     content = {"temperature":10,"humidity":56}

    在相應消費組顯示在線的AMQP客戶端。amqp-democonnectionCount = 4代表4個客戶端。

    image.png

后續操作

  1. 如果AMQP客戶端不在線,AMQP服務端訂閱消息會堆積,AMQP客戶端重新上線后,物聯網平臺重新推送消息。如果不需要消費堆積的消息,可在AMQP客戶端上線前,清空堆積的消息

  2. 所有配置完成,設備上報訂閱數據并被AMQP客戶端接收后,您可以登錄物聯網平臺控制臺,進入對應實例查看消息運行日志。

    • 監控運維 > 日志服務 > 云端運行日志頁簽,查看設備上報數據、物聯網平臺轉發數據到AMQP客戶端和AMQP客戶端返回ACK的日志記錄。具體操作,請參見查詢云端運行日志

    • 消息轉發 > 服務端訂閱 > 消費組列表頁簽,單擊目標消費組右側操作列的查看,在消費組詳情頁面,查看消息消費速率、消息堆積量、消費日志等。具體操作,請參見查看和監控消費組

常見問題