消息隊列RocketMQ版作為事件源通過事件總線EventBridge與函數計算集成后,通過消息隊列RocketMQ版觸發器(以下簡稱RocketMQ觸發器)能夠觸發關聯函數執行,通過函數可以對發布到消息隊列RocketMQ版中的消息進行自定義處理。本文介紹如何在函數計算控制臺創建RocketMQ觸發器、配置函數入口參數和編寫代碼并測試。
功能簡介
您在函數計算的控制臺提交觸發器創建請求之后,函數計算會根據觸發器的配置信息,自動創建事件總線EventBridge相關的資源。目前提供事件模式和事件流模式兩種消息推送模式,每個模式創建的資源如下:
注意事項
- 作為觸發源的消息隊列RocketMQ版的實例必須和函數計算的函數在相同的地域。
- 創建的自定義總線以及事件規則的數量超過上限后,將無法再創建事件模式的RocketMQ觸發器。
- 創建的事件流數量超過上限后,將無法再創建事件流模式的RocketMQ觸發器。
在單個阿里云賬號單個地域維度下,關于創建觸發器涉及的資源數量的限制,請參見使用限制。
前提條件
- 事件總線EventBridge
- 函數計算
- 消息隊列RocketMQ版
步驟一:創建觸發器
- 登錄函數計算控制臺,在左側導航欄,單擊服務及函數。
- 在頂部菜單欄,選擇地域,然后在服務列表頁面,單擊目標服務。
- 在函數管理頁面,單擊目標函數名稱。
- 在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉列表選擇要創建觸發器的版本或別名,然后單擊創建觸發器。
- 在創建觸發器面板,填寫相關信息。然后單擊確定。基礎配置項說明如下所示。
配置項 操作 本文示例 觸發器類型 選擇消息隊列 RocketMQ 版。 消息隊列 RocketMQ 版 名稱 填寫自定義的觸發器名稱。 rocketmq-trigger 版本或別名 默認值為LATEST,如果您需要創建其他版本或別名的觸發器,首先需要在函數詳情頁的右上角切換到該版本或別名。關于版本和別名的簡介,請參見管理版本和管理別名。 LATEST RocketMQ 實例 選擇已創建的消息隊列RocketMQ版的實例。 MQ_INST_164901546557****_BX7**** Topic 選擇已創建的消息隊列RocketMQ版實例的Topic。 topic1 Tag 填寫消息過濾標簽。 只有收到包含此處設置的過濾標簽字符串的消息時,才會觸發函數執行。
tag Group ID 選擇已創建的消息隊列RocketMQ版實例的Group ID。 GID_group1 消費位點 選擇消息的消費位點,即消息隊列RocketMQ版從事件總線開始拉取消息的位置。取值說明如下。 - 最新位點:從最新位點開始消費。
- 最早位點:從最早位點開始消費。
- 指定時間戳:從指定時間戳開始消費。
最新位點 調用方式 選擇函數調用方式。 同步調用 消息推送模式 消息數據推送到函數計算時的底層應用模式。 取值說明如下。- 事件流模式:會根據您的攢批配置將一個或多個消息事件以批的形式推送到函數中進行處理,適合端到端的流式數據處理場景。
- 事件模式:每次會將單個消息作為事件參數傳入函數中,事件遵循CloudEvents規范。消息內容和CloudEvents的關系,請參見步驟二:配置函數入口參數。
事件模 觸發器啟用狀態 創建觸發器后是否立即啟用。默認勾選啟用觸發器,即創建觸發器后立即啟用觸發器。 不涉及 關于推送配置、重試和死信等高級配置項說明,請參見觸發器高級功能。
創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置函數入口參數
消息隊列RocketMQ版事件源會以event
的形式作為輸入參數傳遞給函數,您可以手動將event
傳給函數模擬觸發事件,測試函數代碼是否正確。
- 在函數詳情頁面,單擊函數代碼頁簽,然后單擊圖標,從下拉列表中,選擇配置測試參數。
- 在配置測試參數面板,選擇創建新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內容。然后單擊確定。事件模式的
event
格式如下所示。{ "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } }
事件流模式的event
格式如下所示。[ { "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } }, { "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****", "source":"RocketMQ-Function-rocketmq-trigger", "specversion":"1.0", "type":"mq:Topic:SendMessage", "datacontenttype":"application/json; charset=utf-8", "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName", "time":"2021-04-08T06:01:20.766Z", "aliyunaccountid":"164901546557****", "aliyunpublishtime":"2021-10-15T02:05:16.791Z", "aliyunoriginalaccountid":"164901546557****", "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger", "aliyunregionid":"cn-chengdu", "aliyunpublishaddr":"42.120.XX.XX", "data":{ "topic":"TopicName", "systemProperties":{ "MIN_OFFSET":"0", "TRACE_ON":"true", "MAX_OFFSET":"8", "MSG_REGION":"cn-hangzhou", "KEYS":"systemProperties.KEYS", "CONSUME_START_TIME":1628577790396, "TAGS":"systemProperties.TAGS", "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi" }, "userProperties":{ }, "body":"TEST" } } ]
data字段包含的參數解釋如下表所示。關于CloudEvents規范中定義的參數解釋,請參見事件概述。參數 類型 示例值 描述 topic String TopicName Topic名稱。 systemProperties Map 系統屬性。 MIN_OFFSET Int 0 最低位點。 TRACE_ON Boolean true 是否有消息軌跡。取值說明如下: - true:有消息軌跡。
- false:無消息軌跡。
MAX_OFFSET Int 8 最高位點。 MSG_REGION String cn-hangzhou 發送消息的地域。 KEYS String systemProperties.KEYS 過濾屬性。 CONSUME_START_TIME Long 1628577790396 開始消費時間。單位:毫秒。 UNIQ_KEY String AC14C305069E1B28CDFA3181CDA2**** 消息唯一鍵。 TAGS String systemProperties.TAGS 過濾屬性。 INSTANCE_ID String MQ_INST_123456789098****_BXhFHryi 實例ID。 userProperties Map 無 用戶屬性。 body String TEST 消息內容。
步驟三:編寫函數代碼并測試
完成觸發器創建后,您可以開始編寫并測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當消息隊列RocketMQ版事件通過事件總線EventBridge投遞到函數計算時,觸發器會自動觸發函數的執行。
- 在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數,對event進行處理。 callback(null, 'return result'); }
- 單擊函數代碼頁簽的測試函數。執行完成后,您可以在函數代碼頁簽的上方查看執行結果。
更多信息
如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。