本文以云消息隊列 Kafka 版作為事件源為例,介紹如何在事件總線EventBridge控制臺添加表格存儲(Tablestore)作為事件流中的事件目標以實現將云消息隊列 Kafka 版中的消息寫入到表格存儲(Tablestore)中進行數據存儲和管理。
前提條件
開通消息隊列 Kafka,并完成kafka實例的創建和部署。具體操作,請參見購買和部署實例。
注意事項
計費說明
Connector任務運行在阿里云函數計算平臺,任務加工傳輸消耗的計算資源將按函數計算的單價計費。函數計算計費信息,請參見計費概述和【產品變更】函數計算定向減免消息類產品和云工作流的函數調用次數費用。
工作流程
事件總線將云消息隊列 Kafka 版數據路由到表格存儲(Tablestore)的過程如下:
配置事件鏈路:用戶配置事件源為云消息隊列 Kafka 版,添加事件目標為表格存儲(Tablestore),建立起從Kafka到Tablestore的數據傳輸通道。
數據流動:事件總線從云消息隊列 Kafka 版中拉取數據,經過攢批、過濾、轉換之后將數據寫入表格存儲(Tablestore)。
數據存儲與管理:用戶可基于表格存儲(Tablestore)對寫入的數據進行存儲和管理。
這一過程不僅提高了數據處理的時效性,還確保了數據流轉的高效性和可靠性,助力用戶構建靈活、可擴展的事件驅動架構。
步驟一:創建目標服務資源
創建表格存儲實例和數據表。具體操作,請參見通過控制臺使用寬表模型。
步驟二:創建事件流
- 登錄事件總線EventBridge控制臺,在左側導航欄,單擊事件流。
- 在頂部菜單欄,選擇地域,然后單擊創建事件流。
在創建事件流頁面,設置任務名稱和描述,配置以下參數。
在Source(源)配置向導,選擇數據提供方為消息隊列Kafka版,并配置相關參數。
參數
說明
示例
地域
選擇云消息隊列Kafka版源實例所在的地域。
華北2(北京)
kafka 實例
選擇生產云消息隊列Kafka版消息的源實例。
alikafka_post-cn-jte3****
Topic
選擇生產云消息隊列Kafka版消息的 Topic。
topic
Group ID
數據源所在的云消息隊列 Kafka 版實例的Group ID。
快速創建:自動創建以GID_EVENTBRIDGE_xxx命名的Group ID。
使用已有:選擇已創建的Group,請選擇獨立的Group ID,不要和已有的業務混用,以免影響已有的消息收發。
使用已有
消費位點
選擇開始消費消息的位點。
最新位點:從最新位點開始消費。
最早位點:從最初位點開始消費。
最新位點
網絡配置
選擇路由消息的網絡類型。
基礎網絡:將默認打通實例間的網絡連接,僅支持非跨境場景的數據傳輸使用。
自建公網:若配置項涉及跨境傳輸則需自行配置VPC網絡,請選擇帶有公網NAT網關的VPC資源。
基礎網絡
專有網絡VPC
選擇 VPC ID。僅當網絡配置設置為自建公網時需要設置此參數。
vpc-bp17fapfdj0dwzjkd****
交換機
選擇 vSwitch ID。僅當網絡配置設置為自建公網時需要設置此參數。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。僅當網絡配置設置為自建公網時需要設置此參數。
alikafka_pre-cn-7mz2****
數據格式
數據格式是針對支持二進制傳遞的數據源端推出的指定內容格式的編碼能力,支持多種數據格式編碼:
Json( Json 格式編碼,二進制數據按照 utf8 編碼為 Json 格式放入 Payload)
Text(默認文本格式編碼,二進制數據按照 utf8 編碼為字符串放入 Payload)
Binary(二進制格式編碼,二進制數據按照 Base64 編碼為字符串放入 Payload)
Json( Json 格式編碼,二進制數據按照 utf8 編碼為 Json 格式放入 Payload)
批量推送條數
調用函數發送的最大批量消息條數,當積壓的消息數量到達設定值時才會發送請求,取值范圍為[1,10000]。
100
批量推送間隔(單位:秒)
調用函數的間隔時間,系統每到間隔時間點會將消息聚合后發給函數計算,取值范圍為[0,15],單位為秒。0秒表示無等待時間,直接投遞。
3
在Filtering(過濾)配置向導,設置數據模式內容過濾發送的請求。更多信息,請參見消息過濾。
在Transform(轉換)配置向導,設置數據清洗,實現分割、映射、富化及動態路由等繁雜數據加工能力。更多信息,請參見事件內容轉換。
在Sink(目標)配置向導,選擇服務類型為表格存儲TableStore,配置相關參數,然后單擊保存。
參數
說明
示例
實例名稱
選擇待寫入數據的Tablestore實例。
ts_sink_test_1
目標表
選擇待寫入數據的Tablestore表。
table
主鍵
填寫Tablestore中主鍵列的主鍵值,僅支持配置JsonPath語法。
$.data.value.key
屬性列
填寫針對Tablestore中屬性列名稱,及其屬性類型、屬性值、存儲格式。
屬性列名稱:支持常量或JsonPath的方式,不支持兩者混合使用。如果選擇JsonPath的方式,則將JsonPath提取后的內容作為列名。
屬性類型:數值提取后的類型。
屬性值:僅支持JsonPath的方式。
存儲格式:支持默認和JSON兩種存儲格式。
默認:直接存儲提取到的數值,比如通過屬性值提取了test1字符串,則直接以字符串類型存儲test1。
JSON:如果通過屬性值提取到的是JSON內容,則會遍歷該JSON內容進行存儲。比如屬性列名稱配置為column,且屬性值提取到的是
{"jsonKey":"jsonValue"}
,則實際存儲的列名則是“column_jsonKey”
,存儲的屬性值是“jsonValue”
。
屬性列名稱:column
屬性類型:字符串
屬性值:
$.data.value.name
存儲格式:默認
寫入模式
選擇數據寫入的模式。
put:當兩條數據主鍵相同時,新數據會覆蓋老數據。
update:當兩條數據主鍵相同時,只會在此行中寫入增量列,不會刪除存量列。
put
網絡配置
選擇路由消息的網絡類型。
公網
專有網絡VPC
選擇 VPC ID。當網絡配置設置為自建公網時需要設置此參數。
vpc-bp17fapfdj0dwzjkd****
交換機
選擇 vSwitch ID。當網絡配置設置為自建公網時需要設置此參數。
vsw-bp1gbjhj53hdjdkg****
安全組
選擇安全組。當網絡配置設置為自建公網時需要設置此參數。
alikafka_pre-cn-7mz2****
任務屬性。
配置事件推送失敗時的重試策略及錯誤發生時的處理方式。更多信息,請參見重試和死信。
任務啟動。
啟用事件流后,會有30秒~60秒的延遲時間,您可以在事件流頁面的狀態欄查看是否啟動成功。
步驟三:測試Tablestore Sink Connector
在事件流頁面,在Tablestore Sink Connector任務的事件源列單擊源Topic名稱。
在Topic 詳情頁面,單擊體驗發送消息。
在快速體驗消息收發面板,按照下圖配置消息內容,然后單擊確定。
返回事件流頁面,在Table Sink Connector任務的事件目標列單擊目標Tablestore表名稱。
在表管理頁面,單擊數據管理頁簽,然后單擊查詢數據,設置查詢范圍,單擊確定。