日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

ClickHouse

本文為您介紹如何使用ClickHouse連接器。

背景信息

ClickHouse是一個用于聯(lián)機分析(OLAP)的列式數(shù)據(jù)庫管理系統(tǒng),詳情請參見什么是ClickHouse

ClickHouse連接器支持的信息如下.

類別

詳情

支持類型

僅支持結(jié)果表

運行模式

批模式和流模式

數(shù)據(jù)格式

暫不適用

特有監(jiān)控指標(biāo)

  • numRecordsOut

  • numRecordsOutPerSecond

  • currentSendTime

說明

指標(biāo)含義詳情,請參見監(jiān)控指標(biāo)說明

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

當(dāng)Flink結(jié)果表的DDL上指定了Primary Key,且參數(shù) ignoreDelete設(shè)置為false時,則支持更新或刪除結(jié)果表數(shù)據(jù),但性能會顯著下降。

特色功能

  • 對于ClickHouse的分布式表,支持直接寫對應(yīng)的本地表。

  • 對于EMR的ClickHouse,提供Exactly Once的語義。

前提條件

  • 已創(chuàng)建ClickHouse表,詳情請參見創(chuàng)建表

  • 已配置白名單。

    • 如果您使用的是阿里云數(shù)據(jù)庫ClickHouse,配置白名單詳情請參見設(shè)置白名單

    • 如果您使用的是阿里云E-MapReduce的ClickHouse,配置白名單詳情請參見管理安全組

    • 如果您使用的是阿里云ECS上自建的ClickHouse,配置白名單詳情請參見安全組概述

    • 如果為其他情況,請您自行配置ClickHouse所在機器的白名單讓其可被Flink所在機器訪問即可。

    說明

    如何查看Flink虛擬交換機的網(wǎng)段,請參見如何設(shè)置白名單?

使用限制

  • 暫不支持配置sink.parallelism參數(shù)。

  • ClickHouse結(jié)果表保證At-Least-Once語義。

  • 僅Flink計算引擎VVR 3.0.2及以上版本支持ClickHouse連接器。

  • 僅Flink計算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete選項。

  • 僅Flink計算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested類型。

  • 僅Flink計算引擎VVR 4.0.11及以上版本支持直接將數(shù)據(jù)寫入到ClickHouse分布式表對應(yīng)的本地表。

  • 僅Flink計算引擎VVR 4.0.11及以上版本提供寫EMR的ClickHouse的Exactly Once語義。但對EMR-3.45.1和EMR-5.11.1之后版本的ClickHouse,由于EMR ClickHouse產(chǎn)品能力變更,也不再提供Exactly Once語義。

  • 僅Flink計算引擎VVR 8.0.7及以上版本支持使用balance的策略來均勻地將數(shù)據(jù)寫入ClickHouse的本地表。

  • 僅ClickHouse社區(qū)兼容版支持寫ClickHouse本地表。

語法結(jié)構(gòu)

CREATE TABLE clickhouse_sink (
  id INT,
  name VARCHAR,
  age BIGINT,
  rate FLOAT
) WITH (
  'connector' = 'clickhouse',
  'url' = '<yourUrl>',
  'userName' = '<yourUsername>',
  'password' = '<yourPassword>',
  'tableName' = '<yourTablename>',
  'maxRetryTimes' = '3',
  'batchSize' = '8000',
  'flushIntervalMs' = '1000'
  'ignoreDelete' = 'true',
  'shardWrite' = 'false',
  'writeMode' = 'partition',
  'shardingKey' = 'id'
);

WITH參數(shù)

參數(shù)

說明

數(shù)據(jù)類型

是否必填

默認(rèn)值

備注

connector

結(jié)果表類型。

String

固定值為clickhouse

url

ClickHouse的JDBC連接地址。

String

URL格式為jdbc:clickhouse://<yourNetworAddress>:<PortId>/<yourDatabaseName>直接寫本地表時,節(jié)點IP可以在ClickHouse執(zhí)行 select * from system.clusters獲取。如果不寫數(shù)據(jù)庫名稱,則使用默認(rèn)的default數(shù)據(jù)庫。

說明

如果您要將數(shù)據(jù)寫入ClickHouse分布式表,則URL為該分布式表所在節(jié)點的JDBC URL。

userName

ClickHouse的用戶名。

String

無。

password

ClickHouse的密碼。

String

無。

tableName

ClickHouse的表名稱。

String

無。

maxRetryTimes

向結(jié)果表插入數(shù)據(jù)失敗后的最大嘗試次數(shù)。

Int

3

無。

batchSize

一次批量寫入的數(shù)據(jù)條數(shù)。

Int

100

如果緩存中的數(shù)據(jù)條數(shù)達到了batchSize參數(shù)值,或者等待時間超過flushIntervalMs后,系統(tǒng)將會自動將緩存中的數(shù)據(jù)寫入ClickHouse表中。

flushIntervalMs

清空緩存的時間間隔。

Long

1000

單位為毫秒。

ignoreDelete

是否忽略Delete消息。

Boolean

true

參數(shù)取值如下:

  • true(默認(rèn)值):忽略。

  • false:不忽略。

    如果為false,并且在DDL中聲明了Primary Key,則會使用ClickHouse的ALTER語句來刪除數(shù)據(jù)。

說明

如果設(shè)置ignoreDelete=false,則無法支持以partition的方式寫ClickHouse分布表的本地表,所以就不能再設(shè)置writeMode為partition。

shardWrite

對于ClickHouse分布式表,是否直接寫ClickHouse的本地表。

Boolean

false

參數(shù)取值如下:

  • false(默認(rèn)值):先寫ClickHouse的分布式表,再由分布式表寫入對應(yīng)的本地表。此時tableName應(yīng)為分布式表的名稱。

  • true:跳過分布式表,直接將數(shù)據(jù)寫到該ClickHouse分布式表對應(yīng)的本地表。

    如果需要提高寫ClickHouse分布式表的吞吐量,則建議將該值設(shè)置為true。

    • 如果您需要在URL中手動指定要將數(shù)據(jù)寫到哪些節(jié)點的本地表中。此時tableName應(yīng)該為本地表的名字。代碼示例如下:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002/default'
      'tableName' = 'local_table'
    • 如果您不需要手動指定本地表的節(jié)點,可以通過同時設(shè)置inferLocalTable參數(shù)來讓Flink自動推測本地表的節(jié)點。此時,tableName應(yīng)該為分布式表的名字,且url為該分布式表所在節(jié)點的JDBC URL。代碼示例如下:

      'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default' // 分布式表所在節(jié)點的JDBC URL
      'tableName' = 'distribute_table'

inferLocalTable

對于寫ClickHouse分布式表,是否嘗試推測分布式表的本地表信息,然后直接寫入本地表中。

Boolean

false

參數(shù)取值如下:

  • false(默認(rèn)值):如果是寫ClickHouse分布式表,并且在參數(shù)url中只指定了一個節(jié)點,則不會嘗試推測分布式表對應(yīng)的本地表信息,而是依然會直接寫入分布式表,再由分布式表寫入對應(yīng)的本地表。

  • true:Flink將嘗試推測分布式表的本地表信息,并直接寫入對應(yīng)的本地表。此時需要 shardWrite參數(shù)也同步設(shè)置為 true,tableName設(shè)置為分布式表的名字,并且url設(shè)置為該分布式表所在節(jié)點的JDBC URL。

說明

對于寫ClickHouse非分布式表,可直接忽略該參數(shù)。

writeMode

對于ClickHouse分布式表,采用何種策略寫ClickHouse的本地表。

Enum

default

參數(shù)取值如下:

  • default(默認(rèn)值):表示總是寫入到第一個節(jié)點的本地表。

  • partition:表示將數(shù)據(jù)按key寫到同一個節(jié)點的本地表。

  • random:表示隨機寫到某個節(jié)點的本地表。

  • balance:表示采用round-robin的方式,均勻地將數(shù)據(jù)寫入到本地表節(jié)點中。

說明

如果設(shè)置了writeMode=partition,請確保配置項ignoreDelete為true。

shardingKey

按何種key將數(shù)據(jù)寫到同一個節(jié)點的本地表。

default

當(dāng)writeMode取值為partition時,shardingKey值必填,可包含多個字段,多個字段以英文逗號(,)分隔。

exactlyOnce

是否開啟exactlyOnce語義。

Boolean

false

參數(shù)取值如下:

  • true:開啟。

  • false(默認(rèn)值):不開啟。

說明
  • 目前僅支持寫EMR的ClickHouse的Exactly Once語義。所以只有當(dāng)您寫EMR的ClickHouse時,才能將exactlyOnce設(shè)置為true。

  • 不支持以partition策略寫ClickHouse的本地表的Exactly Once語義。所以如果exactlyOnce設(shè)置為true,則writeMode不能設(shè)置為partition。

類型映射

Flink字段類型

ClickHouse字段類型

BOOLEAN

UInt8 / Boolean

說明

ClickHouse v21.12及以上版本支持Boolean類型。如果您使用的ClickHouse是v21.12以下版本,F(xiàn)link的Boolean類型則對應(yīng)ClickHouse的UInt8類型。

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

BIGINT

UInt32

FLOAT

Float32

DOUBLE

Float64

CHAR

FixedString

VARCHAR

String

BINARY

FixedString

VARBINARY

String

DATE

Date

TIMESTAMP(0)

DateTime

TIMESTAMP(x)

Datetime64(x)

DECIMAL

DECIMAL

ARRAY

ARRAY

Nested

說明

ClickHouse暫不支持Flink的TIME、MAP、MULTISET和ROW類型。

對于ClickHouse的Nested類型,需要將其映射成Flink的ARRAY類型,例如:

-- ClickHouse
CREATE TABLE visits (
  StartDate Date,
  Goals Nested
  (
    ID UInt32,
    OrderID String
  )
  ...
);

需要映射為:

-- Flink
CREATE TABLE visits (
  StartDate DATE,
  `Goals.ID` ARRAY<LONG>,
  `Goals.OrderID` ARRAY<STRING>
);
說明

ClickHouse的DateTime類型可以精確到秒,Datetime64可以精確到納秒。對于VVR-6.0.6之前的版本,因為ClickHouse官方提供的JDBC寫Datetime64數(shù)據(jù)類型會出現(xiàn)精度丟失,只能精確到秒的問題,所以通過Flink只能寫入秒級別的TIMESTAMP,即TIMESTAMP(0)。VVR-6.0.6及之后的版本修復(fù)了這個精度丟失問題,通過Flink可以正常寫Datetime64類型的數(shù)據(jù)。

使用示例

  • 示例1:寫ClickHouse單節(jié)點表。

    CREATE TEMPORARY TABLE clickhouse_source (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '50'
    );
    
    CREATE TEMPORARY TABLE clickhouse_output (
      id INT,
      name VARCHAR,
      age BIGINT,
      rate FLOAT
    ) WITH (
      'connector' = 'clickhouse',
      'url' = '<yourUrl>',
      'userName' = '<yourUsername>',
      'password' = '<yourPassword>',
      'tableName' = '<yourTablename>'
    );
    
    INSERT INTO clickhouse_output
    SELECT
      id,
      name,
      age,
      rate
    FROM clickhouse_source;
  • 示例2:寫ClickHouse分布式表。

    假設(shè)您已經(jīng)有三個本地表,表名為local_table_test,分別在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3節(jié)點上。然后基于這三個本地表,創(chuàng)建了一個分布式表distributed_table_test。

    • 此時,如果您希望Flink可以直接寫本地表,并且可以按照某個key將相同key的數(shù)據(jù)寫到同一個節(jié)點的本地表中,則DDL代碼示例如下。

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default',
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'local_table_test',
        'shardWrite' = 'true',
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;
    • 此時,如果您不想手動指定本地表的節(jié)點,可以讓Flink來自動推測本地表節(jié)點,DDL代碼示例如下:

      CREATE TEMPORARY TABLE clickhouse_source (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '50'
      );
      
      CREATE TEMPORARY TABLE clickhouse_output (
        id INT,
        name VARCHAR,
        age BIGINT,
        rate FLOAT
      ) WITH (
        'connector' = 'clickhouse',
        'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分布式表所在節(jié)點對應(yīng)的JDBC URL。
        'userName' = '<yourUsername>',
        'password' = '<yourPassword>',
        'tableName' = 'distributed_table_test', --為分布式表的名字。
        'shardWrite' = 'true',
        'inferLocalTable' = 'true', --需設(shè)置inferLocalTable為true。
        'writeMode' = 'partition',
        'shardingKey' = 'name'
      );
      
      INSERT INTO clickhouse_output
      SELECT
        id,
        name,
        age,
        rate
      FROM clickhouse_source;

常見問題