消息隊列 Kafka 版作為事件源通過事件總線EventBridge與函數計算集成后,通過消息隊列 Kafka 版觸發器(以下簡稱Kafka觸發器)能夠觸發關聯函數執行,通過函數可以對發布到消息隊列 Kafka 版的消息進行自定義處理。本文介紹如何在函數計算控制臺創建Kafka觸發器、配置入口參數以及編寫代碼并測試代碼。
功能簡介
您在函數計算的控制臺提交觸發器創建請求之后,函數計算會根據觸發器的配置信息,自動在事件總線EventBridge側創建事件流資源。
創建完成后,您可以在函數計算控制臺查看觸發器信息,同時也可以在事件總線EventBridge控制臺查看自動創建的資源信息。當源消息隊列 Kafka 版中有消息入隊時,將會觸發函數計算執行,觸發時會根據您的攢批配置將一個或多個消息事件以批的形式推送到函數中進行處理。
注意事項
作為觸發源的消息隊列 Kafka 版實例必須和函數計算的函數在相同的地域。
創建的事件流數量超過上限后,將無法再創建Kafka觸發器。關于事件流數量的限制,請參見使用限制。
前提條件
事件總線EventBridge
函數計算
消息隊列 Kafka 版
步驟一:創建Kafka觸發器
當您已經創建好Kafka的實例,您需要登錄函數計算控制臺,進入目標函數,選擇配置頁簽,創建觸發器,創建完成點擊確定如下圖所示。
基礎配置項說明如下所示。
配置項 | 操作 | 本文示例 |
消費位點 | 選擇消息的消費位點,即消息隊列 Kafka 版從事件總線開始拉取消息的位置。 取值說明如下。
| 最早位點 |
調用方式 | 選擇函數調用方式。 | 同步調用 |
投遞并發最大值 | Kafka消息投遞到函數計算的并發最大值,取值范圍為1~300。該參數僅對同步調用生效。如果需要更高的并發,請進入EventBridge配額中心申請配額名稱為EventStreaming FC Sink 同步投遞最大并發數的配額。 | 1 |
關于推送配置、重試和死信等高級配置項說明,請參見觸發器高級功能。
步驟二:(可選)配置函數入口參數
消息隊列 Kafka 版事件源會以event
的形式作為輸入參數傳遞給函數,您可以使用代碼解析event參數,并對event進行處理。您可以手動將event
傳給函數模擬觸發事件。
在函數詳情頁面的代碼頁簽,單擊測試函數右側的圖標,從下拉列表中,選擇配置測試參數。
在配置測試參數面板,選擇創建新測試事件或編輯已有測試事件,填寫事件名稱和事件內容,然后單擊確定。
event
格式如下所示:[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents規范中定義的參數解釋,請參見事件概述。
data字段包含的參數解釋如下表所示。
參數
類型
示例值
描述
topic
String
TopicName
Topic的名稱。
partition
Int
1
云消息隊列 Kafka 版的消費分區信息。
offset
Int
0
云消息隊列 Kafka 版的消息位點。
timestamp
String
1655952591589
開始消費時間戳。
步驟三:編寫函數代碼并測試
觸發器創建完成后,您可以開始編寫并測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當消息隊列 Kafka 版事件發生時,觸發器會自動觸發函數的執行。
在函數詳情頁面的代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。
本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); // 解析event參數,對event進行處理。 callback(null, 'return result'); }
測試函數。
方式一:如果您是配置函數入口參數
event
模擬事件源,單擊測試函數。方式二:登錄云消息隊列 Kafka 版控制臺,選擇您創建的目標Topic,點擊發送消息,如下圖。
執行完成后,在實時日志查看結果。
更多信息
如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。