日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數據總線DataHub

本文為您介紹數據總線DataHub連接器語法結構、WITH參數和使用示例等。

背景信息

阿里云流數據處理平臺DataHub是流式數據(Streaming Data)的處理平臺,提供對流式數據的發布(Publish)、訂閱(Subscribe)和分發功能,讓您可以輕松構建基于流式數據的分析和應用,詳情請參見產品概述

說明

DataHub兼容Kafka協議,因此您可以使用Kafka連接器(不包括Upsert Kafka)來訪問DataHub,詳情請參見兼容Kafka

DataHub連接器支持的信息如下。

類別

詳情

支持類型

結果表和源表

運行模式

流模式和批模式

數據格式

暫不適用

特有監控指標

暫無

API種類

Datastream和SQL

是否支持更新或刪除目標Topic數據

不支持更新和刪除目標Topic數據,只支持插入數據。

語法結構

CREATE TEMPORARY TABLE datahub_input (
  `time` BIGINT,
  `sequence`  STRING METADATA VIRTUAL,
  `shard-id` BIGINT METADATA VIRTUAL,
  `system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值為datahub。

    endPoint

    消費端點信息。

    String

    不同地域DataHub有不同的EndPoint,詳情請參見域名列表

    project

    項目。

    String

    創建project詳情請參見快速入門

    topic

    主題。

    String

    創建topic詳情請參見快速入門

    說明

    如果您填寫的topic是blob類型(一種無類型的非結構化數據的存儲方式),則在Flink消費時,表定義中必須有且只有一個VARBINARY類型的字段。

    accessId

    阿里云賬號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    accessKey

    阿里云賬號的AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

    retryTimeout

    最大持續重試時間。

    Integer

    1800000

    單位毫秒,通常不作修改。

    retryInterval

    重試間隔。

    Integer

    1000

    單位毫秒,通常不作修改。

    enableSchemaRegistry

    是否打開Schema注冊。

    Boolean

    false

    您需要設置為true。

    CompressType

    讀寫的壓縮策略。

    String

    lz4

    • lz4 (默認值):使用lz4壓縮。

    • deflate:使用deflate壓縮。

    • ""(空字符串):表示關閉數據壓縮。

    說明

    僅VVR 6.0.5及以上版本支持指定CompressType參數。

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    subId

    訂閱ID。

    String

    如何創建DataHub訂閱,詳情請參見創建訂閱

    maxFetchSize

    單次讀取條數。

    Integer

    50

    影響讀性能的參數,調大可以增加吞吐。

    maxBufferSize

    異步讀取的最大緩存數據條數。

    Integer

    50

    影響讀性能的參數,調大可以增加吞吐。

    fetchLatestDelay

    數據源沒有數據時,sleep的時間。

    Integer

    500

    單位毫秒。在數據源頻繁沒有數據的情況下,影響吞吐,建議調小。

    lengthCheck

    單行字段條數檢查策略。

    String

    NONE

    • NONE(默認值):

      • 解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。

      • 解析出的字段數小于定義字段數時,跳過該行數據。

    • SKIP:解析出的字段數和定義字段數不同時跳過該行數據。

    • EXCEPTION:解析出的字段數和定義字段數不同時提示異常。

    • PAD:按從左到右順序填充。

      • 解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。

      • 解析出的字段數小于定義字段數時,按從左到右的順序,在行尾用Null填充缺少的字段。

    columnErrorDebug

    是否打開調試開關。

    Boolean

    false

    • false(默認值):關閉調試功能。

    • true:打開調試開關,打印解析異常的日志。

    startTime

    消費日志的開始時間。

    String

    當前時間

    格式為yyyy-MM-dd hh:mm:ss。

    endTime

    消費日志的結束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    batchCount

    每次批量寫入數據的數量。

    Integer

    500

    影響寫性能,調大可以增加吞吐,但是會增大延遲。

    batchSize

    每次批量寫入數據的大小。

    Integer

    512000

    單位Byte,影響寫性能,調大可以增加吞吐,但是會增大延遲。

    flushInterval

    攢批寫入數據的時間。

    Integer

    5000

    單位毫秒,影響寫性能,調大可以增加吞吐,但是增大延遲。

    hashFields

    指定列名后,相同列的值會寫入到同一個Shard。

    String

    null,即隨機寫

    可以指定多個列值,用逗號(,)分割,例如hashFields=a,b

    timeZone

    數據的時區。

    String

    影響TimeStamp等帶時區數據的轉換。

    schemaVersion

    向注冊的Schema里寫入的version。

    Integer

    -1

    您需要指定該參數。

類型映射

Flink字段類型

DataHub字段類型

TINYINT

TINYINT

BOOLEAN

BOOLEAN

INTEGER

INTEGER

BIGINT

BIGINT

BIGINT

TIMESTAMP

TIMESTAMP

FLOAT

FLOAT

DOUBLE

DOUBLE

DECIMAL

DECIMAL

VARCHAR

STRING

SMALLINT

SMALLINT

VARBINARY

BLOB

屬性字段

字段名

字段類型

說明

shard-id

BIGINT METADATA VIRTUAL

Shard的ID。

sequence

STRING METADATA VIRTUAL

數據順序。

system-time

TIMESTAMP METADATA VIRTUAL

系統時間。

說明

僅在VVR 3.0.1及以上版本支持獲取以上DataHub屬性字段。

使用示例

  • 源表

    CREATE TEMPORARY TABLE datahub_input (
      `time` BIGINT,
      `sequence`  STRING METADATA VIRTUAL,
      `shard-id` BIGINT METADATA VIRTUAL,
      `system-time` TIMESTAMP METADATA VIRTUAL
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}'
    );
    
    CREATE TEMPORARY TABLE test_out (
      `time` BIGINT,
      `sequence`  STRING,
      `shard-id` BIGINT,
      `system-time` TIMESTAMP
    ) WITH (
      'connector' = 'print',
      'logger' = 'true'
    );
    
    INSERT INTO test_out
    SELECT
      `time`,
      `sequence` ,
      `shard-id`,
      `system-time`
    FROM datahub_input;
  • 結果表

    CREATE TEMPORARY table datahub_source(
      name VARCHAR
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'subId'='<yourSubId>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'startTime'='2018-06-01 00:00:00'
    );
    
    CREATE TEMPORARY table datahub_sink(
      name varchar
    ) WITH (
      'connector'='datahub',
      'endPoint'='<endPoint>',
      'project'='<yourProjectName>',
      'topic'='<yourTopicName>',
      'accessId'='${secret_values.ak_id}',
      'accessKey'='${secret_values.ak_secret}',
      'batchSize'='512000',
      'batchCount'='500'
    );
    
    INSERT INTO datahub_sink
    SELECT
      LOWER(name)
    from datahub_source;

Datastream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器使用方法

DataHub源表

VVR提供了SourceFunction的實現類DatahubSourceFunction來讀取DataHub表數據。以下為讀取DataHub表數據的示例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataHub連接配置。
DatahubSourceFunction datahubSource =
    new DatahubSourceFunction(
    <yourEndPoint>,
    <yourProjectName>,
    <yourTopicName>,
    <yourSubId>,
    <yourAccessId>,
    <yourAccessKey>,
    "public",
    <enableSchemaRegistry>, // 是否開啟schemaRegistry,一般填false即可。
    <yourStartTime>,
    <yourEndTime>
    );
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
    .map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
    .print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
    Tuple2<String, Long> tuple2 = new Tuple2<>();
    TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
    tuple2.f0 = (String) recordData.getField(0);
    tuple2.f1 = (Long) recordData.getField(1);
    return tuple2;
}

DataHub結果表

VVR提供了OutputFormatSinkFunction的實現類DatahubSinkFunction將數據寫入DataHub。以下為將數據寫入DataHub的示例。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataHub連接配置。
env.generateSequence(0, 100)
    .map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
    .addSink(
    new DatahubSinkFunction<>(
       <yourEndPoint>,
       <yourProjectName>,
       <yourTopicName>,
       <yourSubId>,
       <yourAccessId>,
       <yourAccessKey>,
       "public",
       <enableSchemaRegistry>, // 是否開啟schemaRegistry,一般填false即可。
       <schemaVersion> // 如果開啟了schemaRegistry,寫入的時候需要指定schemaVersion,其他情況填0即可。
       );
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
    RecordSchema recordSchema = new RecordSchema();
    recordSchema.addField(new Field("f1", FieldType.STRING));
    recordSchema.addField(new Field("f2", FieldType.BIGINT));
    recordSchema.addField(new Field("f3", FieldType.DOUBLE));
    recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
    recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
    recordSchema.addField(new Field("f6", FieldType.DECIMAL));
    RecordEntry recordEntry = new RecordEntry();
    TupleRecordData recordData = new TupleRecordData(recordSchema);
    recordData.setField(0, s + message);
    recordData.setField(1, message);
    recordEntry.setRecordData(recordData);
    return recordEntry;
}

XML

Maven中央庫中已經放置了DataHub DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-datahub</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題