數(shù)據(jù)轉(zhuǎn)發(fā)到消息隊(duì)列RocketMQ
如果當(dāng)前產(chǎn)品下設(shè)備與物聯(lián)網(wǎng)平臺(tái)之間的上下行消息量大于1,000 QPS,需要將設(shè)備上報(bào)至物聯(lián)網(wǎng)平臺(tái)的數(shù)據(jù)轉(zhuǎn)發(fā)至您業(yè)務(wù)服務(wù)器的應(yīng)用中進(jìn)行消費(fèi)(例如過濾、分析、存儲(chǔ)等),可以使用云產(chǎn)品流轉(zhuǎn)功能將物聯(lián)網(wǎng)平臺(tái)數(shù)據(jù)轉(zhuǎn)發(fā)到消息隊(duì)列(RocketMQ)中消費(fèi),以實(shí)現(xiàn)消息從設(shè)備、物聯(lián)網(wǎng)平臺(tái)、RocketMQ到應(yīng)用服務(wù)器之間的全鏈路高可靠傳輸能力。本文以物模型數(shù)據(jù)上報(bào)Topic為例,介紹流轉(zhuǎn)消息數(shù)據(jù)的完整流程。
工作原理
云產(chǎn)品流轉(zhuǎn)可將設(shè)備消息轉(zhuǎn)發(fā)到RocketMQ SDK客戶端消費(fèi)。上圖工作流程:
業(yè)務(wù)服務(wù)器:使用RocketMQ SDK注冊(cè)一個(gè)消費(fèi)者,用于接收物聯(lián)網(wǎng)平臺(tái)轉(zhuǎn)發(fā)到RocketMQ中的消息。
物聯(lián)網(wǎng)平臺(tái):配置云產(chǎn)品流轉(zhuǎn)將設(shè)備消息轉(zhuǎn)發(fā)到RocketMQ。
數(shù)據(jù)源:支持的Topic類型消息,請(qǐng)參見數(shù)據(jù)格式(非云網(wǎng)關(guān)產(chǎn)品和設(shè)備)、自定義Topic(MQTT云網(wǎng)關(guān))、消息轉(zhuǎn)發(fā)Topic(GB/T 32960云網(wǎng)關(guān))、消息轉(zhuǎn)發(fā)Topic(JT/T 808云網(wǎng)關(guān))、消息轉(zhuǎn)發(fā)Topic(SL 651云網(wǎng)關(guān))。
數(shù)據(jù)目的:RocketMQ中接收設(shè)備消息的Topic,需要先在云消息隊(duì)列RocketMQ版控制臺(tái)創(chuàng)建。具體操作,請(qǐng)參見創(chuàng)建Topic。
解析器腳本:配置通過數(shù)據(jù)流轉(zhuǎn)函數(shù)
writeMq(destinationId, payload, tag)
將設(shè)備數(shù)據(jù)轉(zhuǎn)發(fā)到RocketMQ的Topic中。函數(shù)說明,請(qǐng)參見流轉(zhuǎn)數(shù)據(jù)到數(shù)據(jù)目的函數(shù)。
您可為設(shè)備消息設(shè)置屬性和標(biāo)簽,用于后續(xù)消費(fèi)者消費(fèi)時(shí)指定過濾條件。
云消息隊(duì)列RocketMQ版:RocketMQ SDK注冊(cè)的消費(fèi)者獲取消息時(shí)會(huì)觸發(fā)服務(wù)端的動(dòng)態(tài)過濾計(jì)算,RocketMQ根據(jù)該消費(fèi)者上報(bào)的過濾條件的表達(dá)式進(jìn)行匹配,并將符合條件的消息投遞給該消費(fèi)者,實(shí)現(xiàn)消費(fèi)者所屬業(yè)務(wù)服務(wù)器接收設(shè)備上報(bào)至物聯(lián)網(wǎng)平臺(tái)的消息。
應(yīng)用場景
業(yè)務(wù)服務(wù)器接收設(shè)備消息:云產(chǎn)品流轉(zhuǎn)可以靈活地轉(zhuǎn)發(fā)設(shè)備消息到消息隊(duì)列RocketMQ中消費(fèi)。
對(duì)設(shè)備數(shù)據(jù)進(jìn)行復(fù)雜或精細(xì)化處理的海量設(shè)備場景。
設(shè)備消息量大于1,000 QPS的場景。
使用限制
物聯(lián)網(wǎng)平臺(tái)實(shí)例及所在地域支持將數(shù)據(jù)轉(zhuǎn)發(fā)到消息隊(duì)列RocketMQ。支持的地域詳細(xì)信息,請(qǐng)參見各地域功能說明。
目前,RocketMQ實(shí)例所在地域必須與當(dāng)前物聯(lián)網(wǎng)平臺(tái)實(shí)例所在地域保持一致。存量的跨地域?qū)嵗聰?shù)據(jù)流轉(zhuǎn)配置不受影響,仍可繼續(xù)正常流轉(zhuǎn)數(shù)據(jù)。
僅支持將數(shù)據(jù)流轉(zhuǎn)到RocketMQ 4.x或5.x版本實(shí)例的Topic中。如果是RocketMQ 5.x版本實(shí)例的Topic,該Topic的消息類型必須為普通消息。
目前,新版和舊版云產(chǎn)品流轉(zhuǎn)功能均支持將數(shù)據(jù)流轉(zhuǎn)到消息隊(duì)列RocketMQ。舊版云產(chǎn)品流轉(zhuǎn)使用示例,請(qǐng)參見數(shù)據(jù)轉(zhuǎn)發(fā)到消息隊(duì)列RocketM(舊版)。
前提條件
已添加待轉(zhuǎn)發(fā)的設(shè)備Topic數(shù)據(jù)源。例如:創(chuàng)建數(shù)據(jù)源DataSource,添加指定設(shè)備的物模型數(shù)據(jù)上報(bào)Topic。具體步驟,請(qǐng)參見添加待流轉(zhuǎn)的數(shù)據(jù)源。
已創(chuàng)建消息隊(duì)列(RocketMQ)實(shí)例和用于接收數(shù)據(jù)的Topic和ConsumerGroup。具體操作,請(qǐng)參見創(chuàng)建資源。
已在業(yè)務(wù)服務(wù)器中調(diào)用RocketMQ SDK用來消費(fèi)消息。具體操作,請(qǐng)參見調(diào)用SDK消費(fèi)消息。
背景信息
當(dāng)使用RocketMQ 5.0時(shí),轉(zhuǎn)發(fā)的數(shù)據(jù)目的配置完成后,會(huì)自動(dòng)完成以下配置,實(shí)現(xiàn)設(shè)備數(shù)據(jù)通過物聯(lián)網(wǎng)平臺(tái)的規(guī)則引擎轉(zhuǎn)發(fā)到消息隊(duì)列RocketMQ。
物聯(lián)網(wǎng)平臺(tái)占用RocketMQ實(shí)例所在虛擬交換機(jī)的2個(gè)IP地址。
在RocketMQ實(shí)例所在的VPC網(wǎng)絡(luò)下創(chuàng)建托管安全組,安全組名稱默認(rèn)以sg-nsm-開頭。
步驟一:創(chuàng)建數(shù)據(jù)目的
在實(shí)例概覽頁簽的全部環(huán)境下,找到對(duì)應(yīng)的實(shí)例,單擊實(shí)例卡片。
在左側(cè)導(dǎo)航欄,選擇 。
在云產(chǎn)品流轉(zhuǎn)頁面,單擊右上角體驗(yàn)新版,進(jìn)入新版功能頁面。
說明如果您已執(zhí)行過此操作,再次進(jìn)入云產(chǎn)品流轉(zhuǎn)頁面,會(huì)直接進(jìn)入新版功能頁面。
單擊數(shù)據(jù)目的頁簽,然后單擊創(chuàng)建數(shù)據(jù)目的。
在創(chuàng)建數(shù)據(jù)目的對(duì)話框,輸入數(shù)據(jù)目的名稱,例如DataPurpose,按照以下參數(shù)說明,完成配置,然后單擊確定。
參數(shù)
說明
選擇操作
選擇發(fā)送數(shù)據(jù)到消息隊(duì)列(RocketMQ)中。
地域
選擇RocketMQ所在地域。
實(shí)例
選擇RocketMQ實(shí)例。
您可以單擊創(chuàng)建實(shí)例,跳轉(zhuǎn)到消息隊(duì)列控制臺(tái),創(chuàng)建RocketMQ實(shí)例,請(qǐng)參見消息隊(duì)列文檔。
Topic
選擇用于接收物聯(lián)網(wǎng)平臺(tái)數(shù)據(jù)的RocketMQ Topic。
您可以單擊創(chuàng)建Topic,跳轉(zhuǎn)到消息隊(duì)列控制臺(tái),創(chuàng)建RocketMQ Topic。
授權(quán)
授權(quán)物聯(lián)網(wǎng)平臺(tái)將數(shù)據(jù)寫入RocketMQ。
如您還未創(chuàng)建相關(guān)角色,單擊創(chuàng)建RAM角色,跳轉(zhuǎn)到RAM控制臺(tái),創(chuàng)建角色和授權(quán)策略,請(qǐng)參見創(chuàng)建RAM角色。
步驟二:配置并啟動(dòng)解析器
創(chuàng)建解析器,例如DataParser。具體操作,請(qǐng)參見步驟一:創(chuàng)建解析器。
在解析器詳情頁面,關(guān)聯(lián)數(shù)據(jù)源。
在配置向?qū)У?b data-tag="uicontrol" id="uicontrol-f9s-25z-bkv" class="uicontrol">數(shù)據(jù)源下,單擊關(guān)聯(lián)數(shù)據(jù)源。
在彈出的對(duì)話框中,單擊數(shù)據(jù)源下拉列表,選擇已創(chuàng)建的數(shù)據(jù)源DataSource,單擊確定。
在解析器詳情頁面,關(guān)聯(lián)數(shù)據(jù)目的。
單擊配置向?qū)У?b data-tag="uicontrol" id="uicontrol-msn-zik-exs" class="uicontrol">數(shù)據(jù)目的,然后單擊數(shù)據(jù)目的列表右上方的關(guān)聯(lián)數(shù)據(jù)目的。
在彈出的對(duì)話框中,單擊數(shù)據(jù)目的下拉列表,選擇已創(chuàng)建的數(shù)據(jù)目的DataPurpose,單擊確定。
在數(shù)據(jù)目的列表,查看并保存數(shù)據(jù)目的ID,例如為1000。
后續(xù)解析腳本中,需使用此處的數(shù)據(jù)目的ID。
在解析器詳情頁面,單擊解析器。
在腳本輸入框,輸入解析腳本。
函數(shù)參數(shù)說明,請(qǐng)參見函數(shù)列表。
//通過payload函數(shù),獲取設(shè)備上報(bào)的消息內(nèi)容,并按照J(rèn)SON格式轉(zhuǎn)換。 var data = payload("json"); //直接流轉(zhuǎn)物模型上報(bào)數(shù)據(jù)。 writeMq(1000, data, "調(diào)試");
單擊調(diào)試,根據(jù)頁面提示,選擇產(chǎn)品和設(shè)備,輸入Topic和Payload數(shù)據(jù),驗(yàn)證腳本可執(zhí)行。
參數(shù)示例如下:
運(yùn)行結(jié)果如下,表示腳本執(zhí)行成功。
單擊發(fā)布。
回到云產(chǎn)品流轉(zhuǎn)頁面的解析器頁簽,單擊解析器DataParser對(duì)應(yīng)的啟動(dòng)按鈕,啟動(dòng)解析器。
在RocketMQ控制臺(tái),查看是否成功接收到消息。
步驟三:查看設(shè)備消息轉(zhuǎn)發(fā)的運(yùn)行日志
在物聯(lián)網(wǎng)平臺(tái)控制臺(tái),單擊目標(biāo)企業(yè)版實(shí)例,查看設(shè)備狀態(tài)和消息轉(zhuǎn)發(fā)日志。具體操作,請(qǐng)參見云端運(yùn)行日志。
在業(yè)務(wù)服務(wù)器中已運(yùn)行的訂閱RocketMQ資源的終端,查看訂閱和消費(fèi)消息的日志。
在云消息隊(duì)列RocketMQ版控制臺(tái),在實(shí)例詳情頁面,查看消費(fèi)者接收到消息的具體內(nèi)容和消息軌跡。具體操作,請(qǐng)參見查詢消息軌跡。