當您需要將消息隊列Kafka的數據實時同步至云數據庫ClickHouse中時,本文檔為您提供了詳細的解決方案,以滿足您的數據實時處理需求。
云數據庫ClickHouse集群從Kafka進行數據同步目前僅支持云消息隊列Kafka和部署在ECS上的自建Kafka。本文以云消息隊列Kafka進行數據同步為例。
前提條件
操作步驟
在頁面左上角,選擇目標集群所在的地域。
在集群列表頁面,選擇目標集群對應類型的實例列表,單擊目標集群ID。
在集群信息頁面,單擊右上方導航欄的登錄數據庫。
在DMS數據管理服務控制臺的登錄實例頁面,輸入數據庫賬號和密碼,單擊登錄。
創建Kafka消費表。
說明創建Kafka消費表是為了將Kafka消費表的數據同步到云數據庫ClickHouse的表中。
Kafka消費表不能直接使用。
Kafka消費表只是用來消費Kafka數據,沒有真正地存儲數據,最終消費數據需要實現消息批處理后進行消費。
建表語法如下。
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port1,host:port2,host:port3', kafka_topic_list = 'topic_name1,topic_name2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,] [kafka_row_delimiter = 'delimiter_symbol',] [kafka_num_consumers = N,] [kafka_thread_per_consume = 1,] [kafka_max_block_size = 0,] [kafka_skip_broken_messages = N,] [kafka_commit_every_batch = 0,] [kafka_auto_offset_reset = N]
常用參數說明如下。
名稱
是否必選
說明
kafka_broker_list
是
以英文逗號(,)分隔的Kafka的接入點地址列表。如何查看接入點,請參見查看接入點。
kafka_topic_list
是
以英文逗號(,)分隔的Topic名稱列表。如何查看Topic名稱,請參見創建Topic。
kafka_group_name
是
Kafka的消費組名稱。更多信息,請參見創建Group。
kafka_format
是
云數據庫ClickHouse支持處理的消息體格式。
說明云數據庫ClickHouse支持的消息體格式,更多信息請參見Formats for Input and Output Data。
kafka_row_delimiter
否
行分隔符,用于分隔不同的數據行。默認為“\n”,您也可以根據數據寫入的實際分隔格式進行設置。
kafka_num_consumers
否
單個表的消費者數量,默認值為1。
說明一個消費者的吞吐量不足時,需要指定更多的消費者。
消費者的總數不應超過Topic中的分區數,因為每個分區只能分配一個消費者。
kafka_thread_per_consumer
否
指定每個消費者是否啟動獨立線程進行消費,默認值為0。取值說明如下。
0:表示所有消費者共同使用1個線程消費。
1:表示每個消費者啟動獨立線程進行消費。
提升消費速度更多信息請參見Kafka性能調優。
kafka_max_block_size
否
Kafka消息的最大批次大小,單位:Byte,默認值為65536。
kafka_skip_broken_messages
否
Kafka消息解析器對于臟數據的容忍度,默認值為0。如果
kafka_skip_broken_messages=N
,則引擎將跳過N條無法解析的Kafka消息(一條消息等于一行數據)。kafka_commit_every_batch
否
執行Kafka Commit的頻率,默認值為0,取值說明如下:
0:完全寫入一整個Block數據塊的數據后才執行Commit。
1:每寫完一個Batch批次的數據就執行一次Commit。
kafka_auto_offset_reset
否
消息的偏移量,從哪個offset開始讀取Kafka數據,取值說明如下:
earliest(默認值):從最早的offset開始讀取Kafka數據。
latest:從最晚的offset開始讀取Kafka數據。
說明21.8版本的云數據庫ClickHouse集群不支持該參數。
說明更多參數說明,請參見Kafka。
示例語句如下。
CREATE TABLE default.kafka_src_table ON CLUSTER default ( --定義表結構的字段 id Int32, name String ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-3-vpc.alikafka.aliyuncs.com:9092', kafka_topic_list = 'test', kafka_group_name = 'test', kafka_format = 'CSV';
創建云數據庫ClickHouse表。
說明創建本地表和分布式表的目的。更多信息,請參見基本概念。
創建本地表。
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = MergeTree() ORDER BY (id);
創建分布式表。
說明如果您只需要同步數據至本地表,可跳過此步驟。
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
創建視圖將Kafka消費表的數據同步到云數據庫ClickHouse的分布式表。
說明如果您同步的目的表是本地表,請將分布式表名更換為本地表名,再進行同步。
創建視圖語法如下。
CREATE MATERIALIZED VIEW [view.name] ON CLUSTER default TO [dest_table] AS SELECT * FROM [src_table];
示例語句如下。
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;
驗證同步結果
您可以選擇以下任意一種方式驗證同步結果。
查詢云數據庫ClickHouse分布式表驗證同步結果
在云消息隊列Kafka版的Topic端發送消息。
登錄消息隊列Kafka控制臺。
在實例列表頁面,單擊目標實例名稱。
在Topic管理頁面,單擊目標Topic操作列的 。
在快速體驗消息收發頁面,輸入發送的消息內容。
本文以發送消息
1,a
和2,b
為例。單擊確定。
查詢云數據庫ClickHouse分布式表,確認數據是否同步成功,查詢語句如下。
SELECT * FROM kafka_table_distributed;
說明如果您同步的目的表是本地表,請將查詢語句中的分布式表名更換為本地表名,再進行查詢。
查詢結果如下。
┌─id─┬─name─┐ │ 1 │ a │ │ 2 │ b │ └────┴──────┘
說明當您執行查詢語句并成功返回結果時,說明數據已從Kafka同步至云數據庫ClickHouse。
查詢系統表驗證同步結果
通過查詢系統表system.kafka
查看Kafka消費表的消費狀態,查詢語句如下。
SELECT * FROM system.kafka
查詢結果如下。
┌─database─┬──────────table──────────────┬─topic─┬─consumer_group─┬─last_read_message_count─┬───────status──────┬─exception─┐
│ default │ kafka_table_distributed │ test │ test │ 2 │ attach_view │ │
└──────────┴─────────────────────────────┴───────┴────────────────┴─────────────────────────┴───────────────────┴───────────┘
查詢結果說明如下。
名稱 | 說明 |
database | Kafka消費表的數據庫名稱。 |
table | Kafka消費表的表名。 |
topic | Kafka消費表的topic名稱。 |
consumer_group | Kafka消費表的group名稱。 |
last_read_message_count | 拉取到的Kafka消費表的消息數量。 |
status | Kafka消費表的消費狀態。取值說明:
|
exception | 異常詳情。 說明 當status取值為error時,該參數返回異常詳情。 |
常見問題
從Kafka同步數據到云數據庫ClickHouse的常見問題及處理方法,請參見常見問題。