Flink可以處理實時數據流,并將處理結果寫入Lindorm時序引擎,以實現實時數據監控等場景。本文介紹如何將Flink上實時的數據處理結果寫入到時序引擎。
前提條件
已開通實時計算Flink版或者已有自建Flink。實時計算Flink版的開通,請參見開通實時計算Flink版。
說明實時計算Flink版需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基于Apache Flink 1.13。
為了保證網絡的連通性,確保云原生多模數據庫 Lindorm實例和實時計算Flink使用相同的專有網絡。
說明實時計算Flink版默認不具備訪問公網的能力,如需通過公網將數據寫入時序引擎,請參見Flink全托管集群如何訪問公網。
已開通時序引擎。
已將Flink的IP地址添加到Lindorm白名單。如果您使用的是實時計算Flink版,查看IP地址的操作,請參見如何設置白名單。添加Lindorm白名單的操作,請參見設置白名單。
背景信息
時序引擎Sink連接器用于連接其他系統與Lindorm時序引擎,負責從各種數據源接收數據并寫入到時序引擎。實時計算Flink版通過Flink SQL定義源表、維表和結果表,通過定義時序引擎Sink連接器的參數,將結果表映射到Lindorm時序表,從而將Flink處理后的結果數據寫入Lindorm時序引擎。使用時序引擎SINK插件,需要先獲取時序引擎SINK插件,再將JAR包上傳至實時計算Flink版控制臺,上傳方法請參見JAR作業開發。
語法
在實時計算Flink上創建結果表,并配置時序引擎Sink連接器參數,實現Flink結果表到Lindorm時序表的映射。
CREATE TEMPORARY TABLE tsdb_sink(
`timestamp` BIGINT,
tag_<tagname> VARCHAR,
field_<fieldname1> DOUBLE,
field_<fieldname2> VARCHAR,
field_<fieldname3> BIGINT,
field_<fieldname4> BOOLEAN
-- table VARCHAR(可選字段)
)
WITH (
'connector' = 'lindormtsdb',
'url'='<lindormTSDBHttpUrl>',
'table'='<yourTableName>',
'defaultDatabase'='<yourDatabaseName>',
'schemaPolicy'='<schemaPolicy>',
'sink.parallelism'='<sinkParallelism>'
'ignoreErrorData'='<ignoreErrorData>',
'maxRetries'='<maxRetries>',
'batchSize'='<batchSize>',
'connectTimeoutMs'='<connectTimeoutMs>',
'sync'='<sync>',
'debug'='<debug>'
);
參數說明
結果表結構參數說明
字段名 | 數據類型 | 是否必選 | 說明 |
timestamp | BIGINT | 是 | 字段名必須為 單位為毫秒(ms)。 說明
|
tag_tagname | VARCHAR | 是 | 指定時序數據的標簽(Tag)。 示例:tag_deviceid。 說明 tag_tagname可以為一列或者多列。 |
field_fieldname | DOUBLE、VARCHAR、BIGINT、BOOLEAN | 是 | 指定時序數據量測值(Field)。 示例:field_humidity。 說明 field_fieldname可以為一列或者多列。 |
table | VARCHAR | 否 | 指定時序數據表。
|
WITH參數說明
參數 | 是否必選 | 說明 |
connector | 是 | 固定值lindormtsdb,指定時序引擎SINK插件。 |
url | 是 | Lindorm時序引擎的HTTP連接地址,獲取方法請參見查看連接地址。 |
table | 否 | 指定時序數據表。
|
username | 條件必選 | 連接時序引擎的用戶名和密碼。 如已開啟用戶認證與權限校驗,則必須輸入用戶名和密碼。否則無需輸入。 說明 時序引擎默認未開啟用戶認證與權限校驗。為了數據安全,建議您開啟時序引擎的用戶認證與權限校驗。 |
password | 條件必選 | |
defaultDatabase | 否 | 寫入數據的數據庫。默認值為default。 |
schemaPolicy | 否 | Schema約束策略。
說明 更多信息請參見Schema約束。 |
sink.parallelism | 否 | 寫入并發度,當寫入數據量較大時可適當增加并發度,默認值為1 。 |
ignoreErrorData | 否 | 是否忽略寫入錯誤。
|
maxRetries | 否 | 寫入時遇到服務端內部錯誤或者網絡錯誤時最大重試次數,默認值為3。 |
batchSize | 否 | 批處理大小,即每次寫入數據庫的數據量,默認值為500個數據點。 |
connectTimeoutMs | 否 | HTTP連接超時時間,默認值為90000。單位為毫秒(ms)。 |
debug | 否 | 是否開啟debug模式,用來打印詳細數據點日志。
|
sync | 否 | 是否同步寫入,建議使用false。
|
使用示例
以datagen_source隨機數據生成器為例,將生成的數據寫入Lindorm時序表mytable中。示例代碼如下:
CREATE TEMPORARY TABLE datagen_source (
id INTEGER,
score DOUBLE,
name STRING
)
WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE tsdb_sink(
tag_tagk VARCHAR,
field_score DOUBLE,
field_name STRING,
`timestamp` BIGINT
)
WITH (
'connector' = 'lindormtsdb',
'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
'table'= 'mytable',
'schemaPolicy'='weak'
);
INSERT INTO tsdb_sink
SELECT
CAST(id as STRING) as tag_tagk,
score as field_score,
name as field_name,
UNIX_TIMESTAMP(now()) * 1000 as `timestamp`
FROM datagen_source;