時序數(shù)據(jù)庫InfluxDB
本文為您介紹如何使用時序數(shù)據(jù)庫InfluxDB連接器。
背景信息
時序數(shù)據(jù)庫InfluxDB?版是一款專門處理高寫入和查詢負(fù)載的時序數(shù)據(jù)庫,用于存儲大規(guī)模的時序數(shù)據(jù)并進(jìn)行實時分析,包括來自DevOps監(jiān)控、應(yīng)用指標(biāo)和IoT傳感器上的數(shù)據(jù)。時序數(shù)據(jù)庫InfluxDB?版詳情請參見InfluxDB??介紹。
InfluxDB連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 結(jié)果表 |
運行模式 | 流模式 |
數(shù)據(jù)格式 | Point |
特有監(jiān)控指標(biāo) |
說明 指標(biāo)含義詳情,請參見監(jiān)控指標(biāo)說明。 |
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 否 |
前提條件
已創(chuàng)建InfluxDB的數(shù)據(jù)庫,詳情請參見管理用戶賬號和數(shù)據(jù)庫。
使用限制
僅Flink計算引擎VVR 2.1.5及以上版本支持InfluxDB Connector。
語法結(jié)構(gòu)
CREATE TABLE stream_test_influxdb(
`metric` VARCHAR,
`timestamp` BIGINT,
`tag_value1` VARCHAR,
`field_fieldValue1` DOUBLE
) WITH (
'connector' = 'influxdb',
'url' = 'http://service.cn.influxdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='300',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
建表默認(rèn)格式:
第0列:metric(VARCHAR),必填。
第1列:timestamp(BIGINT),必填,單位為毫秒。
第2列:tag_value1(VARCHAR),必填,最少填寫一個。
第3列:field_fieldValue1(DOUBLE),必填,最少填寫一個。
寫入多個field_fieldValue時,您需要按照如下格式填寫。
field_fieldValue1 類型, field_fieldValue2 類型, ... field_fieldValueN 類型
示例如下。
field_fieldValue1 DOUBLE, field_fieldValue2 INTEGER, ... field_fieldValueNINTEGER
結(jié)果表中只支持metric、timestamp、tag_*和field_*,不能出現(xiàn)其他的字段。
WITH參數(shù)
參數(shù) | 說明 | 是否必填 | 備注 |
connector | 結(jié)果表類型。 | 是 | 固定值為influxdb。 |
url | InfluxDB的服務(wù)地址。 | 是 | 在InfluxDB中,URL為VPC網(wǎng)絡(luò)地址,例如:https://localhost:8086或http://localhost:3242。 URL支持HTTP和HTTPS。 |
database | InfluxDB的數(shù)據(jù)庫名稱。 | 是 | 例如db-flink。 |
username | 數(shù)據(jù)庫的用戶名。 | 是 | 需要對目標(biāo)數(shù)據(jù)庫有寫權(quán)限。用戶名詳情請參見管理用戶賬號和數(shù)據(jù)庫。 |
password | 數(shù)據(jù)庫的密碼。 | 是 | 密碼詳情請參見管理用戶賬號和數(shù)據(jù)庫。 |
batchSize | 批量提交的記錄條數(shù)。 | 否 | 默認(rèn)每次批量提交300條記錄。 |
retentionPolicy | 保留策略。 | 否 | 如果您不配置該參數(shù)時,該參數(shù)會被默認(rèn)填寫為每個數(shù)據(jù)庫的默認(rèn)保留策略autogen,保留策略詳情請參見管理用戶賬號和數(shù)據(jù)庫。 |
ignoreErrorData | 是否忽略異常數(shù)據(jù)。 | 否 | 參數(shù)取值如下:
|
類型映射
InfluxDB字段類型 | Flink版字段類型 |
BOOLEAN | BOOLEAN |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DECIMAL | DECIMAL |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
VARCHAR | VARCHAR |
使用示例
CREATE TEMPORARY TABLE datahub_source(
`metric` VARCHAR,
`timestamp` BIGINT,
`filedvalue` DOUBLE,
`tagvalue` VARCHAR
) WITH (
'connector' = 'datagen',
'fields.metric.length' = '3',
'fields.tagvalue.length' = '3',
'fields.timestamp.min' = '1587539547000',
'fields.timestamp.max' = '1619075547000',
'fields.filedvalue.min' = '1',
'fields.filedvalue.max' = '100000',
'rows-per-second' = '50'
);
CREATE TEMPORARY TABLE influxdb_sink(
`metric` VARCHAR,
`timestamp` BIGINT,
`field_fieldValue1` DOUBLE,
`tag_value1` VARCHAR
) WITH (
'connector' = 'influxdb',
'url' = 'https://***********.influxdata.tsdb.aliyuncs.com:****',
'database' = '<yourDatabaseName>',
'username' = '<yourDatabaseUserName>',
'password' = '<yourDatabasePassword>',
'batchSize' ='100',
'retentionPolicy' = 'autogen',
'ignoreErrorData' = 'false'
);
INSERT INTO influxdb_sink
SELECT
`metric`,
`timestamp`,
`filedvalue`,
`tagvalue`
FROM datahub_source;