日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

從Kafka同步數據

當您需要將消息隊列Kafka的數據實時同步至云數據庫ClickHouse中時,本文檔為您提供了詳細的解決方案,以滿足您的數據實時處理需求。

說明

云數據庫ClickHouse集群從Kafka進行數據同步目前僅支持云消息隊列Kafka和部署在ECS上的自建Kafka。本文以云消息隊列Kafka進行數據同步為例。

前提條件

  • 已創建目標云數據庫ClickHouse集群,且保證消息隊列Kafka和目標云數據庫ClickHouse集群在同一地域并使用相同的VPC。如何創建,請參見新建集群

  • 目標云數據庫ClickHouse集群已創建登錄數據庫的賬號且已具備數據庫的操作權限。如何創建,請參見創建賬號準備權限

操作步驟

  1. 登錄云數據庫ClickHouse控制臺

  2. 在頁面左上角,選擇目標集群所在的地域。

  3. 集群列表頁面,選擇目標集群對應類型的實例列表,單擊目標集群ID。

  4. 集群信息頁面,單擊右上方導航欄的登錄數據庫

  5. 在DMS數據管理服務控制臺的登錄實例頁面,輸入數據庫賬號密碼,單擊登錄

  6. 創建Kafka消費表。

    說明
    • 創建Kafka消費表是為了將Kafka消費表的數據同步到云數據庫ClickHouse的表中。

    • Kafka消費表不能直接使用。

    • Kafka消費表只是用來消費Kafka數據,沒有真正地存儲數據,最終消費數據需要實現消息批處理后進行消費。

    建表語法如下。

    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
        name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
        name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
        ...
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'host:port1,host:port2,host:port3',
        kafka_topic_list = 'topic_name1,topic_name2,...',
        kafka_group_name = 'group_name',
        kafka_format = 'data_format'[,]
        [kafka_row_delimiter = 'delimiter_symbol',]
        [kafka_num_consumers = N,]
        [kafka_thread_per_consume = 1,]
        [kafka_max_block_size = 0,]
        [kafka_skip_broken_messages = N,]
        [kafka_commit_every_batch = 0,]
        [kafka_auto_offset_reset = N]

    常用參數說明如下。

    名稱

    是否必選

    說明

    kafka_broker_list

    以英文逗號(,)分隔的Kafka的接入點地址列表。如何查看接入點,請參見查看接入點

    kafka_topic_list

    以英文逗號(,)分隔的Topic名稱列表。如何查看Topic名稱,請參見創建Topic

    kafka_group_name

    Kafka的消費組名稱。更多信息,請參見創建Group

    kafka_format

    云數據庫ClickHouse支持處理的消息體格式。

    說明

    云數據庫ClickHouse支持的消息體格式,更多信息請參見Formats for Input and Output Data

    kafka_row_delimiter

    行分隔符,用于分隔不同的數據行。默認為“\n”,您也可以根據數據寫入的實際分隔格式進行設置。

    kafka_num_consumers

    單個表的消費者數量,默認值為1。

    說明
    • 一個消費者的吞吐量不足時,需要指定更多的消費者。

    • 消費者的總數不應超過Topic中的分區數,因為每個分區只能分配一個消費者。

    kafka_thread_per_consumer

    指定每個消費者是否啟動獨立線程進行消費,默認值為0。取值說明如下。

    • 0:表示所有消費者共同使用1個線程消費。

    • 1:表示每個消費者啟動獨立線程進行消費。

    提升消費速度更多信息請參見Kafka性能調優

    kafka_max_block_size

    Kafka消息的最大批次大小,單位:Byte,默認值為65536。

    kafka_skip_broken_messages

    Kafka消息解析器對于臟數據的容忍度,默認值為0。如果kafka_skip_broken_messages=N,則引擎將跳過N條無法解析的Kafka消息(一條消息等于一行數據)。

    kafka_commit_every_batch

    執行Kafka Commit的頻率,默認值為0,取值說明如下:

    • 0:完全寫入一整個Block數據塊的數據后才執行Commit。

    • 1:每寫完一個Batch批次的數據就執行一次Commit。

    kafka_auto_offset_reset

    消息的偏移量,從哪個offset開始讀取Kafka數據,取值說明如下:

    • earliest(默認值):從最早的offset開始讀取Kafka數據。

    • latest:從最晚的offset開始讀取Kafka數據。

    說明

    21.8版本的云數據庫ClickHouse集群不支持該參數。

    說明

    更多參數說明,請參見Kafka

    示例語句如下。

    CREATE TABLE default.kafka_src_table ON CLUSTER default
    (   --定義表結構的字段
        id Int32,
        name String               
    ) ENGINE = Kafka()
    SETTINGS
        kafka_broker_list = 'alikafka-post-cn-tl32i5sc****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-tl32i5sc****-3-vpc.alikafka.aliyuncs.com:9092',
        kafka_topic_list = 'test',
        kafka_group_name = 'test',
        kafka_format = 'CSV';
  7. 創建云數據庫ClickHouse表。

    說明

    創建本地表和分布式表的目的。更多信息,請參見基本概念

    1. 創建本地表。

      CREATE TABLE default.kafka_table_local ON CLUSTER default (
        id Int32,
        name String
      ) ENGINE = MergeTree()
      ORDER BY (id);
    2. 創建分布式表。

      說明

      如果您只需要同步數據至本地表,可跳過此步驟。

      CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
      ENGINE = Distributed(default, default, kafka_table_local, id);
  8. 創建視圖將Kafka消費表的數據同步到云數據庫ClickHouse的分布式表。

    說明

    如果您同步的目的表是本地表,請將分布式表名更換為本地表名,再進行同步。

    創建視圖語法如下。

    CREATE MATERIALIZED VIEW [view.name] ON CLUSTER default TO [dest_table] AS SELECT * FROM [src_table];

    示例語句如下。

    CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;

驗證同步結果

您可以選擇以下任意一種方式驗證同步結果。

查詢云數據庫ClickHouse分布式表驗證同步結果

  1. 在云消息隊列Kafka版的Topic端發送消息。

    1. 登錄消息隊列Kafka控制臺

    2. 實例列表頁面,單擊目標實例名稱。

    3. Topic管理頁面,單擊目標Topic操作列的更多 > 體驗發送消息

    4. 快速體驗消息收發頁面,輸入發送的消息內容

      本文以發送消息1,a2,b為例。

    5. 單擊確定

  2. 查詢云數據庫ClickHouse分布式表,確認數據是否同步成功,查詢語句如下。

    SELECT * FROM kafka_table_distributed; 
    說明

    如果您同步的目的表是本地表,請將查詢語句中的分布式表名更換為本地表名,再進行查詢。

    查詢結果如下。

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘
    說明

    當您執行查詢語句并成功返回結果時,說明數據已從Kafka同步至云數據庫ClickHouse

查詢系統表驗證同步結果

通過查詢系統表system.kafka查看Kafka消費表的消費狀態,查詢語句如下。

SELECT * FROM system.kafka

查詢結果如下。

┌─database─┬──────────table──────────────┬─topic─┬─consumer_group─┬─last_read_message_count─┬───────status──────┬─exception─┐
│  default │  kafka_table_distributed    │ test  │   test         │          2              │   attach_view     │           │  
└──────────┴─────────────────────────────┴───────┴────────────────┴─────────────────────────┴───────────────────┴───────────┘

查詢結果說明如下。

名稱

說明

database

Kafka消費表的數據庫名稱。

table

Kafka消費表的表名。

topic

Kafka消費表的topic名稱。

consumer_group

Kafka消費表的group名稱。

last_read_message_count

拉取到的Kafka消費表的消息數量。

status

Kafka消費表的消費狀態。取值說明:

  • no_view:Kafka消費表沒有創建視圖。

  • attach_view:Kafka消費表創建了視圖。

  • normal:正常狀態。

    說明

    當kafka消費表有消費數據時,Kafka消費表的消費狀態為normal

  • skip_parse:跳過錯誤解析。

  • error:消費異常。

exception

異常詳情。

說明

status取值為error時,該參數返回異常詳情。

常見問題

從Kafka同步數據到云數據庫ClickHouse的常見問題及處理方法,請參見常見問題