本文為您介紹數據總線DataHub連接器語法結構、WITH參數和使用示例等。
背景信息
阿里云流數據處理平臺DataHub是流式數據(Streaming Data)的處理平臺,提供對流式數據的發布(Publish)、訂閱(Subscribe)和分發功能,讓您可以輕松構建基于流式數據的分析和應用,詳情請參見產品概述。
DataHub兼容Kafka協議,因此您可以使用Kafka連接器(不包括Upsert Kafka)來訪問DataHub,詳情請參見兼容Kafka。
DataHub連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 結果表和源表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不適用 |
特有監控指標 | 暫無 |
API種類 | Datastream和SQL |
是否支持更新或刪除目標Topic數據 | 不支持更新和刪除目標Topic數據,只支持插入數據。 |
語法結構
CREATE TEMPORARY TABLE datahub_input (
`time` BIGINT,
`sequence` STRING METADATA VIRTUAL,
`shard-id` BIGINT METADATA VIRTUAL,
`system-time` TIMESTAMP METADATA VIRTUAL
) WITH (
'connector' = 'datahub',
'subId' = '<yourSubId>',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'topic' = '<yourTopicName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為datahub。
endPoint
消費端點信息。
String
是
無
不同地域DataHub有不同的EndPoint,詳情請參見域名列表。
project
項目。
String
是
無
創建project詳情請參見快速入門。
topic
主題。
String
是
無
創建topic詳情請參見快速入門。
說明如果您填寫的topic是blob類型(一種無類型的非結構化數據的存儲方式),則在Flink消費時,表定義中必須有且只有一個VARBINARY類型的字段。
accessId
阿里云賬號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。
accessKey
阿里云賬號的AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。
retryTimeout
最大持續重試時間。
Integer
否
1800000
單位毫秒,通常不作修改。
retryInterval
重試間隔。
Integer
否
1000
單位毫秒,通常不作修改。
enableSchemaRegistry
是否打開Schema注冊。
Boolean
否
false
您需要設置為true。
CompressType
讀寫的壓縮策略。
String
否
lz4
lz4 (默認值):使用lz4壓縮。
deflate:使用deflate壓縮。
""(空字符串):表示關閉數據壓縮。
說明僅VVR 6.0.5及以上版本支持指定CompressType參數。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
subId
訂閱ID。
String
是
無
如何創建DataHub訂閱,詳情請參見創建訂閱。
maxFetchSize
單次讀取條數。
Integer
否
50
影響讀性能的參數,調大可以增加吞吐。
maxBufferSize
異步讀取的最大緩存數據條數。
Integer
否
50
影響讀性能的參數,調大可以增加吞吐。
fetchLatestDelay
數據源沒有數據時,sleep的時間。
Integer
否
500
單位毫秒。在數據源頻繁沒有數據的情況下,影響吞吐,建議調小。
lengthCheck
單行字段條數檢查策略。
String
否
NONE
NONE(默認值):
解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。
解析出的字段數小于定義字段數時,跳過該行數據。
SKIP:解析出的字段數和定義字段數不同時跳過該行數據。
EXCEPTION:解析出的字段數和定義字段數不同時提示異常。
PAD:按從左到右順序填充。
解析出的字段數大于定義字段數時,按從左到右的順序,取定義字段數量的數據。
解析出的字段數小于定義字段數時,按從左到右的順序,在行尾用Null填充缺少的字段。
columnErrorDebug
是否打開調試開關。
Boolean
否
false
false(默認值):關閉調試功能。
true:打開調試開關,打印解析異常的日志。
startTime
消費日志的開始時間。
String
否
當前時間
格式為yyyy-MM-dd hh:mm:ss。
endTime
消費日志的結束時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
batchCount
每次批量寫入數據的數量。
Integer
否
500
影響寫性能,調大可以增加吞吐,但是會增大延遲。
batchSize
每次批量寫入數據的大小。
Integer
否
512000
單位Byte,影響寫性能,調大可以增加吞吐,但是會增大延遲。
flushInterval
攢批寫入數據的時間。
Integer
否
5000
單位毫秒,影響寫性能,調大可以增加吞吐,但是增大延遲。
hashFields
指定列名后,相同列的值會寫入到同一個Shard。
String
否
null,即隨機寫
可以指定多個列值,用逗號(,)分割,例如
hashFields=a,b
。timeZone
數據的時區。
String
否
無
影響TimeStamp等帶時區數據的轉換。
schemaVersion
向注冊的Schema里寫入的version。
Integer
否
-1
您需要指定該參數。
類型映射
Flink字段類型 | DataHub字段類型 |
TINYINT | TINYINT |
BOOLEAN | BOOLEAN |
INTEGER | INTEGER |
BIGINT | BIGINT |
BIGINT | TIMESTAMP |
TIMESTAMP | |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL | DECIMAL |
VARCHAR | STRING |
SMALLINT | SMALLINT |
VARBINARY | BLOB |
屬性字段
字段名 | 字段類型 | 說明 |
shard-id | BIGINT METADATA VIRTUAL | Shard的ID。 |
sequence | STRING METADATA VIRTUAL | 數據順序。 |
system-time | TIMESTAMP METADATA VIRTUAL | 系統時間。 |
僅在VVR 3.0.1及以上版本支持獲取以上DataHub屬性字段。
使用示例
源表
CREATE TEMPORARY TABLE datahub_input ( `time` BIGINT, `sequence` STRING METADATA VIRTUAL, `shard-id` BIGINT METADATA VIRTUAL, `system-time` TIMESTAMP METADATA VIRTUAL ) WITH ( 'connector' = 'datahub', 'subId' = '<yourSubId>', 'endPoint' = '<yourEndPoint>', 'project' = '<yourProjectName>', 'topic' = '<yourTopicName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}' ); CREATE TEMPORARY TABLE test_out ( `time` BIGINT, `sequence` STRING, `shard-id` BIGINT, `system-time` TIMESTAMP ) WITH ( 'connector' = 'print', 'logger' = 'true' ); INSERT INTO test_out SELECT `time`, `sequence` , `shard-id`, `system-time` FROM datahub_input;
結果表
CREATE TEMPORARY table datahub_source( name VARCHAR ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'subId'='<yourSubId>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'startTime'='2018-06-01 00:00:00' ); CREATE TEMPORARY table datahub_sink( name varchar ) WITH ( 'connector'='datahub', 'endPoint'='<endPoint>', 'project'='<yourProjectName>', 'topic'='<yourTopicName>', 'accessId'='${secret_values.ak_id}', 'accessKey'='${secret_values.ak_secret}', 'batchSize'='512000', 'batchCount'='500' ); INSERT INTO datahub_sink SELECT LOWER(name) from datahub_source;
Datastream API
通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器使用方法。
DataHub源表
VVR提供了SourceFunction的實現類DatahubSourceFunction來讀取DataHub表數據。以下為讀取DataHub表數據的示例。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//DataHub連接配置。
DatahubSourceFunction datahubSource =
new DatahubSourceFunction(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<enableSchemaRegistry>, // 是否開啟schemaRegistry,一般填false即可。
<yourStartTime>,
<yourEndTime>
);
datahubSource.setRequestTimeout(30 * 1000);
datahubSource.enableExitAfterReadFinished();
env.addSource(datahubSource)
.map((MapFunction<RecordEntry, Tuple2<String, Long>>) this::getStringLongTuple2)
.print();
env.execute();
private Tuple2<String, Long> getStringLongTuple2(RecordEntry recordEntry) {
Tuple2<String, Long> tuple2 = new Tuple2<>();
TupleRecordData recordData = (TupleRecordData) (recordEntry.getRecordData());
tuple2.f0 = (String) recordData.getField(0);
tuple2.f1 = (Long) recordData.getField(1);
return tuple2;
}
DataHub結果表
VVR提供了OutputFormatSinkFunction的實現類DatahubSinkFunction將數據寫入DataHub。以下為將數據寫入DataHub的示例。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//DataHub連接配置。
env.generateSequence(0, 100)
.map((MapFunction<Long, RecordEntry>) aLong -> getRecordEntry(aLong, "default:"))
.addSink(
new DatahubSinkFunction<>(
<yourEndPoint>,
<yourProjectName>,
<yourTopicName>,
<yourSubId>,
<yourAccessId>,
<yourAccessKey>,
"public",
<enableSchemaRegistry>, // 是否開啟schemaRegistry,一般填false即可。
<schemaVersion> // 如果開啟了schemaRegistry,寫入的時候需要指定schemaVersion,其他情況填0即可。
);
env.execute();
private RecordEntry getRecordEntry(Long message, String s) {
RecordSchema recordSchema = new RecordSchema();
recordSchema.addField(new Field("f1", FieldType.STRING));
recordSchema.addField(new Field("f2", FieldType.BIGINT));
recordSchema.addField(new Field("f3", FieldType.DOUBLE));
recordSchema.addField(new Field("f4", FieldType.BOOLEAN));
recordSchema.addField(new Field("f5", FieldType.TIMESTAMP));
recordSchema.addField(new Field("f6", FieldType.DECIMAL));
RecordEntry recordEntry = new RecordEntry();
TupleRecordData recordData = new TupleRecordData(recordSchema);
recordData.setField(0, s + message);
recordData.setField(1, message);
recordEntry.setRecordData(recordData);
return recordEntry;
}
XML
Maven中央庫中已經放置了DataHub DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-datahub</artifactId>
<version>${vvr-version}</version>
</dependency>