本文為您介紹如何使用云原生多模數據庫Lindorm連接器。
背景信息
Lindorm是面向物聯網、互聯網、車聯網等設計和優化的云原生多模超融合數據庫,是日志、監控、賬單、廣告、社交、出行、風控等場景首選數據庫,也是為阿里巴巴核心業務提供支撐的數據庫之一。詳情請參見什么是云原生多模數據庫Lindorm。
具備以下特性:
支持寬表、時序、文本、對象、流、空間等多種數據的統一訪問和融合處理。
兼容SQL、HBase/Cassandra/S3、TSDB、HDFS、Solr、Kafka等多種標準接口和無縫集成三方生態工具。
Lindorm連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 維表和結果表 |
運行模式 | 僅支持流模式 |
數據格式 | 暫不適用 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | SQL |
支持的Lindorm引擎 | 寬表引擎 |
是否支持更新或刪除結果表數據 | 是 |
前提條件
已經創建了Lindorm寬表引擎以及數據表,詳情請參見創建實例。
Lindorm集群與Flink全托管集群處于網絡連通的環境下,例如在同一個VPC下。
使用限制
僅Flink計算引擎VVR 4.0.8及以上版本支持Lindorm。
語法結構
CREATE TABLE white_list (
id varchar,
name varchar,
age int,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
WITH參數
類型 | 參數 | 說明 | 數據類型 | 是否必填 | 默認值 | 備注 |
通用 | connector | 表類型。 | String | 是 | 無 | 固定值為lindorm。 |
seedserver | Lindorm服務器的連接地址。 | String | 是 | 無 | 實時計算Flink版使用HBase Java API的方式連接并使用Lindorm寬表引擎。Lindorm服務器的連接地址的格式為 | |
namespace | Lindorm的命名空間。 | String | 是 | 無 | 無。 | |
username | 連接Lindorm所用到的用戶名。 | String | 是 | 無 | 無。 | |
password | 連接Lindorm所用到的密碼。 | String | 是 | 無 | 無。 | |
tableName | Lindorm表名。 | String | 是 | 無 | 無。 | |
columnFamily | Lindorm表的列族名。 | String | 是 | 無 | 如果創建Lindorm表時未指定列族名,則填寫默認列族名f。 | |
retryIntervalMs | 讀取或寫入失敗時,再次重試讀取的時間間隔。 | Integer | 否 | 1000 | 單位為毫秒。 | |
maxRetryTimes | 最大嘗試次數。 | Integer | 否 | 5 | 無。 | |
結果表獨有 | bufferSize | 一次批量寫入數據的條數。 | Integer | 否 | 500 | 無。 |
flushIntervalMs | 當數據量比較少時,多長時間寫入一次。 | Integer | 否 | 2000 | 單位為毫秒。 | |
ignoreDelete | 是否忽略Delete操作。 | Boolean | 否 | false | 參數取值如下:
| |
dynamicColumnSink | 是否開啟動態表模式。關于動態表模式的介紹,請參見動態表模式。 | Boolean | 否 | false | 參數取值如下:
說明 實時計算引擎VVR 6.0.2及以上版本支持該參數。 | |
excludeUpdateColumns | 指定字段忽略更新,不會插入結果表。 | String | 否 | 無 | 使用逗號分隔要忽略的字段。例如: 說明 實時計算引擎VVR 8.0.9及以上版本支持該參數。 | |
維表獨有 | partitionedJoin | 是否額外使用JoinKey進行分區。 | Boolean | 否 | false | 參數取值如下:
|
shuffleEmptyKey | 遇到空Key時,是否將Key為空的記錄隨機向下游Shuffle。 | Boolean | 否 | false | 參數取值如下:
| |
cache | 緩存策略。 | String | 否 | None | 目前Lindorm支持以下兩種緩存策略:
需要配置相關參數:緩存大?。╟acheSize)和緩存失效超時時間(cacheTTLMs)。 | |
cacheSize | 緩存數據的行數。 | Integer | 否 | 1000 | 當選擇LRU緩存策略后,使用本參數可以設置緩存大小。 | |
cacheTTLMs | 緩存失效超時時間。 | Integer | 否 | 無 | 單位為毫秒。當選擇LRU緩存策略后,可以設置緩存失效的超時時間,默認不過期。 | |
cacheEmpty | 是否緩存join結果為空的數據。 | Boolean | 否 | true | 無。 | |
async | 是否異步返回數據。 | Boolean | 否 | false | 參數取值如下:
| |
asyncLindormRpcTimeoutMs | 在異步請求數據時的超時時間。 | Integer | 否 | 300000 | 單位毫秒。 |
動態表模式
動態表模式適用于在表定義中并未指定列名的情況,根據作業運行情況動態創建數據列并插入的場景。例如統計每天每小時的交易量,以天作為主鍵,小時作為列,每個小時的數據都是動態生成的,示例如下。
主鍵 | 列名:0點 | 列名:1點 |
2025-06-01 | 45 | 32 |
2025-06-02 | 76 | 34 |
動態表需要遵循特殊的DDL定義。其主鍵需要定義為前若干列,最后兩列中前一列的值作為列名變量,最后一列的值作為該列對應的值,且要求最后兩列的類型均為varchar。代碼示例如下。
CREATE TABLE lindorm_dynamic_output(
pk1 varchar,
pk2 varchar,
pk3 varchar,
c1 varchar,
c2 varchar,
PRIMARY KEY (pk1,pk2,pk3) NOT ENFORCED
) WITH (
'connector' = 'lindorm',
'seedserver' = '<yourSeedServer>',
'namespace' = '<yourNamespace>',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'tableName' = '<yourTableName>',
'columnFamily' = '<yourColumnFamily>'
);
上述定義中,pk1、pk2、pk3為主鍵,c1、c2為動態表模式所必須的兩列且一定為最后兩列,不可存在其他的非主鍵的列。每次寫入數據時,會在主鍵<pk1, pk2, pk3>對應的條目中添加或更改一列,列名為c1的值,該列的值為c2的值。每次一條數據來臨時,只會添加或更改一列對應的值,其他列的值不會改變。
類型映射
Lindorm中數據均為二進制形式,通過Flink某個字段類型來轉換或解析二進制的Bytes方法如下。
Flink SQL類型 | 轉換為寫入的Bytes使用的方法 | 從Lindorm讀取Bytes之后的解析 |
CHAR | org.apache.flink.table.data.StringData::toBytes | org.apache.flink.table.data.StringData::fromBytes |
VARCHAR | ||
BOOLEAN | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(boolean) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
BINARY | 直接為bytes的形式。 | 直接返回bytes。 |
VARBINARY | ||
DECIMAL | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(BigDecimal) | com.alibaba.lindorm.client.core.utils.Bytes::toBigDecimal |
TINYINT | 直接將數據封裝成byte[]的第一個byte。 | 直接返回bytes[0]。 |
SMALLINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(short) | com.alibaba.lindorm.client.core.utils.Bytes::toShort |
INT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int) | com.alibaba.lindorm.client.core.utils.Bytes::toInt |
BIGINT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long) | com.alibaba.lindorm.client.core.utils.Bytes::toLong |
FLOAT | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(float) | com.alibaba.lindorm.client.core.utils.Bytes::toFloat |
DOUBLE | com.alibaba.lindorm.client.core.utils.Bytes::toBytes(double) | com.alibaba.lindorm.client.core.utils.Bytes::toDouble |
DATE | 獲取自1970.01.01以來的天數后,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。 | com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自1970.01.01以來的天數。 |
TIME | 獲取自當天00:00:00以來的毫秒數后,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(int)。 | com.alibaba.lindorm.client.core.utils.Bytes::toInt得到自當天00:00:00以來的毫秒數。 |
TIMESTAMP | 獲取自1970.01.01 00:00:00以來的毫秒數后,調用com.alibaba.lindorm.client.core.utils.Bytes::toBytes(long)。 | com.alibaba.lindorm.client.core.utils.Bytes::toLong得到自1970.01.01 00:00:00以來的毫秒數。 |
代碼示例
CREATE TEMPORARY TABLE example_source(
id INT,
proc_time AS PROCTIME()
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10',
'fields.id.kind' = 'sequence',
'fields.id.start' = '0',
'fields.id.end' = '9'
);
CREATE TEMPORARY TABLE lindorm_hbase_dim(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_dim_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
CREATE TEMPORARY TABLE lindorm_hbase_sink(
`id` INT,
`name` VARCHAR,
`birth` VARCHAR,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector'='lindorm',
'tablename'='${lindorm_sink_table}',
'seedserver'='${lindorm_seed_server}',
'namespace'='default',
'username'='${lindorm_username}',
'password'='${lindorm_username}'
);
INSERT INTO lindorm_hbase_sink
SELECT example_source.id as id, lindorm_hbase_dim.name as name, lindorm_hbase_dim.birth as birth
FROM example_source JOIN lindorm_hbase_dim FOR SYSTEM_TIME AS OF PROCTIME() ON example_source.id = lindorm_hbase_dim.id;