基于事件流實現(xiàn)RocketMQ消息路由
本文介紹如何應(yīng)用事件總線EventBridge的事件流功能實現(xiàn)云消息隊列 RocketMQ 版的消息路由。
前提條件
您已購買并部署云消息隊列 RocketMQ 版實例,且實例處于服務(wù)中狀態(tài)。具體步驟,請參見創(chuàng)建實例。
背景信息
事件流作為更輕量、實時端到端的流式事件通道,提供輕量流式數(shù)據(jù)的過濾和轉(zhuǎn)換的能力,在不同的數(shù)據(jù)倉庫之間、數(shù)據(jù)處理程序之間、數(shù)據(jù)分析和處理系統(tǒng)之間進(jìn)行數(shù)據(jù)同步。源端云消息隊列 RocketMQ 版生產(chǎn)的消息可以通過事件流這個通道被路由到目標(biāo)端的云消息隊列 RocketMQ 版,無需定義事件總線。更多信息,請參見事件流概述。
步驟一:在目標(biāo)端創(chuàng)建事件流
事件流需要在目標(biāo)端創(chuàng)建,例如如果需要把華北2(北京)的RocketMQ消息路由到華東1(杭州),那么需要在華東1(杭州)創(chuàng)建事件流任務(wù)。
- 登錄事件總線EventBridge控制臺。
- 在頂部菜單欄,選擇地域。
- 在左側(cè)導(dǎo)航欄,單擊事件流。
在事件流頁面,單擊創(chuàng)建事件流。
在創(chuàng)建事件流面板,設(shè)置任務(wù)名稱和描述,配置以下參數(shù),然后單擊保存。
任務(wù)創(chuàng)建
在Source(源)配置向?qū)Вx擇數(shù)據(jù)提供方為消息隊列 RocketMQ 版,設(shè)置以下參數(shù),然后單擊下一步。
參數(shù)
說明
示例
地域
選擇消息隊列RocketMQ版源實例所在的地域。
華東1(杭州)
版本
選擇RocketMQ實例版本。
RocketMQ 4.x
RocketMQ 實例
選擇生產(chǎn)消息隊列RocketMQ版消息的源實例。
MQ_INST_115964845466****_ByBehioo
Topic
選擇生產(chǎn)消息隊列RocketMQ版消息的Topic。
topic
Tag
配置源實例中用于過濾消息的Tag。
test
Group ID
選擇源實例中的消費(fèi)組名稱。請使用獨(dú)立的消費(fèi)組來創(chuàng)建事件源,不要和已有的業(yè)務(wù)混用消費(fèi)組,以免影響已有的消息收發(fā)。
GID_http_1
消費(fèi)位點
選擇開始消費(fèi)消息的位點。
最新位點
數(shù)據(jù)格式
數(shù)據(jù)格式是針對支持二進(jìn)制傳遞的數(shù)據(jù)源端推出的指定內(nèi)容格式的編碼能力。針對消息路由場景,這里配置參數(shù)為Binary。
Binary
批量推送
批量推送可幫您批量聚合多個事件,當(dāng)批量推送條數(shù)和批量推送間隔(單位:秒)兩者條件達(dá)到其一時即會觸發(fā)批量推送。
例如:您設(shè)置的推送條數(shù)為100 條,間隔時間為15 s,在10 s內(nèi)消息條數(shù)已達(dá)到100條,那么該次推送則不會等15 s后再推送。
開啟
批量推送條數(shù)
調(diào)用函數(shù)發(fā)送的最大批量消息條數(shù),當(dāng)積壓的消息數(shù)量到達(dá)設(shè)定值時才會發(fā)送請求,取值范圍為 [1,10000]。
100
批量推送間隔(單位:秒)
調(diào)用函數(shù)的間隔時間,系統(tǒng)每到間隔時間點會將消息聚合后發(fā)給函數(shù)計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)、Transform(轉(zhuǎn)換)配置向?qū)ВO(shè)置事件過濾、轉(zhuǎn)換規(guī)則,單擊下一步。事件轉(zhuǎn)換的配置說明,請參見使用函數(shù)計算實現(xiàn)消息數(shù)據(jù)清洗。
在Sink(目標(biāo))配置向?qū)Вx擇服務(wù)類型為消息隊列RocketMQ版,配置以下參數(shù),單擊保存。
參數(shù)
說明
示例
版本
選擇云消息隊列 RocketMQ 版實例版本。
RocketMQ 4.x
實例ID
選擇已創(chuàng)建的云消息隊列 RocketMQ 版實例。
test
Topic
選擇已創(chuàng)建的Topic。
test
消息體(body)
事件總線EventBridge通過二進(jìn)制提取獲取指定的事件中的數(shù)據(jù),Base64解碼后路由到事件目標(biāo)。
二進(jìn)制提取
$.data.body
自定義屬性(Properties)
選擇模板。您可以自定義一個模板,定義模板里需要的變量,事件總線EventBridge可以提取事件中的字段,按照模板定義的形式進(jìn)行轉(zhuǎn)換。
說明如果需要全量傳遞源端的RocketMQ消息的屬性,推薦使用示例中的配置。
變量:
{ "userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY" }
模板:
{ "EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}" }
消息索引(Keys)
事件總線EventBridge通過JSONPath提取事件中的數(shù)據(jù),將指定的事件內(nèi)容路由到事件目標(biāo)。
部分事件
$.data.systemProperties.KEYS
消息標(biāo)簽(Tags)
事件總線EventBridge通過JSONPath提取事件中的數(shù)據(jù),將指定的事件內(nèi)容路由到事件目標(biāo)。
部分事件
$.data.systemProperties.TAGS
任務(wù)屬性
設(shè)置事件流的重試策略及死信隊列。更多信息,請參見重試和死信。
返回事件流頁面,找到創(chuàng)建好的事件流,在其右側(cè)操作欄,單擊啟用。
在提示對話框,閱讀提示信息,然后單擊確認(rèn)。
啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態(tài)欄查看啟動進(jìn)度。
步驟二:測試驗證
在頂部菜單欄,選擇步驟一:在目標(biāo)端創(chuàng)建事件流中源實例所在的地域。
在左側(cè)導(dǎo)航欄,單擊實例列表。
在實例列表頁面,找到步驟一:在目標(biāo)端創(chuàng)建事件流中配置的源實例,在其操作列,單擊詳情。
在左側(cè)導(dǎo)航欄,單擊Topic 管理。
在Topic列表,單擊步驟一:在目標(biāo)端創(chuàng)建事件流中配置的源實例的Topic名稱。
在Topic詳情頁面,單擊右上角的快速體驗。
在快速體驗的消息生產(chǎn)和消費(fèi)面板,選擇發(fā)送方式為控制臺,然后配置消息內(nèi)容、消息 Key和消息 Tag,單擊確定。
消息發(fā)送成功后,界面會提示消息發(fā)送成功!,并顯示Message ID。
在源實例完成生產(chǎn)消息后,返回實例列表頁面。
在實例列表頁面,找到步驟一:在目標(biāo)端創(chuàng)建事件流中配置的目標(biāo)實例,在其操作列,單擊詳情。
在左側(cè)導(dǎo)航欄,單擊Topic 管理。
在Topic列表,單擊步驟一:在目標(biāo)端創(chuàng)建事件流中配置的目標(biāo)實例的Topic名稱。
在Topic詳情頁面,單擊消息查詢。
配置查詢方式和查詢范圍,單擊查詢。
查看查詢到的Message ID、Tag和Key值是否與生產(chǎn)的消息一致。