日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Flink寫入時序引擎

Flink可以處理實時數據流,并將處理結果寫入Lindorm時序引擎,以實現實時數據監控等場景。本文介紹如何將Flink上實時的數據處理結果寫入到時序引擎。

前提條件

  • 已開通實時計算Flink版或者已有自建Flink。實時計算Flink版的開通,請參見開通實時計算Flink版

    說明

    實時計算Flink版需要VVR 4.0.13及以上版本,VVR 4.0.13版本是基于Apache Flink 1.13。

  • 為了保證網絡的連通性,確保云原生多模數據庫 Lindorm實例和實時計算Flink使用相同的專有網絡。

    說明

    實時計算Flink版默認不具備訪問公網的能力,如需通過公網將數據寫入時序引擎,請參見Flink全托管集群如何訪問公網

  • 已開通時序引擎。

  • Lindorm時序引擎為3.4.7及以上版本,如何查看或升級時序引擎版本,請參見時序引擎版本說明升級小版本

  • 已將Flink的IP地址添加到Lindorm白名單。如果您使用的是實時計算Flink版,查看IP地址的操作,請參見如何設置白名單。添加Lindorm白名單的操作,請參見設置白名單

背景信息

時序引擎Sink連接器用于連接其他系統與Lindorm時序引擎,負責從各種數據源接收數據并寫入到時序引擎。實時計算Flink版通過Flink SQL定義源表、維表和結果表,通過定義時序引擎Sink連接器的參數,將結果表映射到Lindorm時序表,從而將Flink處理后的結果數據寫入Lindorm時序引擎。使用時序引擎SINK插件,需要先獲取時序引擎SINK插件,再將JAR包上傳至實時計算Flink版控制臺,上傳方法請參見JAR作業開發

語法

在實時計算Flink上創建結果表,并配置時序引擎Sink連接器參數,實現Flink結果表到Lindorm時序表的映射。

CREATE TEMPORARY TABLE tsdb_sink(
  `timestamp` BIGINT,
  tag_<tagname> VARCHAR,
  field_<fieldname1> DOUBLE,
  field_<fieldname2> VARCHAR,
  field_<fieldname3> BIGINT,
  field_<fieldname4> BOOLEAN
  -- table VARCHAR(可選字段)
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='<lindormTSDBHttpUrl>',
    'table'='<yourTableName>',
    'defaultDatabase'='<yourDatabaseName>',
    'schemaPolicy'='<schemaPolicy>',
    'sink.parallelism'='<sinkParallelism>'
    'ignoreErrorData'='<ignoreErrorData>',
    'maxRetries'='<maxRetries>',
    'batchSize'='<batchSize>',
    'connectTimeoutMs'='<connectTimeoutMs>',
    'sync'='<sync>',
    'debug'='<debug>'
);

參數說明

結果表結構參數說明

字段名

數據類型

是否必選

說明

timestamp

BIGINT

字段名必須為timestamp,且字段類型必須為BIGINT。

單位為毫秒(ms)。

說明
  • timestamp為保留字,請用反引號(``)將timestamp括起來。

  • 字段值為13位時間戳,如果是10位時間戳,寫入會自動轉換為13位。

tag_tagname

VARCHAR

指定時序數據的標簽(Tag)。tag_表示前綴,不能省略和修改。tagname表示時序數據標簽名稱。

示例:tag_deviceid。

說明

tag_tagname可以為一列或者多列。

field_fieldname

DOUBLE、VARCHAR、BIGINT、BOOLEAN

指定時序數據量測值(Field)。field_表示前綴,不能省略和修改。fieldname表示時序數據量測值名稱。

示例:field_humidity。

說明

field_fieldname可以為一列或者多列。

table

VARCHAR

指定時序數據表。

  • 如果寫入一張時序表,建議在WITH參數中配置。

  • 如果同時寫入多張時序表,可以在表結構的table字段中配置。

WITH參數說明

參數

是否必選

說明

connector

固定值lindormtsdb,指定時序引擎SINK插件。

url

Lindorm時序引擎的HTTP連接地址,獲取方法請參見查看連接地址

table

指定時序數據表。

  • 如果寫入一張時序表,建議在WITH參數中配置。

  • 如果同時寫入多張時序表,可以在表結構的table字段中配置。

username

條件必選

連接時序引擎的用戶名和密碼。

如已開啟用戶認證與權限校驗,則必須輸入用戶名和密碼。否則無需輸入。

說明

時序引擎默認未開啟用戶認證與權限校驗。為了數據安全,建議您開啟時序引擎的用戶認證與權限校驗。

password

條件必選

defaultDatabase

寫入數據的數據庫。默認值為default。

schemaPolicy

Schema約束策略。

  • Strong:強約束,默認值。時序引擎會嚴格依據預先定義的表結構對寫入數據的表名、字段名、類型進行校驗。選擇Strong,需提前手動建表,否則數據寫入會失敗。

  • Weak:弱約束。寫入數據的表不存在時,不會報錯,而是會自動創建對應的表。

  • None:無約束。寫入數據的表不存在時,不會報錯,也不會自動建表。如果不手動建表,不影響數據寫入,但無法直接通過SQL查詢寫入的數據。

說明

更多信息請參見Schema約束

sink.parallelism

寫入并發度,當寫入數據量較大時可適當增加并發度,默認值為1 。

ignoreErrorData

是否忽略寫入錯誤。

  • false:不忽略,默認值。如果遇到錯誤,就跳出寫入。

  • true:忽略。如果遇到錯誤就忽略錯誤,繼續寫入。

maxRetries

寫入時遇到服務端內部錯誤或者網絡錯誤時最大重試次數,默認值為3。

batchSize

批處理大小,即每次寫入數據庫的數據量,默認值為500個數據點。

connectTimeoutMs

HTTP連接超時時間,默認值為90000。單位為毫秒(ms)。

debug

是否開啟debug模式,用來打印詳細數據點日志。

  • false:不開啟,默認值。

  • true:開啟。

sync

是否同步寫入,建議使用false。

  • false:異步寫入,默認值,寫入效率高。

  • true:同步寫入。

使用示例

以datagen_source隨機數據生成器為例,將生成的數據寫入Lindorm時序表mytable中。示例代碼如下:

CREATE TEMPORARY TABLE datagen_source (
  id INTEGER,
  score DOUBLE,
  name STRING
)
WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE tsdb_sink(
  tag_tagk VARCHAR,
  field_score DOUBLE,
  field_name STRING,
  `timestamp` BIGINT
)
WITH (
    'connector' = 'lindormtsdb',
    'url'='http://ld-bp159jt4eivt3****-proxy-tsdb.lindorm.rds.aliyuncs.com:8242',
    'table'= 'mytable',
    'schemaPolicy'='weak'
);

INSERT INTO tsdb_sink
SELECT
  CAST(id as STRING) as tag_tagk,
  score as field_score,
  name as field_name,
  UNIX_TIMESTAMP(now()) * 1000  as `timestamp`
FROM datagen_source;