本文介紹如何通過創建Tablestore Sink Connector,將數據從云消息隊列 Kafka 版實例的數據源Topic導出至表格存儲(Tablestore)。
前提條件
云消息隊列 Kafka 版
已為實例開啟Connector。具體操作,請參見開啟Connector。
已為實例創建數據源Topic。更多信息,請參見步驟一:創建Topic。
表格存儲
已開通表格存儲服務。具體操作,請參見步驟一:開通表格存儲服務。
已創建表格存儲實例。具體操作,請參見步驟二:創建實例。
注意事項
操作流程
使用Tablestore Sink Connector將數據從云消息隊列 Kafka 版實例的數據源Topic導出至表格存儲操作流程如下:
可選:創建Tablestore Sink Connector依賴的Topic和Group
如果您不需要自定義Topic和Group,您可以直接跳過該步驟,在下一步驟選擇自動創建。
重要部分Tablestore Sink Connector依賴的Topic的存儲引擎必須為Local存儲,大版本為0.10.2的云消息隊列 Kafka 版實例不支持手動創建Local存儲的Topic,只支持自動創建。
結果驗證
創建Tablestore Sink Connector依賴的Topic
您可以在云消息隊列 Kafka 版控制臺手動創建Tablestore Sink Connector依賴的5個Topic,包括:任務位點Topic、任務配置Topic、任務狀態Topic、死信隊列Topic以及異常數據Topic。每個Topic所需要滿足的分區數與存儲引擎會有差異,具體信息,請參見配置源服務參數列表。
在概覽頁面的資源分布區域,選擇地域。
重要Topic需要在應用程序所在的地域(即所部署的ECS的所在地域)進行創建。Topic不能跨地域使用。例如Topic創建在華北2(北京)這個地域,那么消息生產端和消費端也必須運行在華北2(北京)的ECS。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理。
在Topic 管理頁面,單擊創建 Topic。
在創建 Topic面板,設置Topic屬性,然后單擊確定。
參數
說明
示例
名稱
Topic名稱。
demo
描述
Topic的簡單描述。
demo test
分區數
Topic的分區數量。
12
存儲引擎
說明當前僅專業版實例支持選擇存儲引擎類型,標準版暫不支持,默認選擇為云存儲類型。
Topic消息的存儲引擎。
云消息隊列 Kafka 版支持以下兩種存儲引擎。
云存儲:底層接入阿里云云盤,具有低時延、高性能、持久性、高可靠等特點,采用分布式3副本機制。實例的規格類型為標準版(高寫版)時,存儲引擎只能為云存儲。
Local 存儲:使用原生Kafka的ISR復制算法,采用分布式3副本機制。
云存儲
消息類型
Topic消息的類型。
普通消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,可能會造成消息亂序。當存儲引擎選擇云存儲時,默認選擇普通消息。
分區順序消息:默認情況下,保證相同Key的消息分布在同一個分區中,且分區內消息按照發送順序存儲。集群中出現機器宕機時,仍然保證分區內按照發送順序存儲。但是會出現部分分區發送消息失敗,等到分區恢復后即可恢復正常。當存儲引擎選擇Local 存儲時,默認選擇分區順序消息。
普通消息
日志清理策略
Topic日志的清理策略。
當存儲引擎選擇Local 存儲(當前僅專業版實例支持選擇存儲引擎類型為Local存儲,標準版暫不支持)時,需要配置日志清理策略。
云消息隊列 Kafka 版支持以下兩種日志清理策略。
Delete:默認的消息清理策略。在磁盤容量充足的情況下,保留在最長保留時間范圍內的消息;在磁盤容量不足時(一般磁盤使用率超過85%視為不足),將提前刪除舊消息,以保證服務可用性。
Compact:使用Kafka Log Compaction日志清理策略。Log Compaction清理策略保證相同Key的消息,最新的value值一定會被保留。主要適用于系統宕機后恢復狀態,系統重啟后重新加載緩存等場景。例如,在使用Kafka Connect或Confluent Schema Registry時,需要使用Kafka Compact Topic存儲系統狀態信息或配置信息。
重要Compact Topic一般只用在某些生態組件中,例如Kafka Connect或Confluent Schema Registry,其他情況的消息收發請勿為Topic設置該屬性。具體信息,請參見云消息隊列 Kafka 版Demo庫。
Compact
標簽
Topic的標簽。
demo
創建完成后,在Topic 管理頁面的列表中顯示已創建的Topic。
創建Tablestore Sink Connector依賴的Group
您可以在云消息隊列 Kafka 版控制臺手動創建Tablestore Sink Connector數據同步任務使用的Group。該Group的名稱必須為connect-任務名稱,具體信息,請參見配置源服務參數列表。
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Group 管理。
在Group 管理頁面,單擊創建 Group。
在創建 Group面板的Group ID文本框輸入Group的名稱,在描述文本框簡要描述Group,并給Group添加標簽,單擊確定。
創建完成后,在Group 管理頁面的列表中顯示已創建的Group。
創建并部署Tablestore Sink Connector
創建并部署將數據從云消息隊列 Kafka 版同步至表格存儲的Tablestore Sink Connector。
在概覽頁面的資源分布區域,選擇地域。
在左側導航欄,單擊Connector 任務列表。
在Connector 任務列表頁面,從選擇實例的下拉列表選擇Connector所屬的實例,然后單擊創建 Connector。
在創建 Connector配置向導頁面,完成以下操作。
在配置基本信息頁簽,按需配置以下參數,然后單擊下一步。
參數
描述
示例值
名稱
Connector的名稱。命名規則:
可以包含數字、小寫英文字母和短劃線(-),但不能以短劃線(-)開頭,長度限制為48個字符。
同一個云消息隊列 Kafka 版實例內保持唯一。
Connector的數據同步任務必須使用名稱為connect-任務名稱的Group。如果您未手動創建該Group,系統將為您自動創建。
kafka-ts-sink
實例
默認配置為實例的名稱與實例ID。
demo alikafka_post-cn-st21p8vj****
在配置源服務頁簽,選擇數據源為消息隊列Kafka版,并配置以下參數,然后單擊下一步。
說明如果您已創建好Topic和Group,那么請選擇手動創建資源,并填寫已創建的資源信息。否則,請選擇自動創建資源。
表 1. 配置源服務參數列表
參數
描述
示例值
數據源 Topic
需要同步數據的Topic。
ts-test-input
消費線程并發數
數據源Topic的消費線程并發數。默認值為6。取值說明如下:
1
2
3
6
12
6
消費初始位置
開始消費的位置。取值說明如下:
最早位點:從最初位點開始消費。
最近位點:從最新位點開始消費。
最早位點
VPC ID
數據同步任務所在的VPC。單擊配置運行環境顯示該參數。默認為云消息隊列 Kafka 版實例所在的VPC,您無需填寫。
vpc-bp1xpdnd3l***
vSwitch ID
數據同步任務所在的交換機。單擊配置運行環境顯示該參數。該交換機必須與云消息隊列 Kafka 版實例處于同一VPC。默認為部署云消息隊列 Kafka 版實例時填寫的交換機。
vsw-bp1d2jgg81***
失敗處理
消息發送失敗后,是否繼續訂閱出現錯誤的Topic的分區。單擊配置運行環境顯示該參數。取值說明如下。
繼續訂閱:繼續訂閱出現錯誤的Topic的分區,并打印錯誤日志。
停止訂閱:停止訂閱出現錯誤的Topic的分區,并打印錯誤日志
說明如何查看日志,請參見Connector相關操作。
如何根據錯誤碼查找解決方案,請參見錯誤碼。
繼續訂閱
創建資源方式
選擇創建Connector所依賴的Topic與Group的方式。單擊配置運行環境顯示該參數。
自動創建
手動創建
自動創建
Connector 消費組
Connector的數據同步任務使用的Group。單擊配置運行環境顯示該參數。該Group的名稱必須為connect-任務名稱。
connect-cluster-kafka-ots-sink
任務位點 Topic
用于存儲消費位點的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-offset開頭。
分區數:Topic的分區數量必須大于1。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-offset-kafka-ots-sink
任務配置 Topic
用于存儲任務配置的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-config開頭。
分區數:Topic的分區數量必須為1。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-config-kafka-ots-sink
任務狀態 Topic
用于存儲任務狀態的Topic。單擊配置運行環境顯示該參數。
Topic:建議以connect-status開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎必須為Local存儲。
cleanup.policy:Topic的日志清理策略必須為compact。
connect-status-kafka-ots-sink
死信隊列 Topic
用于存儲Connect框架的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和異常數據Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
connect-error-kafka-ots-sink
異常數據 Topic
用于存儲Sink的異常數據的Topic。單擊配置運行環境顯示該參數。該Topic可以和死信隊列Topic為同一個Topic,以節省Topic資源。
Topic:建議以connect-error開頭。
分區數:Topic的分區數量建議為6。
存儲引擎:Topic的存儲引擎可以為Local存儲或云存儲。
connect-error-kafka-ots-sink
在配置目標服務頁簽,選擇目標服務為表格存儲,并配置以下參數,然后單擊創建。
參數
描述
示例值
實例名稱
表格存儲的實例名稱。
k00eny67****
自動創建目標表
是否在表格存儲中自動創建表。
是:在表格存儲中根據配置的表名自動創建一個存儲同步數據的表。
否:使用已創建的表存儲同步數據。
是
目標表名
存儲同步數據的表名稱。如果自動創建目標表選擇否,表名稱需與表格存儲實例中已有表名稱相同。
kafka_table
表格存儲
存儲同步數據的表類型。
寬表模型
時序模型
寬表模型
消息 Key 格式
消息Key的輸入格式。支持String和JSON兩種格式,默認值為JSON。當表格存儲選擇寬表模型時顯示該參數。
String:直接將消息的Key作為字符串解析。
JSON:消息的Key必須符合JSON格式。
String
消息 Value 格式
消息值的輸入格式。支持String和JSON兩種格式。默認值為JSON。當表格存儲選擇寬表模型時顯示該參數。
String:直接將消息的Value作為字符串解析。
JSON:消息的Value必須符合JSON格式。
String
JSON消息字段轉化
JSON消息的字段處理方式。消息 Key 格式或消息 Value 格式選擇JSON時顯示該參數。取值范圍如下:
全部作為String寫入:將所有字段轉化為表格存儲中對應的String。
自動識別字段類型:將JSON消息體中的String和Boolean字段分別轉化為表格存儲中對應的String和Boolean字段。JSON消息體中的Integer和Float數據類型,將被轉化為表格存儲中的Double類型。
全部作為String寫入
主鍵模式
指定主鍵模式。支持從云消息隊列 Kafka 版消息記錄的不同部分提取數據表主鍵,包括消息記錄的Coordinates(Topic,Partition,Offset),Key和Value。當表格存儲選擇寬表模型時顯示該參數。默認值為kafka。
kafka:表示以<connect_topic>_<connect_partition>和 <connect_offset>作為數據表的主鍵。
record_key:表示以Record Key中的字段作為數據表的主鍵。
record_value:表示以 Record Value 中的字段作為數據表的主鍵。
kafka
主鍵列名配置
數據表的主鍵列名和對應的數據類型。支持string和Integer兩種數據類型,表示從Record Key或Record Value中提取與配置的主鍵列名相同的字段作為數據表的主鍵。
消息 Key 格式選擇JSON,且主鍵模式選擇record_key,或消息 Value 格式選擇JSON,且主鍵模式選擇record_value,顯示該參數。
單擊添加可以增加列名。最多支持配置四個列名。
無
寫入模式
指定寫入模式,支持put和update兩種寫入模式,默認值為put。當表格存儲選擇寬表模型時顯示該參數。
put:表示覆蓋寫。
update:表示更新寫。
put
刪除模式
當云消息隊列 Kafka 版消息記錄出現空值時,您可以選擇是否進行刪除行或刪除屬性列的操作。主鍵模式選擇record_key顯示該參數。取值范圍如下:
none:默認值,不允許進行任何刪除。
row:允許刪除行。
column:允許刪除屬性列。
row_and_column:允許刪除行和屬性列。
刪除操作與寫入模式的配置相關。具體如下:
如果寫入模式為put,選擇任意一種刪除模式,當Value中存在空值時,數據均覆蓋寫入表格存儲數據表。
如果寫入模式為update,選擇none或row刪除模式,當Value所有字段值均為空值時,數據作為臟數據處理。當Value部分字段值為空值時,自動忽略空值,將非空值寫入表格存儲數據表。選擇column或row_and_column刪除模式,當Value存在空值時,刪除行和屬性列后,將數據寫入表格存儲數據表。
無
度量名稱字段
將該字段映射為表格存儲時序模型中的度量名稱字段(_m_name),表示時序數據所度量的物理量或者監控指標的名稱,比如temperature、speed等,不能為空。當表格存儲選擇時序模型時顯示該參數。
measurement
數據源字段
將該字段映射為表格存儲時序模型中的數據源字段(_data_source),作為產生某個時間序列數據的數據源標識,比如機器名或者設備ID等,可以為空。當表格存儲選擇時序模型時顯示該參數。
source
標簽字段
將一個或多個字段作為表格存儲時序模型中的標簽字段(_tags)。每個標簽是一個字符串類型的Key和Value,Key為配置的字段名,Value為字段內容。標簽作為時間線元數據的一部分,度量名稱、數據源、標簽共同標識一條時間線,可以為空。當表格存儲選擇時序模型時顯示該參數。
tag1, tag2
時間戳字段
將該字段映射為表格存儲時序模型中的時間戳字段(_time)。表示該行時序數據所對應的時間點,比如產生物理量的時刻等。在數據寫入表格存儲時,會將時間戳字段轉換成微秒單位進行寫入和存儲。當表格存儲選擇時序模型時顯示該參數。
time
時間戳單位
視時間戳字段實際情況進行配置。當表格存儲選擇時序模型時顯示該參數。取值范圍如下:
SECONDS(秒)
MILLISECONDS(毫秒)
MICROSECONDS(微秒)
NANOSECONDS(納秒)
MILLISECONDS
是否映射全部非主鍵字段
是否將非主鍵字段(主鍵字段為已經映射為度量名稱、數據源、標簽或時間戳的字段)全部映射為數據字段。當表格存儲選擇時序模型時顯示該參數。取值范圍如下:
是:會自動映射字段并判斷數據類型,數值類型會全部轉換為Double類型。
否:需要指定需要映射的字段和類型。
是
配置映射全部非主鍵字段
時序表的非主鍵字段名稱對應的字段類型。支持Double、Integer、String、Binary和Boolean五種數據類型。當是否映射全部非主鍵字段選擇否時顯示該參數。
String
創建完成后,在Connector 任務列表頁面,查看創建的Connector 。
創建完成后,在Connector 任務列表頁面,找到創建的Connector ,單擊其操作列的部署。
單擊確認。
發送測試消息
部署Tablestore Sink Connector后,您可以向云消息隊列 Kafka 版的數據源Topic發送消息,測試數據能否被同步至表格存儲。
在Connector 任務列表頁面,找到目標Connector,在其右側操作列,單擊測試。
在發送消息面板,發送測試消息。
發送方式選擇控制臺。
在消息 Key文本框中輸入消息的Key值,例如demo。
在消息內容文本框輸入測試的消息內容,例如 {"key": "test"}。
設置發送到指定分區,選擇是否指定分區。
單擊是,在分區 ID文本框中輸入分區的ID,例如0。如果您需查詢分區的ID,請參見查看分區狀態。
單擊否,不指定分區。
發送方式選擇Docker,執行運行 Docker 容器生產示例消息區域的Docker命令,發送消息。
發送方式選擇SDK,根據您的業務需求,選擇需要的語言或者框架的SDK以及接入方式,通過SDK發送消息。
查看表數據
向云消息隊列 Kafka 版的數據源Topic發送消息后,在表格存儲控制臺查看表數據,驗證是否收到消息。
登錄表格存儲控制臺。
在概覽頁面,單擊實例名稱或在操作列單擊實例管理。
在實例詳情頁簽,數據表列表區域,查看對應的數據表。
單擊數據表名稱,在表管理頁面的數據管理頁簽,查看表數據。