自建Apache RocketMQ觸發(fā)器
Apache RocketMQ作為事件源通過事件總線EventBridge與函數(shù)計(jì)算集成后,通過Apache RocketMQ觸發(fā)器能夠觸發(fā)關(guān)聯(lián)函數(shù)執(zhí)行,通過函數(shù)可以對(duì)發(fā)布到Apache RocketMQ的消息進(jìn)行自定義處理。本文介紹如何在函數(shù)計(jì)算控制臺(tái)創(chuàng)建Apache RocketMQ觸發(fā)器、配置入口參數(shù)以及編寫代碼并測試。
背景信息
您在函數(shù)計(jì)算的控制臺(tái)提交觸發(fā)器創(chuàng)建請(qǐng)求后,函數(shù)計(jì)算會(huì)根據(jù)觸發(fā)器的配置信息,自動(dòng)在事件總線EventBridge側(cè)創(chuàng)建事件流資源。
創(chuàng)建完成后,您可以在函數(shù)計(jì)算控制臺(tái)查看觸發(fā)器信息,同時(shí)也可以在事件總線EventBridge控制臺(tái)查看自動(dòng)創(chuàng)建的資源信息。當(dāng)Apache RocketMQ中有消息入隊(duì)時(shí),將會(huì)觸發(fā)函數(shù)計(jì)算執(zhí)行,觸發(fā)時(shí)會(huì)根據(jù)您的攢批配置將一個(gè)或多個(gè)消息事件以批的形式推送到函數(shù)中進(jìn)行處理。
前提條件
事件總線EventBridge
函數(shù)計(jì)算
Apache RocketMQ
自建Apache RocketMQ集群
創(chuàng)建Topic
創(chuàng)建ConsumerGroup
使用限制
作為觸發(fā)源的Apache RocketMQ必須支持公網(wǎng)可訪問或者阿里云VPC內(nèi)可訪問。
當(dāng)Apache RocketMQ支持阿里云VPC內(nèi)可訪問時(shí),VPC實(shí)例必須和函數(shù)計(jì)算的函數(shù)在相同的地域。
創(chuàng)建的事件流數(shù)量超過上限后,將無法再創(chuàng)建Apache RocketMQ觸發(fā)器。關(guān)于事件流數(shù)量的限制,請(qǐng)參見使用限制。
步驟一:創(chuàng)建Apache RocketMQ觸發(fā)器
- 登錄函數(shù)計(jì)算控制臺(tái),在左側(cè)導(dǎo)航欄,單擊服務(wù)及函數(shù)。
- 在頂部菜單欄,選擇地域,然后在服務(wù)列表頁面,單擊目標(biāo)服務(wù)。
- 在函數(shù)管理頁面,單擊目標(biāo)函數(shù)名稱。
在函數(shù)詳情頁面,單擊觸發(fā)器管理頁簽,從版本或別名下拉列表選擇要?jiǎng)?chuàng)建觸發(fā)器的版本或別名,然后單擊創(chuàng)建觸發(fā)器。
在創(chuàng)建觸發(fā)器面板,填寫相關(guān)信息,然后單擊確定。
配置項(xiàng)
操作
本文示例
觸發(fā)器類型
選擇自建 Apache RocketMQ。
自建 Apache RocketMQ
名稱
填寫自定義的觸發(fā)器名稱。
apache-rocketmq-trigger
版本或別名
默認(rèn)值為LATEST,如果您需要?jiǎng)?chuàng)建其他版本或別名的觸發(fā)器,需要在函數(shù)詳情頁的右上角切換到該版本或別名。關(guān)于版本和別名的介紹,請(qǐng)參見管理版本和管理別名。
LATEST
接入點(diǎn)
填寫集群NameServer地址。
192.168.X.X:9876
Topic
選擇已創(chuàng)建的Apache RocketMQ實(shí)例的Topic。
testTopic
Group ID
選擇已創(chuàng)建的Apache RocketMQ實(shí)例的Consumer Group ID。
testGroup
FilterType
選擇消息過濾類型。取值說明如下:
Tag:通過Tag標(biāo)簽進(jìn)行消息過濾。
SQL:通過SQL語句進(jìn)行消息過濾,可匹配消息的屬性標(biāo)識(shí)以及屬性值。
Tag
Filter
選擇FilterType后,需要配置對(duì)應(yīng)的過濾類型下的過濾語句。
TagA
認(rèn)證模式
選擇認(rèn)證模式,支持ACL模式。
ACL
用戶名
當(dāng)認(rèn)證模式選擇ACL后,需要配置Apache RocketMQ用戶名用于身份驗(yàn)證。
admin
密碼
當(dāng)認(rèn)證模式選擇ACL后,需要配置Apache RocketMQ密碼用于身份驗(yàn)證。
******
消費(fèi)位點(diǎn)
選擇消息的消費(fèi)位點(diǎn),即Apache RocketMQ從事件總線開始拉取消息的位置。取值說明如下。
最新位點(diǎn):從最新位點(diǎn)開始消費(fèi)。
最早位點(diǎn):從最早位點(diǎn)開始消費(fèi)。
指定時(shí)間戳:從指定時(shí)間戳開始消費(fèi)。
最新位點(diǎn)
網(wǎng)絡(luò)配置
選擇路由消息的網(wǎng)絡(luò)類型。取值說明如下。
公網(wǎng):通過公網(wǎng)訪問Apache RocketMQ集群。
專有網(wǎng)絡(luò):通過阿里云專有網(wǎng)絡(luò)訪問Apache RocketMQ集群。您需要選擇對(duì)應(yīng)的專有網(wǎng)絡(luò)VPC、交換機(jī)、安全組。
公網(wǎng)
調(diào)用方式
選擇函數(shù)調(diào)用方式。
取值說明如下:
同步調(diào)用:默認(rèn)調(diào)用方式。事件觸發(fā)函數(shù)執(zhí)行,等待函數(shù)調(diào)用完成后,函數(shù)計(jì)算返回執(zhí)行結(jié)果。更多信息,請(qǐng)參見同步調(diào)用。
異步調(diào)用:適用于調(diào)度延時(shí)較長的函數(shù),事件觸發(fā)函數(shù)執(zhí)行后,函數(shù)計(jì)算立即返回響應(yīng)結(jié)果并且確保函數(shù)至少被成功執(zhí)行一次,但不會(huì)返回具體執(zhí)行結(jié)果。更多信息,請(qǐng)參見異步調(diào)用。
同步調(diào)用
觸發(fā)器啟用狀態(tài)
創(chuàng)建觸發(fā)器后是否立即啟用。默認(rèn)勾選啟用觸發(fā)器,即創(chuàng)建觸發(fā)器后立即啟用觸發(fā)器。
啟用觸發(fā)器
關(guān)于推送配置、重試和死信等高級(jí)配置項(xiàng)說明,請(qǐng)參見觸發(fā)器高級(jí)功能。
創(chuàng)建完成后,在觸發(fā)器管理頁簽下會(huì)顯示已創(chuàng)建的觸發(fā)器。如果需要對(duì)創(chuàng)建的觸發(fā)器進(jìn)行修改或刪除,請(qǐng)參見觸發(fā)器管理。
步驟二:配置函數(shù)入口參數(shù)
自建Apache RocketMQ事件源會(huì)以event的形式作為輸入?yún)?shù)傳遞給函數(shù),您可以手動(dòng)將event傳給函數(shù)模擬觸發(fā)事件。
在函數(shù)詳情頁面,單擊函數(shù)代碼頁簽,然后單擊測試函數(shù)右側(cè)圖標(biāo),從下拉列表中,選擇配置測試參數(shù)。
在配置測試參數(shù)面板,選擇創(chuàng)建新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內(nèi)容。然后單擊確定。
event格式如下所示。
[ { "msgId": "7F0000010BDD2A84AEE70DA49B57****", "topic": "testTopic", "systemProperties": { "UNIQ_KEY": "7F0000010BDD2A84AEE70DA49B57****", "CLUSTER": "DefaultCluster", "MIN_OFFSET": "0", "TAGS": "TagA", "MAX_OFFSET": "128" }, "userProperties": {}, "body": "Hello RocketMQ" } ]
event字段包含的參數(shù)解釋如下表所示。
參數(shù)
類型
示例值
描述
msgId
String
7F0000010BDD2A84AEE70DA49B57****
Apache RocketMQ消息 ID。
topic
String
testTopic
Topic名稱。
systemProperties
Map
系統(tǒng)屬性。
UNIQ_KEY
String
7F0000010BDD2A84AEE70DA49B57****
消息唯一鍵。
CLUSTER
String
DefaultCluster
Apache RocketMQ集群名稱。
MIN_OFFSET
Int
0
最低位點(diǎn)。
MAX_OFFSET
Int
128
最高位點(diǎn)。
TAGS
String
TagA
過濾屬性。
userProperties
Map
無
用戶屬性。
body
String
Hello RocketMQ
消息內(nèi)容。
步驟三:編寫函數(shù)代碼并測試
觸發(fā)器創(chuàng)建完成后,您可以開始編寫并測試函數(shù)代碼,以驗(yàn)證代碼的正確性。在實(shí)際操作過程中,當(dāng)Apache RocketMQ接收到消息后,觸發(fā)器會(huì)自動(dòng)觸發(fā)函數(shù)執(zhí)行。
在函數(shù)詳情頁面,單擊函數(shù)代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。
本文以Node.js函數(shù)代碼為例,示例代碼如下。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數(shù),對(duì)event進(jìn)行處理。 callback(null, 'return result'); }
單擊函數(shù)代碼頁簽的測試函數(shù)。
執(zhí)行完成后,您可以在函數(shù)代碼頁簽的上方查看執(zhí)行結(jié)果。
更多信息
除了函數(shù)計(jì)算控制臺(tái),您還可以通過以下方式配置觸發(fā)器:
通過Serverless Devs工具配置觸發(fā)器。更多操作,請(qǐng)參見創(chuàng)建觸發(fā)器。
通過SDK配置觸發(fā)器。更多操作,請(qǐng)參見SDK列表。
如需對(duì)創(chuàng)建的觸發(fā)器進(jìn)行修改或刪除,具體操作,請(qǐng)參見觸發(fā)器管理。