本文介紹如何通過創建AnalyticDB Sink Connector,將數據從云消息隊列 Kafka 版實例的數據源Topic通過函數計算服務導出至云原生數據倉庫 AnalyticDB MySQL 版或云原生數據倉庫AnalyticDB PostgreSQL版。
前提條件
在導出數據前,請確保您已完成以下操作:
云消息隊列 Kafka 版
為云消息隊列 Kafka 版實例開啟Connector。更多信息,請參見開啟Connector。
為云消息隊列 Kafka 版實例創建數據源Topic。更多信息,請參見步驟一:創建Topic。
函數計算
云原生數據倉庫 AnalyticDB MySQL 版和云原生數據倉庫AnalyticDB PostgreSQL版
云原生數據倉庫 AnalyticDB MySQL 版:在云原生數據倉庫 AnalyticDB MySQL 版控制臺創建集群、數據庫賬號,連接集群并創建數據庫。更多信息,請參見創建集群、創建數據庫賬號、連接集群和創建數據庫。
云原生數據倉庫AnalyticDB PostgreSQL版:在云原生數據倉庫AnalyticDB PostgreSQL版控制臺創建實例、數據庫賬號和登錄數據庫。更多信息,請參見創建實例、創建數據庫賬號和客戶端連接。
注意事項
僅支持在同地域內,將數據從云消息隊列 Kafka 版實例的數據源Topic導出至函數計算,再由函數計算導出至云原生數據倉庫 AnalyticDB MySQL 版或云原生數據倉庫AnalyticDB PostgreSQL版。關于Connector的限制說明,請參見使用限制。
該功能基于函數計算服務提供。函數計算為您提供了一定的免費額度,超額部分將產生費用,請以函數計算的計費規則為準。計費詳情,請參見計費概述。
函數計算的函數調用支持日志查詢,以便您迅速排查問題。具體操作步驟,請參見配置日志。
消息轉儲時,云消息隊列 Kafka 版中消息用UTF-8 String序列化,暫不支持二進制的數據格式。
如果AnalyticDB Sink Connector接入點是私網接入點,函數計算運行環境默認無法訪問,為確保網絡暢通,需在函數計算控制臺為函數服務配置與云原生數據倉庫一致的VPC和vSwitch信息。更多信息,請參見更新服務。
創建Connector時,云消息隊列 Kafka 版會為您自動創建服務關聯角色。
如果未創建服務關聯角色,云消息隊列 Kafka 版會為您自動創建一個服務關聯角色,以便您使用云消息隊列 Kafka 版導出數據至表格存儲的功能。
如果已創建服務關聯角色,云消息隊列 Kafka 版不會重復創建。
關于服務關聯角色的更多信息,請參見服務關聯角色。
操作流程
使用AnalyticDB Sink Connector將數據從云消息隊列 Kafka 版實例的數據源Topic導出至云原生數據倉庫操作流程如下:
可選:創建AnalyticDB Sink Connector依賴的Topic和Group
如果您不需要自定義Topic和Group,您可以直接跳過該步驟,在下一步驟選擇自動創建。
重要部分AnalyticDB Sink Connector依賴的Topic的存儲引擎必須為Local存儲,大版本為0.10.2的云消息隊列 Kafka 版實例不支持手動創建Local存儲的Topic,只支持自動創建。
服務配置
結果驗證
創建AnalyticDB Sink Connector依賴的Topic
您可以在云消息隊列 Kafka 版控制臺手動創建AnalyticDB Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、死信隊列Topic以及異常數據Topic。每個Topic所需要滿足的分區數與存儲引擎會有差異,具體信息,請參見配置源服務參數列表。
在概覽頁面的資源分布區域,選擇地域。
重要Topic需要在應用程序所在的地域(即所部署的ECS的所在地域)進行創建。Topic不能跨地域使用。例如Topic創建在華北2(北京)這個地域,那么消息生產端和消費端也必須運行在華北2(北京)的ECS。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊創建 Topic。
在創建 Topic面板,設置Topic屬性,然后單擊確定。
參數
說明
示例
名稱
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分區數
Topic的分區數量。
12
存儲引擎
說明當前僅專業版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。
Topic消息的存儲引擎。
云消息隊列 Kafka 版支持以下兩種存儲引擎。
云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規格類型為標準版(高寫版)時,存儲引擎只能為云存儲。
Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。
云存儲
消息類型
Topic消息的類型。
普通消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息。
分區順序消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,仍然保證分區內按照發送順序存儲。但是會出現部分分區發送消息失敗,等到分區恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區順序消息。
普通消息
日志清理策略
Topic日志的清理策略。
當存儲引擎選擇Local 存儲(當前僅專業版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略。
云消息隊列 Kafka 版支持以下兩種日志清理策略。
Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統宕機后恢復狀態,系統重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統狀態信息或配置信息。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫。
Compact
標簽
Topic的標簽。
demo
創建完成后,在Topic 管理頁面的列表中顯示已創建的Topic。
創建AnalyticDB Sink Connector依賴的Group
您可以在云消息隊列 Kafka 版控制臺手動創建AnalyticDB Sink Connector數據同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體信息,請參見配置源服務參數列表。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Group 管理。
在Group 管理頁面,單擊創建 Group。
在創建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡要描述Group,并給Group添加標簽,單擊確定。
創建完成后,在Group 管理頁面的列表中顯示已創建的Group。
創建并部署AnalyticDB Sink Connector
在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,單擊Connector 任務列表。
在Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector。
在創建 Connector配置向導頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然后單擊下一步。
參數
描述
示例值
名稱
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字符。
同一個云消息隊列 Kafka 版實例內保持唯一。
Connector的數據同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動創建該Group,系統將為您自動創建。
kafka-adb-sink
實例
默認配置為實例的名稱與實例ID。
demo alikafka_post-cn-st21p8vj****
在配置源服務頁簽,選擇數據源為消息隊列Kafka版,并配置以下參數,然后單擊下一步。
表 1. 配置源服務參數列表
參數
描述
示例值
數據源 Topic
需要同步數據的Topic。
adb-test-input
消費線程并發數
數據源Topic的消費線程并發數。默認值為6。取值說明如下:
1
2
3
6
12
6
消費初始位置
開始消費的位置。取值說明如下:
最早位點:從最初位點開始消費。
最近位點:從最新位點開始消費。
最早位點
VPC ID
數據同步任務所在的VPC。單擊配置運行環境顯示該參數。默認為云消息隊列 Kafka 版實例所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
數據同步任務所在的交換機。單擊配置運行環境顯示該參數。該交換機必須與云消息隊列 Kafka 版實例處于同一VPC。默認為部署云消息隊列 Kafka 版實例時填寫的交換機。
vsw-bp1d2jgg81***
失敗處理
消息發送失敗后,是否繼續訂閱出現錯誤的Topic的分區。單擊配置運行環境顯示該參數。取值說明如下:
繼續訂閱:繼續訂閱出現錯誤的Topic的分區,并打印錯誤日志。
停止訂閱:停止訂閱出現錯誤的Topic的分區,并打印錯誤日志。
說明如何查看日志,請參見Connector相關操作。
如何根據錯誤碼查找解決方案,請參見錯誤碼。
繼續訂閱
創建資源方式
選擇創建Connector所依賴的Topic與Group的方式。單擊配置運行環境顯示該參數。
自動創建
手動創建
自動創建
Connector 消費組
Connector的數據同步任務使用的Group。單擊配置運行環境顯示該參數。該Group的名稱必須為connect-任務名稱。
connect-kafka-adb-sink
任務位點 Topic
用于存儲消費位點的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-offset開頭。
分區數:Topic的分區數量必須大于1。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-offset-kafka-adb-sink
任務配置 Topic
用于存儲任務配置的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-config-kafka-adb-sink
任務狀態 Topic
用于存儲任務狀態的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-status-kafka-adb-sink
死信隊列 Topic
用于存儲Connect框架的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和異常數據Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
connect-error-kafka-adb-sink
異常數據 Topic
用于存儲Sink的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和死信隊列Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
connect-error-kafka-adb-sink
在配置目標服務頁簽,選擇目標服務為云原生數據倉庫,并配置以下參數,然后單擊創建。
參數
描述
示例值
實例類型
云原生數據倉庫實例類型。支持MySQL版和PostgreSQL版。
MySQL版
AnalyticDB 實例 ID
阿里云云原生數據倉庫 AnalyticDB MySQL 版或云原生數據倉庫AnalyticDB PostgreSQL版實例ID。
am-bp139yqk8u1ik****
數據庫名
阿里云云原生數據倉庫實例的數據庫名稱。
adb_demo
表名
云原生數據倉庫中存儲消息表名稱。
user
數據庫用戶名
連接云原生數據倉庫實例數據庫導入數據的數據庫用戶名。
adbmysql
數據庫密碼
連接云原生數據倉庫實例數據庫導入數據的數據庫密碼。用戶的密碼在創建實例時設定,如果忘記可重置。
云原生數據倉庫 AnalyticDB MySQL 版:關于重置密碼的具體操作,請參見重置高權限賬號密碼。
云原生數據倉庫AnalyticDB PostgreSQL版:登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺,單擊實例名稱,在左側導航欄單擊賬號管理,找到需要重置密碼的賬號,在操作列單擊重置密碼。
********
說明用戶名和用戶密碼是云消息隊列 Kafka 版創建任務時作為環境變量傳遞至函數計算的函數,任務創建成功后,云消息隊列 Kafka 版不保存相關信息。
創建完成后,在Connector 任務列表頁面,查看創建的Connector 。
創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署。
配置函數服務
您在云消息隊列 Kafka 版控制臺成功創建并部署AnalyticDB Sink Connector后,函數計算會自動為您創建給該Connector使用的函數服務和函數,服務命名格式為kafka-service-<connector_name>-<隨機String>,函數命名格式為fc-adb-<隨機String>。
配置云原生數據庫
您在配置完函數計算服務后,需要在云原生數據倉庫控制臺將函數計算服務所屬的網段加入白名單。所屬網段可以在專有網絡管理控制臺的交換機頁面,函數計算服務對應的VPC和交換機所在行查看。
云原生數據倉庫 AnalyticDB MySQL 版:登錄云原生數據倉庫 AnalyticDB MySQL 版控制臺,配置白名單。具體操作,請參見設置白名單。
云原生數據倉庫AnalyticDB PostgreSQL版:登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺,配置白名單。具體操作,請參見設置白名單。
發送測試消息
您可以向云消息隊列 Kafka 版的數據源Topic發送消息,測試數據能否被導出至阿里云云原生數據倉庫AnalyticDB MySQL版或云原生數據倉庫AnalyticDB PostgreSQL版。
消息內容(Value)格式需為JSON格式,Value將通過JSON解析為K-V形式,其中K對應云原生數據倉庫的數據庫中的字段名,V對應該字段插入的數據,因此云消息隊列 Kafka 版發送的消息內容中的每個K在數據庫中需要有對應的相同名稱的字段名。字段名可以在云原生數據倉庫 AnalyticDB MySQL 版控制臺或云原生數據倉庫AnalyticDB PostgreSQL版控制臺連接數據庫,在數據庫表中查看。
在Connector 任務列表頁面,找到目標Connector,在其右側操作列,單擊測試。
在發送消息面板,發送測試消息。
發送方式選擇控制臺。
在消息 Key文本框中輸入消息的Key值,例如demo。
在消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
設置發送到指定分區,選擇是否指定分區。
單擊是,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
發送方式選擇Docker,執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發送消息。
驗證結果
向云消息隊列 Kafka 版的數據源Topic發送消息后,登錄云原生數據倉庫 AnalyticDB MySQL 版控制臺或云原生數據倉庫AnalyticDB PostgreSQL版控制臺,連接數據庫,進入數據管理DMS 5.0的SQL窗口界面,找到對應實例的表,驗證數據導出結果。
云消息隊列 Kafka 版數據導出至云原生數據倉庫 AnalyticDB MySQL 版示例如下: