本文介紹使用函數計算作為Transform時的背景信息、注意事項及操作步驟。
背景信息
當Transform選擇阿里云的函數計算時,您可以通過編寫函數代碼對事件進行更復雜、更加定制化的處理。整體流程如圖所示。
源端(Source)拉取事件后,先經過攢批,達到攢批的條件后,攢批的事件會進入Filter階段。
Filter階段會對攢批的事件中的每條事件進行判斷,決定是否過濾該事件。經過攢批和Filter過濾的事件會進入Transform階段。
Transform會對攢批和Filter過濾的事件進一步處理,以確保發送給函數的多條事件符合Payload限制。如果攢批和Filter過濾的事件總大小超過了Payload限制,則將其拆分成符合Payload限制的事件數量,依次調用函數進行處理。
Transform調用完函數之后,會將函數返回的內容推送給目標端(Sink)。
注意事項
項目 | 說明 |
函數調用方式 | 僅支持同步方式請求函數計算,函數請求的Payload值限制為16 MB。 |
函數入參協議 |
關于CloudEvents事件格式的更多信息,請參見事件概述。 |
函數執行超時時間 |
|
函數返回值 | 函數可以返回任何值。但需注意以下事項:
|
異常處理機制 | Transform的異常處理策略與當前任務的異常處理策略保持一致,即“任務屬性”對Transform也生效。 |
返回值與實際值對比
由于目前函數計算的限制,函數使用不同Runtime時,函數代碼中返回的內容與函數“真正”的返回值即事件流接收到的內容存在不一致現象。以Python、Node、Go為例,差異如下。
Runtime | 函數返回的內容 | 事件流接收的內容 |
Python 3.9 | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | ["test1", "test2"] | |
"test" | test | |
"\"test\"" | "test" | |
Node 14 | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | ["test1", "test2"] | |
"test" | test | |
"\"test\"" | "test" | |
Go 1.x | ["test1", "test2"] | ["test1", "test2"] |
"[\"test1\", \"test2\"]" | "[\"test1\", \"test2\"]" | |
"test" | "test" | |
"\"test\"" | "\"test\"" |
前提條件
步驟一:創建事件流
- 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件流。
- 在頂部菜單欄,選擇地域,然后單擊創建事件流。
在創建事件流面板,設置任務名稱和描述,配置以下參數。
任務創建
在Source(源)配置向導,選擇數據提供方,本文以云消息隊列 Kafka 版為例,配置以下參數,然后單擊下一步。
參數
說明
示例
地域
選擇云消息隊列 Kafka 版源實例所在的地域。
華北2(北京)
kafka 實例
選擇生產云消息隊列 Kafka 版消息的源實例。
alikafka_post_115964845466****_ByBeUp3p
Topic
選擇生產云消息隊列 Kafka 版消息的Topic。
topic
Group ID
選擇源實例的消費組名稱。請使用獨立的消費組來創建事件源,不要和已有的業務混用消費組,以免影響已有的消息收發。
快速創建
消費位點
選擇開始消費消息的位點。
最新位點
網絡配置
選擇路由消息的網絡類型。
基礎網絡
專有網絡VPC
選擇VPC ID。當網絡配置設置為自建公網時需要設置此參數。
vpc-bp17fapfdj0dwzjkd****
交換機
選擇vSwitch ID。當網絡配置設置為自建公網時需要設置此參數。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。當網絡配置設置為自建公網時需要設置此參數。
alikafka_pre-cn-7mz2****
批量推送條數(可選)
批量推送可幫您批量聚合多個事件,當批量推送條數和批量推送間隔(單位:秒)兩者條件達到其一時即會觸發批量推送。調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為 [1,10000]。
100
批量推送間隔(單位:秒)(可選)
調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)配置向導,配置模式內容,默認值為
{}
,表示不過濾任何事件。更多信息,請參見事件模式。說明Filtering為可選步驟,可以點擊流程中右上角的刪除按鈕,刪除Filtering步驟;也可以點擊流程中右上角的+Filtering(過濾)添加Filtering步驟 ,Filtering步驟在鏈路中的位置不會改變。
在 Transform(轉換) 配置向導,設置選擇阿里云服務為函數計算,設置以下參數。本示例以新建函數模板為例,使用默認生成的服務、函數及函數模板。
配置方式
參數
說明
新建函數模板
服務
函數計算中的服務名。提供了系統生成的隨機值,可以根據需要編輯。當任務保存之后,會在函數計算中創建對應名字的服務。
函數
函數計算中的函數名。提供了系統生成的值,可以根據需要編輯。當任務保存之后,會在函數計算中創建對應名字的函數。
函數模板
默認提供四種函數模板:
內容分割
內容映射
內容富化
動態路由
可以使用提供的簡易版函數代碼編輯框,對函數模板中的代碼進行編輯和調試,完成編輯之后,無需保存,當任務保存之后,函數中使用的代碼為編輯框中的代碼。
綁定現有函數
服務
選擇函數計算中已創建的服務。
函數
選擇上一步選擇的服務中的函數。
版本和別名
選擇服務的版本或別名。
在Sink(目標)配置向導,選擇服務類型,配置相關參數。本示例選擇函數計算。
參數
說明
示例
服務
選擇已創建的函數計算的服務。
test
函數
選擇已創建的函數計算的函數。
test
服務版本和別名
選擇服務版本或服務別名。
默認版本
執行方式
選擇同步執行或異步執行。
異步
事件
選擇事件內容轉換類型。更多信息,請參考事件內容轉換。
完整事件
任務屬性(可選)
設置事件流的重試策略及死信隊列。更多信息,請參見重試和死信。本示例配置重試策略為退避重試、容錯策略為允許容錯,不使用死信隊列。
配置完成后,單擊保存。
返回事件流頁面,找到創建好的事件流,在其右側操作欄,單擊啟用。
啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
步驟二:測試事件流
在事件流頁面,找到創建好的事件流,單擊事件源列的Kafka的Topic,跳轉至云消息隊列 Kafka 版控制臺,單擊右上角的體驗發送消息。
在快速體驗消息收發面板,設置發送方式、消息key、消息內容,單擊確定。
查看Transform中使用的函數是否收到了請求。
回到事件流頁面單擊操作列的詳情,單擊Transform(轉換)區域的函數名稱。
單擊調用日志,查看是否有對應的日志。如果調用日志中的調用結果列顯示調用成功,則表示 Transform執行成功。
說明如果沒有開通日志服務,請先開通日志服務,之后需要重新發送 Kafka 消息。
點擊監控指標,查看是否有調用次數。如果有調用次數,且錯誤次數為0,則表示Sink成功收到了Transform后的內容。
查看Sink中使用的函數是否收到了請求。
回到事件流頁面單擊操作列的詳情,單擊事件目標列的函數名稱。
單擊調用日志,查看是否有對應的日志。如果調用日志中的調用結果列顯示調用成功,則表示 Transform執行成功。
說明如果沒有開通日志服務,請先開通日志服務,之后需要重新發送 Kafka 消息。
點擊監控指標,查看是否有調用次數。如果有調用次數,且錯誤次數為0,則表示Sink成功收到了Transform后的內容。