日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用函數計算作為Transform

本文介紹使用函數計算作為Transform時的背景信息、注意事項及操作步驟。

背景信息

當Transform選擇阿里云的函數計算時,您可以通過編寫函數代碼對事件進行更復雜、更加定制化的處理。整體流程如圖所示。

image
  1. 源端(Source)拉取事件后,先經過攢批,達到攢批的條件后,攢批的事件會進入Filter階段。

  2. Filter階段會對攢批的事件中的每條事件進行判斷,決定是否過濾該事件。經過攢批和Filter過濾的事件會進入Transform階段。

  3. Transform會對攢批和Filter過濾的事件進一步處理,以確保發送給函數的多條事件符合Payload限制。如果攢批和Filter過濾的事件總大小超過了Payload限制,則將其拆分成符合Payload限制的事件數量,依次調用函數進行處理。

  4. Transform調用完函數之后,會將函數返回的內容推送給目標端(Sink)。

注意事項

項目

說明

函數調用方式

僅支持同步方式請求函數計算,函數請求的Payload值限制為16 MB。

函數入參協議

  • 多條CloudEvents,CloudEvents的入參格式為[cloudEvents1,cloudEvents2]

  • 單條CloudEvents,CloudEvents的入參格式為[cloudEvents1]

關于CloudEvents事件格式的更多信息,請參見事件概述

函數執行超時時間

  • 如果您通過函數模板創建函數,則新建函數的默認超時時間為60 s。如果60 s不滿足業務需求,可在函數創建后跳轉至函數計算控制臺自定義修改,更多信息,請參見管理函數

  • 如果您選擇綁定自己創建的函數,則超時時間由您的函數決定。

函數返回值

函數可以返回任何值。但需注意以下事項:

  • 由于目前函數計算的限制,函數在使用不同Runtime時,函數代碼中返回的內容與函數“真正”的返回值(也就是事件流接收到的內容)存在不一致的現象,詳見下文返回值與實際值的對比。此時,發送給目標端的數據格式,以“真正”的返回值為準,也就是以事件流接收到的內容為準。

  • 返回值大小受目標端的限制。

  • 針對目標端為非函數計算的產品來說,例如輕量消息隊列(原 MNS)云消息隊列 RocketMQ 版云消息隊列 Kafka 版等產品。返回值中單條數據的大小,不能超過下游服務能接收的最大值。具體來說,如果是非數組格式的數據,則返回值不能超過下游服務能接收的最大值;如果是數組格式的數據,則數組中的每條數據都不能超過下游服務能接收的最大值。

  • 針對目標端為函數計算來說。返回值不能超過16 MB。

異常處理機制

Transform的異常處理策略與當前任務的異常處理策略保持一致,即“任務屬性”對Transform也生效。

返回值與實際值對比

由于目前函數計算的限制,函數使用不同Runtime時,函數代碼中返回的內容與函數“真正”的返回值即事件流接收到的內容存在不一致現象。以Python、Node、Go為例,差異如下。

Runtime

函數返回的內容

事件流接收的內容

Python 3.9

["test1", "test2"]

["test1", "test2"]

"[\"test1\", \"test2\"]"

["test1", "test2"]

"test"

test

"\"test\""

"test"

Node 14

["test1", "test2"]

["test1", "test2"]

"[\"test1\", \"test2\"]"

["test1", "test2"]

"test"

test

"\"test\""

"test"

Go 1.x

["test1", "test2"]

["test1", "test2"]

"[\"test1\", \"test2\"]"

"[\"test1\", \"test2\"]"

"test"

"test"

"\"test\""

"\"test\""

前提條件

步驟一:創建事件流

  1. 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件流
  2. 在頂部菜單欄,選擇地域,然后單擊創建事件流
  3. 創建事件流面板,設置任務名稱描述,配置以下參數。

    • 任務創建

      1. Source(源)配置向導,選擇數據提供方,本文以云消息隊列 Kafka 版為例,配置以下參數,然后單擊下一步

        參數

        說明

        示例

        地域

        選擇云消息隊列 Kafka 版源實例所在的地域。

        華北2(北京)

        kafka 實例

        選擇生產云消息隊列 Kafka 版消息的源實例。

        alikafka_post_115964845466****_ByBeUp3p

        Topic

        選擇生產云消息隊列 Kafka 版消息的Topic。

        topic

        Group ID

        選擇源實例的消費組名稱。請使用獨立的消費組來創建事件源,不要和已有的業務混用消費組,以免影響已有的消息收發。

        快速創建

        消費位點

        選擇開始消費消息的位點。

        最新位點

        網絡配置

        選擇路由消息的網絡類型。

        基礎網絡

        專有網絡VPC

        選擇VPC ID。當網絡配置設置為自建公網時需要設置此參數。

        vpc-bp17fapfdj0dwzjkd****

        交換機

        選擇vSwitch ID。當網絡配置設置為自建公網時需要設置此參數。

        vsw-bp1gbjhj53hdjdkg****

        安全組

        選擇安全組。當網絡配置設置為自建公網時需要設置此參數。

        alikafka_pre-cn-7mz2****

        批量推送條數(可選)

        批量推送可幫您批量聚合多個事件,當批量推送條數批量推送間隔(單位:秒)兩者條件達到其一時即會觸發批量推送。調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為 [1,10000]。

        100

        批量推送間隔(單位:秒)(可選)

        調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。

        3

      2. Filtering(過濾)配置向導,配置模式內容,默認值為{},表示不過濾任何事件。更多信息,請參見事件模式

        說明

        Filtering為可選步驟,可以點擊流程中右上角的刪除按鈕,刪除Filtering步驟;也可以點擊流程中右上角的+Filtering(過濾)添加Filtering步驟 ,Filtering步驟在鏈路中的位置不會改變。

      3. Transform(轉換) 配置向導,設置選擇阿里云服務函數計算,設置以下參數。本示例以新建函數模板為例,使用默認生成的服務、函數及函數模板。

        配置方式

        參數

        說明

        新建函數模板

        服務

        函數計算中的服務名。提供了系統生成的隨機值,可以根據需要編輯。當任務保存之后,會在函數計算中創建對應名字的服務。

        函數

        函數計算中的函數名。提供了系統生成的值,可以根據需要編輯。當任務保存之后,會在函數計算中創建對應名字的函數。

        函數模板

        默認提供四種函數模板:

        • 內容分割

        • 內容映射

        • 內容富化

        • 動態路由

        可以使用提供的簡易版函數代碼編輯框,對函數模板中的代碼進行編輯和調試,完成編輯之后,無需保存,當任務保存之后,函數中使用的代碼為編輯框中的代碼。

        綁定現有函數

        服務

        選擇函數計算中已創建的服務。

        函數

        選擇上一步選擇的服務中的函數。

        版本和別名

        選擇服務的版本或別名。

      4. Sink(目標)配置向導,選擇服務類型,配置相關參數。本示例選擇函數計算

        參數

        說明

        示例

        服務

        選擇已創建的函數計算的服務。

        test

        函數

        選擇已創建的函數計算的函數。

        test

        服務版本和別名

        選擇服務版本或服務別名。

        默認版本

        執行方式

        選擇同步執行或異步執行。

        異步

        事件

        選擇事件內容轉換類型。更多信息,請參考事件內容轉換

        完整事件

    • 任務屬性(可選)

      設置事件流的重試策略及死信隊列。更多信息,請參見重試和死信。本示例配置重試策略為退避重試、容錯策略為允許容錯,不使用死信隊列。

  4. 配置完成后,單擊保存

  5. 返回事件流頁面,找到創建好的事件流,在其右側操作欄,單擊啟用

    啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。

步驟二:測試事件流

  1. 在事件流頁面,找到創建好的事件流,單擊事件源列的Kafka的Topic,跳轉至云消息隊列 Kafka 版控制臺,單擊右上角的體驗發送消息

  2. 快速體驗消息收發面板,設置發送方式消息key消息內容,單擊確定

  3. 查看Transform中使用的函數是否收到了請求。

    回到事件流頁面單擊操作列的詳情,單擊Transform(轉換)區域的函數名稱。

    • 單擊調用日志,查看是否有對應的日志。如果調用日志中的調用結果列顯示調用成功,則表示 Transform執行成功。

      說明

      如果沒有開通日志服務,請先開通日志服務,之后需要重新發送 Kafka 消息。

    • 點擊監控指標,查看是否有調用次數。如果有調用次數,且錯誤次數為0,則表示Sink成功收到了Transform后的內容。transform-cn

  4. 查看Sink中使用的函數是否收到了請求。

    回到事件流頁面單擊操作列的詳情,單擊事件目標列的函數名稱。

    • 單擊調用日志,查看是否有對應的日志。如果調用日志中的調用結果列顯示調用成功,則表示 Transform執行成功。

      說明

      如果沒有開通日志服務,請先開通日志服務,之后需要重新發送 Kafka 消息。

    • 點擊監控指標,查看是否有調用次數。如果有調用次數,且錯誤次數為0,則表示Sink成功收到了Transform后的內容。sink-cn

相關文檔

使用函數計算實現消息數據清洗