本文以云消息隊列 Kafka 版作為事件源為例,介紹如何在事件總線EventBridge控制臺添加云原生大數據計算服務 MaxCompute作為事件流中的事件目標以實現將云消息隊列 Kafka 版中的消息寫入到云原生大數據計算服務 MaxCompute中進行數據計算和分析。
前提條件
計費說明
Connector任務運行在函數計算平臺,任務加工傳輸消耗的計算資源將按函數計算的單價計費。函數計算計費信息,請參見計費概述和【產品變更】函數計算定向減免消息類產品和云工作流的函數調用次數費用。
工作流程
事件總線將云消息隊列 Kafka 版路由到云原生大數據計算服務 MaxCompute的過程如下:
配置事件鏈路:用戶配置事件源為云消息隊列 Kafka 版,添加事件目標為云原生大數據計算服務 MaxCompute,建立起從Kafka到MaxCompute的數據傳輸通道。
數據流動:事件總線從云消息隊列 Kafka 版中拉取數據,經過攢批、過濾、轉換之后將數據寫入云原生大數據計算服務 MaxCompute。
數據計算和分析:用戶可基于云原生大數據計算服務 MaxCompute對寫入的數據進行計算和分析。
這一過程,不僅提高了數據處理的時效性,還確保了數據流轉的高效性和可靠性,助力用戶構建靈活、可擴展的事件驅動架構。
步驟一:創建事件目標服務資源
在云原生大數據計算服務 MaxCompute控制臺創建一個Project并在Project中創建表。
通過MaxCompute控制臺創建Project。具體操作,請參見創建MaxCompute項目。
通過MaxCompute控制臺創建表。具體操作,請參見創建表。
步驟二:創建事件流
- 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件流。
- 在頂部菜單欄,選擇地域,然后單擊創建事件流。
在創建事件流頁面,設置任務名稱和描述,配置以下參數,然后單擊保存。
任務創建
在Source(源)配置向導,選擇數據提供方為消息隊列Kafka版,并配置相關參數。
參數
說明
示例
地域
選擇云消息隊列Kafka版源實例所在的地域。
華北2(北京)
kafka 實例
選擇生產云消息隊列Kafka版消息的源實例。
alikafka_post-cn-jte3****
Topic
選擇生產云消息隊列Kafka版消息的 Topic。
topic
Group ID
數據源所在的云消息隊列 Kafka 版實例的Group ID。
快速創建:自動創建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:選擇已創建的Group,請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的消息收發。
使用已有
消費位點
選擇開始消費消息的位點。
最新位點:從最新位點開始消費。
最早位點:從最初位點開始消費。
最新位點
網絡配置
選擇路由消息的網絡類型。
基礎網絡:將默認打通實例間的網絡連接,僅支持非跨境場景的數據傳輸使用。
自建公網:若配置項涉及跨境傳輸則需自行配置VPC網絡,請選擇帶有公網NAT網關的VPC資源。
基礎網絡
專有網絡VPC
選擇 VPC ID。僅當網絡配置設置為自建公網時需要設置此參數。
vpc-bp17fapfdj0dwzjkd****
交換機
選擇 vSwitch ID。僅當網絡配置設置為自建公網時需要設置此參數。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。僅當網絡配置設置為自建公網時需要設置此參數。
alikafka_pre-cn-7mz2****
數據格式
數據格式是針對支持二進制傳遞的數據源端推出的指定內容格式的編碼能力,支持多種數據格式編碼:
Json( Json 格式編碼,二進制數據按照 utf8 編碼為 Json 格式放入 Payload)
Text(默認文本格式編碼,二進制數據按照 utf8 編碼為字符串放入 Payload)
Binary(二進制格式編碼,二進制數據按照 Base64 編碼為字符串放入 Payload)
Json( Json 格式編碼,二進制數據按照 utf8 編碼為 Json 格式放入 Payload)
批量推送條數
調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為[1,10000]。
100
批量推送間隔(單位:秒)
調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)配置向導,設置數據模式內容過濾發送的請求。更多信息,請參見消息過濾。
在Transform(轉換)配置向導,設置數據清洗,實現分割、映射、富化及動態路由等繁雜數據加工能力。更多信息,請參見事件內容轉換。
在Sink(目標)配置向導,選擇服務類型為大數據計算服務 Maxcompute,配置相關參數。
參數
說明
示例
賬號AccessKey ID
阿里云賬號的AccessKey ID,用于訪問MaxCompute服務。
LTAI5tHPVCZywsoEVvFu****
賬號AccessKey Secret
將數據寫入MaxCompute表時使用的 AccessKey Secret。
說明所填AccessKey ID和AccessKey Secret必須要有寫入MaxCompute表的權限。具體操作,請參見授權給其他用戶。
4RAKUQpZtUntDgvoKu0tvrkrOM****
MaxCompute項目名稱
選擇待寫入數據的MaxCompute項目。
mc_sink_project
MaxCompute表名稱
選擇待寫入數據的MaxCompute表。
table
MaxCompute表入參
選擇MaxCompute表后,此處會展示表的列名和類型信息,配置數據提取規則即可。
$.data.value.key
分區配置
關閉:不開啟分區能力。
開啟:開啟分區能力。
僅當分區配置為開啟時需配置此參數。
支持{yyyy}、{MM}、{dd}、{HH}、{mm}時間變量參數,分別對應年、月、日、時、分。時間變量大小寫敏感。
支持填寫常量。
開啟
{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix
網絡配置
選擇路由消息的網絡類型。
專有網絡:通過專有網絡VPC將上游數據投遞到MaxCompute。
公網:通過公網將上游數據投遞到MaxCompute。
公網
專有網絡VPC
選擇 VPC ID。僅當網絡配置為專有網絡時需要配置此參數。
vpc-bp17fapfdj0dwzjkd****
交換機
選擇 vSwitch ID。僅當網絡配置為專有網絡時需要配置此參數。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。僅當網絡配置為專有網絡時需要配置此參數。
alikafka_pre-cn-7mz2****
任務屬性
配置事件推送失敗時的重試策略及錯誤發生時的處理方式。更多信息,請參見重試和死信。
任務啟動
返回事件流頁面,找到創建好的事件流,在其右側操作欄,單擊啟用。
啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看啟動進度。
步驟三:測試MaxCompute Sink Connector
在事件流頁面,在MaxCompute Sink Connector任務的事件源列單擊源Topic。
在Topic詳情頁面,單擊體驗發送消息。
在快速體驗消息收發面板,按照下圖配置消息內容,然后單擊確定。
在事件流頁面,在 MaxCompute Sink Connector 任務的事件目標列單擊MaxCompute項目。
在MaxCompute控制臺頂部菜單欄選擇地域,然后在左側導航欄選擇 ,輸入
SELECT * FROM <table>;
查詢語句并單擊運行查看結果。