云消息隊列 Kafka 版觸發器(以下簡稱Kafka觸發器)是通過事件總線EventBridge將云消息隊列 Kafka 版作為事件源與函數計算進行集成。創建完成后,您可以在函數計算控制臺和事件總線EventBridge控制臺查看創建的信息。當有消息入隊時,事件總線EventBridge會根據您的攢批配置將一個或多個消息事件以批的形式推送到函數中進行處理。
注意事項
作為觸發源的云消息隊列 Kafka 版實例必須和函數計算的函數在相同的地域。
創建的事件流數量超過上限后,將無法再創建Kafka觸發器。關于事件流數量的限制,請參見使用限制。
前提條件
事件總線EventBridge
函數計算
云消息隊列 Kafka 版
步驟一:創建Kafka觸發器
- 登錄函數計算控制臺,在左側導航欄,單擊服務及函數。
- 在頂部菜單欄,選擇地域,然后在服務列表頁面,單擊目標服務。
在函數管理頁面,單擊目標函數名稱。
在函數詳情頁面,單擊觸發器管理頁簽,從版本或別名下拉列表選擇要創建觸發器的版本或別名,然后單擊創建觸發器。
在創建觸發器面板,填寫相關信息。然后單擊確定。
基礎配置項說明如下所示。
配置項
操作
本文示例
觸發器類型
選擇消息隊列 Kafka 版。
消息隊列 Kafka 版
名稱
填寫自定義的觸發器名稱。
kafka-trigger
版本或別名
默認值為LATEST,如果您需要創建其他版本或別名的觸發器,首先需要在函數詳情頁的右上角切換到該版本或別名。關于版本和別名的簡介,請參見管理版本和管理別名。
LATEST
Kafka 實例
選擇已創建的云消息隊列 Kafka 版實例。
alikafka_pre-cn-i7m2t7t1****
Topic
選擇已創建的云消息隊列 Kafka 版實例的Topic。
topic1
Group ID
選擇已創建的云消息隊列 Kafka 版實例的Group ID。
說明請使用獨立的Group ID來創建觸發器,不要與已有的業務混用Group ID,否則會影響已有的消息收發。
GID_group1
消費任務并發數
消費者的并發數量,取值范圍為[1,Topic的分區數]。
2
消費位點
選擇消息的消費位點,即云消息隊列 Kafka 版從事件總線開始拉取消息的位置。
取值說明如下。
最早位點:從最早位點開始消費。
最新位點:從最新位點開始消費。
最新位點
調用方式
選擇函數調用方式。
同步調用
投遞并發最大值
Kafka消息投遞到函數計算的并發最大值,取值范圍為1~300。該參數僅對同步調用生效。如果需要更高的并發,請進入EventBridge配額中心申請配額名稱為EventStreaming FC Sink 同步投遞最大并發數的配額。
1
觸發器啟用狀態
創建觸發器后是否立即啟用。默認勾選啟用觸發器,即創建觸發器后立即啟用觸發器。
不涉及
關于推送配置、重試和死信等高級配置項說明,請參見觸發器高級功能。
創建完成后,在觸發器名稱列表中顯示已創建的觸發器。如需對創建的觸發器進行修改或刪除,具體操作,請參見觸發器管理。
步驟二:配置函數入口參數
云消息隊列 Kafka 版事件源會以event
的形式作為輸入參數傳遞給函數,您可以手動將event
傳給函數模擬觸發事件。
在函數詳情頁面,單擊函數代碼頁簽,然后單擊測試函數右側圖標,從下拉列表中,選擇配置測試參數。
在配置測試參數面板,選擇創建新測試事件或編輯已有測試事件頁簽,填寫事件名稱和事件內容。然后單擊確定。
event
格式如下所示:[ { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } }, { "specversion":"1.0", "id":"8e215af8-ca18-4249-8645-f96c1026****", "source":"acs:alikafka", "type":"alikafka:Topic:Message", "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic", "datacontenttype":"application/json; charset=utf-8", "time":"2022-06-23T02:49:51.589Z", "aliyunaccountid":"164901546557****", "data":{ "topic":"****", "partition":7, "offset":25, "timestamp":1655952591589, "headers":{ "headers":[ ], "isReadOnly":false }, "key":"keytest", "value":"hello kafka msg" } } ]
CloudEvents規范中定義的參數解釋,請參見事件概述。
data字段包含的參數解釋如下表所示。
參數
類型
示例值
描述
topic
String
TopicName
Topic的名稱。
partition
Int
1
云消息隊列 Kafka 版的消費分區信息。
offset
Int
0
云消息隊列 Kafka 版的消息位點。
timestamp
String
1655952591589
開始消費時間戳。
步驟三:編寫函數代碼并測試
觸發器創建完成后,您可以開始編寫并測試函數代碼,以驗證代碼的正確性。在實際操作過程中,當云消息隊列 Kafka 版事件發生時,觸發器會自動觸發函數的執行。
在函數詳情頁面,單擊函數代碼頁簽,在代碼編輯器中編寫代碼,然后單擊部署代碼。
本文以Node.js函數代碼為例。
'use strict'; /* To enable the initializer feature please implement the initializer function as below: exports.initializer = (context, callback) => { console.log('initializing'); callback(null, ''); }; */ exports.handler = (event, context, callback) => { console.log("event: %s", event); //解析event參數,對event進行處理。 callback(null, 'return result'); }
單擊函數代碼頁簽的測試函數。
執行完成后,您可以在函數代碼頁簽的上方查看執行結果。
常見問題
事件總線EventBridge會根據您的攢批配置將消息推送給函數處理,不能通過函數調用次數判斷消息是否全部被消費。您可以在函數中打印日志來確認消費的消息情況。更多關于攢批配置的信息請參見推送配置。
暫時不支持。函數計算不會主動監測自建的Kafka消息隊列是否有新的消息,您可以使用HTTP函數,將消息附加到HTTP請求中主動觸發函數。