ClickHouse
本文為您介紹如何使用ClickHouse連接器。
背景信息
ClickHouse是一個用于聯(lián)機分析(OLAP)的列式數(shù)據(jù)庫管理系統(tǒng),詳情請參見什么是ClickHouse。
ClickHouse連接器支持的信息如下.
類別 | 詳情 |
支持類型 | 僅支持結(jié)果表 |
運行模式 | 批模式和流模式 |
數(shù)據(jù)格式 | 暫不適用 |
特有監(jiān)控指標(biāo) |
說明 指標(biāo)含義詳情,請參見監(jiān)控指標(biāo)說明。 |
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 當(dāng)Flink結(jié)果表的DDL上指定了Primary Key,且參數(shù) ignoreDelete設(shè)置為false時,則支持更新或刪除結(jié)果表數(shù)據(jù),但性能會顯著下降。 |
特色功能
對于ClickHouse的分布式表,支持直接寫對應(yīng)的本地表。
對于EMR的ClickHouse,提供Exactly Once的語義。
前提條件
已創(chuàng)建ClickHouse表,詳情請參見創(chuàng)建表。
已配置白名單。
如果您使用的是阿里云數(shù)據(jù)庫ClickHouse,配置白名單詳情請參見設(shè)置白名單。
如果您使用的是阿里云E-MapReduce的ClickHouse,配置白名單詳情請參見管理安全組。
如果您使用的是阿里云ECS上自建的ClickHouse,配置白名單詳情請參見安全組概述。
如果為其他情況,請您自行配置ClickHouse所在機器的白名單讓其可被Flink所在機器訪問即可。
說明如何查看Flink虛擬交換機的網(wǎng)段,請參見如何設(shè)置白名單?。
使用限制
暫不支持配置sink.parallelism參數(shù)。
ClickHouse結(jié)果表保證At-Least-Once語義。
僅Flink計算引擎VVR 3.0.2及以上版本支持ClickHouse連接器。
僅Flink計算引擎VVR 3.0.3,VVR 4.0.7及以上版本支持ignoreDelete選項。
僅Flink計算引擎VVR 4.0.10及以上版本支持ClickHouse的Nested類型。
僅Flink計算引擎VVR 4.0.11及以上版本支持直接將數(shù)據(jù)寫入到ClickHouse分布式表對應(yīng)的本地表。
僅Flink計算引擎VVR 4.0.11及以上版本提供寫EMR的ClickHouse的Exactly Once語義。但對EMR-3.45.1和EMR-5.11.1之后版本的ClickHouse,由于EMR ClickHouse產(chǎn)品能力變更,也不再提供Exactly Once語義。
僅Flink計算引擎VVR 8.0.7及以上版本支持使用balance的策略來均勻地將數(shù)據(jù)寫入ClickHouse的本地表。
僅ClickHouse社區(qū)兼容版支持寫ClickHouse本地表。
語法結(jié)構(gòu)
CREATE TABLE clickhouse_sink (
id INT,
name VARCHAR,
age BIGINT,
rate FLOAT
) WITH (
'connector' = 'clickhouse',
'url' = '<yourUrl>',
'userName' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTablename>',
'maxRetryTimes' = '3',
'batchSize' = '8000',
'flushIntervalMs' = '1000'
'ignoreDelete' = 'true',
'shardWrite' = 'false',
'writeMode' = 'partition',
'shardingKey' = 'id'
);
WITH參數(shù)
參數(shù) | 說明 | 數(shù)據(jù)類型 | 是否必填 | 默認(rèn)值 | 備注 |
connector | 結(jié)果表類型。 | String | 是 | 無 | 固定值為clickhouse。 |
url | ClickHouse的JDBC連接地址。 | String | 是 | 無 | URL格式為 說明 如果您要將數(shù)據(jù)寫入ClickHouse分布式表,則URL為該分布式表所在節(jié)點的JDBC URL。 |
userName | ClickHouse的用戶名。 | String | 是 | 無 | 無。 |
password | ClickHouse的密碼。 | String | 是 | 無 | 無。 |
tableName | ClickHouse的表名稱。 | String | 是 | 無 | 無。 |
maxRetryTimes | 向結(jié)果表插入數(shù)據(jù)失敗后的最大嘗試次數(shù)。 | Int | 否 | 3 | 無。 |
batchSize | 一次批量寫入的數(shù)據(jù)條數(shù)。 | Int | 否 | 100 | 如果緩存中的數(shù)據(jù)條數(shù)達到了batchSize參數(shù)值,或者等待時間超過flushIntervalMs后,系統(tǒng)將會自動將緩存中的數(shù)據(jù)寫入ClickHouse表中。 |
flushIntervalMs | 清空緩存的時間間隔。 | Long | 否 | 1000 | 單位為毫秒。 |
ignoreDelete | 是否忽略Delete消息。 | Boolean | 否 | true | 參數(shù)取值如下:
說明 如果設(shè)置ignoreDelete=false,則無法支持以partition的方式寫ClickHouse分布表的本地表,所以就不能再設(shè)置writeMode為partition。 |
shardWrite | 對于ClickHouse分布式表,是否直接寫ClickHouse的本地表。 | Boolean | 否 | false | 參數(shù)取值如下:
|
inferLocalTable | 對于寫ClickHouse分布式表,是否嘗試推測分布式表的本地表信息,然后直接寫入本地表中。 | Boolean | 否 | false | 參數(shù)取值如下:
說明 對于寫ClickHouse非分布式表,可直接忽略該參數(shù)。 |
writeMode | 對于ClickHouse分布式表,采用何種策略寫ClickHouse的本地表。 | Enum | 否 | default | 參數(shù)取值如下:
說明 如果設(shè)置了writeMode=partition,請確保配置項ignoreDelete為true。 |
shardingKey | 按何種key將數(shù)據(jù)寫到同一個節(jié)點的本地表。 | default | 否 | 無 | 當(dāng)writeMode取值為partition時,shardingKey值必填,可包含多個字段,多個字段以英文逗號(,)分隔。 |
exactlyOnce | 是否開啟exactlyOnce語義。 | Boolean | 否 | false | 參數(shù)取值如下:
說明
|
類型映射
Flink字段類型 | ClickHouse字段類型 |
BOOLEAN | UInt8 / Boolean 說明 ClickHouse v21.12及以上版本支持Boolean類型。如果您使用的ClickHouse是v21.12以下版本,F(xiàn)link的Boolean類型則對應(yīng)ClickHouse的UInt8類型。 |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
BIGINT | UInt32 |
FLOAT | Float32 |
DOUBLE | Float64 |
CHAR | FixedString |
VARCHAR | String |
BINARY | FixedString |
VARBINARY | String |
DATE | Date |
TIMESTAMP(0) | DateTime |
TIMESTAMP(x) | Datetime64(x) |
DECIMAL | DECIMAL |
ARRAY | ARRAY |
Nested |
ClickHouse暫不支持Flink的TIME、MAP、MULTISET和ROW類型。
對于ClickHouse的Nested類型,需要將其映射成Flink的ARRAY類型,例如:
-- ClickHouse
CREATE TABLE visits (
StartDate Date,
Goals Nested
(
ID UInt32,
OrderID String
)
...
);
需要映射為:
-- Flink
CREATE TABLE visits (
StartDate DATE,
`Goals.ID` ARRAY<LONG>,
`Goals.OrderID` ARRAY<STRING>
);
ClickHouse的DateTime類型可以精確到秒,Datetime64可以精確到納秒。對于VVR-6.0.6之前的版本,因為ClickHouse官方提供的JDBC寫Datetime64數(shù)據(jù)類型會出現(xiàn)精度丟失,只能精確到秒的問題,所以通過Flink只能寫入秒級別的TIMESTAMP,即TIMESTAMP(0)。VVR-6.0.6及之后的版本修復(fù)了這個精度丟失問題,通過Flink可以正常寫Datetime64類型的數(shù)據(jù)。
使用示例
示例1:寫ClickHouse單節(jié)點表。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = '<yourUrl>', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = '<yourTablename>' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
示例2:寫ClickHouse分布式表。
假設(shè)您已經(jīng)有三個本地表,表名為local_table_test,分別在192.XX.XX.1、192.XX.XX.2和192.XX.XX.3節(jié)點上。然后基于這三個本地表,創(chuàng)建了一個分布式表distributed_table_test。
此時,如果您希望Flink可以直接寫本地表,并且可以按照某個key將相同key的數(shù)據(jù)寫到同一個節(jié)點的本地表中,則DDL代碼示例如下。
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002,192.XX.XX.2:3002,192.XX.XX.3:3002/default', 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'local_table_test', 'shardWrite' = 'true', 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;
此時,如果您不想手動指定本地表的節(jié)點,可以讓Flink來自動推測本地表節(jié)點,DDL代碼示例如下:
CREATE TEMPORARY TABLE clickhouse_source ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '50' ); CREATE TEMPORARY TABLE clickhouse_output ( id INT, name VARCHAR, age BIGINT, rate FLOAT ) WITH ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://192.XX.XX.1:3002/default', -- 分布式表所在節(jié)點對應(yīng)的JDBC URL。 'userName' = '<yourUsername>', 'password' = '<yourPassword>', 'tableName' = 'distributed_table_test', --為分布式表的名字。 'shardWrite' = 'true', 'inferLocalTable' = 'true', --需設(shè)置inferLocalTable為true。 'writeMode' = 'partition', 'shardingKey' = 'name' ); INSERT INTO clickhouse_output SELECT id, name, age, rate FROM clickhouse_source;