表格存儲(chǔ)Tablestore(OTS)
本文為您介紹如何使用表格存儲(chǔ)Tablestore(OTS)連接器。
背景信息
表格存儲(chǔ)Tablestore(又名OTS)面向海量結(jié)構(gòu)化數(shù)據(jù)提供Serverless表存儲(chǔ)服務(wù),同時(shí)針對(duì)物聯(lián)網(wǎng)場(chǎng)景深度優(yōu)化提供一站式的IoTstore解決方案。適用于海量賬單、IM消息、物聯(lián)網(wǎng)、車聯(lián)網(wǎng)、風(fēng)控和推薦等場(chǎng)景中的結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ),提供海量數(shù)據(jù)低成本存儲(chǔ)、毫秒級(jí)的在線數(shù)據(jù)查詢和檢索以及靈活的數(shù)據(jù)分析能力。詳情請(qǐng)參見表格存儲(chǔ)Tablestore。
Tablestore連接器支持的信息如下。
類別 | 詳情 |
運(yùn)行模式 | 流模式 |
API種類 | SQL |
支持類型 | 源表、維表和結(jié)果表 |
數(shù)據(jù)格式 | 暫不支持 |
特有監(jiān)控指標(biāo) |
說(shuō)明 指標(biāo)含義詳情,請(qǐng)參見監(jiān)控指標(biāo)說(shuō)明。 |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 是 |
前提條件
已購(gòu)買Tablestore實(shí)例并創(chuàng)建表,詳情請(qǐng)參見使用流程。
使用限制
僅實(shí)時(shí)計(jì)算引擎VVR 3.0.0及以上版本支持表格存儲(chǔ)Tablestore連接器。
語(yǔ)法結(jié)構(gòu)
結(jié)果表
CREATE TABLE ots_sink ( name VARCHAR, age BIGINT, birthday BIGINT, primary key(name,age) not enforced ) WITH ( 'connector'='ots', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}', 'endPoint'='<yourEndpoint>', 'valueColumns'='birthday' );
說(shuō)明Tablestore結(jié)果表必須定義有Primary Key,輸出數(shù)據(jù)以Update方式追加Tablestore表。
維表
CREATE TABLE ots_dim ( id int, len int, content STRING ) WITH ( 'connector'='ots', 'endPoint'='<yourEndpoint>', 'instanceName'='<yourInstanceName>', 'tableName'='<yourTableName>', 'accessId'='${ak_id}', 'accessKey'='${ak_secret}' );
源表
CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR ) WITH ( 'connector'='ots', 'endPoint' ='<yourEndpoint>', 'instanceName' = 'flink-source', 'tableName' ='flink_source_table', 'tunnelName' = 'flinksourcestream', 'accessId' ='${ak_id}', 'accessKey' ='${ak_secret}', 'ignoreDelete' = 'false' );
屬性列支持讀取待消費(fèi)字段和Tunnel Service,以及返回?cái)?shù)據(jù)中的
OtsRecordType
和OtsRecordTimestamp
兩個(gè)字段。字段說(shuō)明請(qǐng)參見下表。字段名
Flink映射名
描述
OtsRecordType
type
數(shù)據(jù)操作類型。
OtsRecordTimestamp
timestamp
數(shù)據(jù)操作時(shí)間,單位為微秒。
說(shuō)明全量讀取數(shù)據(jù)時(shí),OtsRecordTimestamp取值為0。
當(dāng)需要讀取
OtsRecordType
和OtsRecordTimestamp
字段時(shí),F(xiàn)link提供了METADATA關(guān)鍵字用于獲取源表中的屬性字段,具體DDL示例如下。CREATE TABLE tablestore_stream( `order` VARCHAR, orderid VARCHAR, customerid VARCHAR, customername VARCHAR, record_type STRING METADATA FROM 'type', record_timestamp BIGINT METADATA FROM 'timestamp' ) WITH ( ... );
WITH參數(shù)
通用
參數(shù)
說(shuō)明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
connector
表類型。
String
是
無(wú)
固定值為
ots
。instanceName
實(shí)例名。
String
是
無(wú)
無(wú)。
endPoint
實(shí)例訪問(wèn)地址。
String
是
無(wú)
請(qǐng)參見服務(wù)地址。
tableName
表名。
String
是
無(wú)
無(wú)。
accessId
阿里云賬號(hào)或者RAM用戶的AccessKey ID。
String
是
無(wú)
詳情請(qǐng)參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過(guò)密鑰管理的方式填寫AccessKey ID取值,詳情請(qǐng)參見變量和密鑰管理。
accessKey
阿里云賬號(hào)或者RAM用戶的AccessKey Secret。
String
是
無(wú)
詳情請(qǐng)參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過(guò)密鑰管理的方式填寫AccessKey Secret取值,詳情請(qǐng)參見變量和密鑰管理。
connectTimeout
連接器連接Tablestore的超時(shí)時(shí)間。
Integer
否
30000
單位為毫秒。
socketTimeout
連接器連接Tablestore的Socket超時(shí)時(shí)間。
Integer
否
30000
單位為毫秒。
ioThreadCount
IO線程數(shù)量。
Integer
否
4
無(wú)。
callbackThreadPoolSize
回調(diào)線程池大小。
Integer
否
4
無(wú)。
源表獨(dú)有
參數(shù)
說(shuō)明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
tunnelName
表格存儲(chǔ)數(shù)據(jù)表的數(shù)據(jù)通道名稱。
String
是
無(wú)
您需要提前在表格存儲(chǔ)產(chǎn)品側(cè)創(chuàng)建好通道名稱和對(duì)應(yīng)的通道類型(增量、全量和全量加增量)。關(guān)于創(chuàng)建通道的具體操作,請(qǐng)參見創(chuàng)建數(shù)據(jù)通道。
ignoreDelete
是否忽略DELETE操作類型的實(shí)時(shí)數(shù)據(jù)。
Boolean
否
false
參數(shù)取值如下:
true:忽略。
false(默認(rèn)值):不忽略。
skipInvalidData
是否忽略臟數(shù)據(jù)。如果不忽略臟數(shù)據(jù),則處理臟數(shù)據(jù)時(shí)會(huì)進(jìn)行報(bào)錯(cuò)。
Boolean
否
false
參數(shù)取值如下:
true:忽略臟數(shù)據(jù)。
false(默認(rèn)值):不忽略臟數(shù)據(jù)。
說(shuō)明僅實(shí)時(shí)計(jì)算引擎VVR 8.0.4及以上版本支持該參數(shù)。
retryStrategy
重試策略。
Enum
否
TIME
參數(shù)取值如下:
TIME:在超時(shí)時(shí)間retryTimeoutMs內(nèi)持續(xù)進(jìn)行重試。
COUNT:在最大重試次數(shù)retryCount內(nèi)持續(xù)進(jìn)行重試。
retryCount
重試次數(shù)。
Integer
否
3
當(dāng)retryStrategy設(shè)置為COUNT時(shí),可以設(shè)置重試次數(shù)。
retryTimeoutMs
重試的超時(shí)時(shí)間。
Integer
否
180000
當(dāng)retryStrategy設(shè)置為TIME時(shí),可以設(shè)置重試的超時(shí)時(shí)間,單位為毫秒。
streamOriginColumnMapping
原始列名到真實(shí)列名的映射。
String
否
無(wú)
原始列名與真實(shí)列名之間,請(qǐng)使用半角冒號(hào)(:)隔開;多組映射之間,請(qǐng)使用半角逗號(hào)(,)隔開。例如
origin_col1:col1,origin_col2:col2
。outputSpecificRowType
是否透?jìng)骶唧w的RowType。
Boolean
否
false
參數(shù)取值如下:
false:不透?jìng)鳎袛?shù)據(jù)RowType均為INSERT。
true:透?jìng)鳎瑢⒏鶕?jù)透?jìng)鞯念愋拖鄳?yīng)設(shè)置為INSERT、DELETE或UPDATE_AFTER。
結(jié)果表獨(dú)有
參數(shù)
說(shuō)明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
retryIntervalMs
重試間隔時(shí)間。
Integer
否
1000
單位為毫秒。
maxRetryTimes
最大重試次數(shù)。
Integer
否
10
無(wú)。
valueColumns
插入字段的列名。
String
是
無(wú)
多個(gè)字段以半角逗號(hào)(,)分割,例如ID或NAME。
bufferSize
流入多少條數(shù)據(jù)后開始輸出。
Integer
否
5000
無(wú)。
batchWriteTimeoutMs
寫入超時(shí)的時(shí)間。
Integer
否
5000
單位為毫秒。表示如果緩存中的數(shù)據(jù)在等待batchWriteTimeoutMs秒后,依然沒(méi)有達(dá)到輸出條件,系統(tǒng)會(huì)自動(dòng)輸出緩存中的所有數(shù)據(jù)。
batchSize
一次批量寫入的條數(shù)。
Integer
否
100
最大值為200。
ignoreDelete
是否忽略DELETE操作。
Boolean
否
False
無(wú)。
autoIncrementKey
當(dāng)結(jié)果表中包含主鍵自增列時(shí),通過(guò)該參數(shù)指定主鍵自增列的列名稱。
String
否
無(wú)
當(dāng)結(jié)果表沒(méi)有主鍵自增列時(shí),請(qǐng)不要設(shè)置此參數(shù)。
說(shuō)明僅實(shí)時(shí)計(jì)算引擎VVR 8.0.4及以上版本支持該參數(shù)。
overwriteMode
數(shù)據(jù)覆蓋模式。
Enum
否
PUT
參數(shù)取值如下:
PUT:以PUT方式將數(shù)據(jù)寫入到Tablestore表。
UPDATE:以UPDATE方式寫入到Tablestore表。
說(shuō)明動(dòng)態(tài)列模式下只支持UPDATE模式。
defaultTimestampInMillisecond
設(shè)定寫入Tablestrore數(shù)據(jù)的默認(rèn)時(shí)間戳。
Long
否
-1
如果不指定,則會(huì)使用系統(tǒng)當(dāng)前的毫秒時(shí)間戳。
dynamicColumnSink
是否開啟動(dòng)態(tài)列模式。
Boolean
否
false
動(dòng)態(tài)列模式適用于在表定義中無(wú)需指定列名,根據(jù)作業(yè)運(yùn)行情況動(dòng)態(tài)插入數(shù)據(jù)列的場(chǎng)景。建表語(yǔ)句中主鍵需要定義為前若干列,最后兩列中前一列的值作為列名變量,且類型必須為String,后一列的值作為該列對(duì)應(yīng)的值。
說(shuō)明開啟動(dòng)態(tài)列模式時(shí),不支持主鍵自增列,且參數(shù)overwriteMode必須設(shè)置為UPDATE。
checkSinkTableMeta
是否檢查結(jié)果表元數(shù)據(jù)。
Boolean
否
true
若設(shè)置為true,會(huì)檢查Tablestore表的主鍵列和此處的建表語(yǔ)句中指定的主鍵是否一致。
enableRequestCompression
數(shù)據(jù)寫入過(guò)程中是否開啟數(shù)據(jù)壓縮。
Boolean
否
false
無(wú)。
維表獨(dú)有
參數(shù)
說(shuō)明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
retryIntervalMs
重試間隔時(shí)間。
Integer
否
1000
單位為毫秒。
maxRetryTimes
最大重試次數(shù)。
Integer
否
10
無(wú)。
cache
緩存策略。
String
否
ALL
目前Tablestore維表支持以下三種緩存策略:
None:無(wú)緩存。
LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表的每條數(shù)據(jù)都會(huì)觸發(fā)系統(tǒng)先在Cache中查找數(shù)據(jù),如果沒(méi)有找到,則去物理維表中查找。
需要配置相關(guān)參數(shù):緩存大小(cacheSize)和緩存更新時(shí)間間隔(cacheTTLMs)。
ALL(默認(rèn)值):緩存維表里的所有數(shù)據(jù)。在Job運(yùn)行前,系統(tǒng)會(huì)將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查找數(shù)據(jù)都會(huì)通過(guò)Cache進(jìn)行。如果在Cache中無(wú)法找到數(shù)據(jù),則KEY不存在,并在Cache過(guò)期后重新加載一遍全量Cache。
適用于遠(yuǎn)程表數(shù)據(jù)量小且MISS KEY(源表數(shù)據(jù)和維表JOIN時(shí),ON條件無(wú)法關(guān)聯(lián))特別多的場(chǎng)景。需要配置相關(guān)參數(shù):緩存更新時(shí)間間隔cacheTTLMs,更新時(shí)間黑名單cacheReloadTimeBlackList。
說(shuō)明因?yàn)橄到y(tǒng)會(huì)異步加載維表數(shù)據(jù),所以在使用CACHE ALL時(shí),需要增加維表JOIN節(jié)點(diǎn)的內(nèi)存,增加的內(nèi)存大小為遠(yuǎn)程表數(shù)據(jù)量的兩倍。
cacheSize
緩存大小。
Integer
否
無(wú)
當(dāng)緩存策略選擇LRU時(shí),可以設(shè)置緩存大小。
說(shuō)明單位為數(shù)據(jù)條數(shù)。
cacheTTLMs
緩存失效時(shí)間。
Integer
否
無(wú)
單位為毫秒。cacheTTLMs配置和cache有關(guān):
如果cache配置為None,則cacheTTLMs可以不配置,表示緩存不超時(shí)。
如果cache配置為L(zhǎng)RU,則cacheTTLMs為緩存超時(shí)時(shí)間。默認(rèn)不過(guò)期。
如果cache配置為ALL,則cacheTTLMs為緩存加載時(shí)間。默認(rèn)不重新加載。
cacheEmpty
是否緩存空結(jié)果。
Boolean
否
無(wú)
true:緩存
false:不緩存
cacheReloadTimeBlackList
更新時(shí)間黑名單。在緩存策略選擇為ALL時(shí),啟用更新時(shí)間黑名單,防止在此時(shí)間內(nèi)做Cache更新(例如雙11場(chǎng)景)。
String
否
無(wú)
格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情況如下所示:
用半角逗號(hào)(,)來(lái)分隔多個(gè)黑名單。
用箭頭(->)來(lái)分割黑名單的起始結(jié)束時(shí)間。
async
是否異步返回?cái)?shù)據(jù)。
Boolean
否
false
true:表示異步返回?cái)?shù)據(jù)。異步返回?cái)?shù)據(jù)默認(rèn)是無(wú)序的。
false(默認(rèn)值):表示不進(jìn)行異步返回?cái)?shù)據(jù)。
類型映射
源表
Tablestore字段類型
Flink字段類型
INTEGER
BIGINT
STRING
STRING
BOOLEAN
BOOLEAN
DOUBLE
DOUBLE
BINARY
BINARY
結(jié)果表
Flink字段類型
Tablestore字段類型
BINARY
BINARY
VARBINARY
CHAR
STRING
VARCHAR
TINYINT
INTEGER
SMALLINT
INTEGER
BIGINT
FLOAT
DOUBLE
DOUBLE
BOOLEAN
BOOLEAN
使用示例
CREATE TEMPORARY TABLE tablestore_stream(
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR
) WITH
'connector'='ots',
'endPoint' ='<yourEndpoint>',
'instanceName' = 'flink-source',
'tableName' ='flink_source_table',
'tunnelName' = 'flinksourcestream',
'accessId' ='${ak_id}',
'accessKey' ='${ak_secret}',
'ignoreDelete' = 'false',
'skipInvalidData' ='false'
);
CREATE TEMPORARY TABLE ots_sink (
`order` VARCHAR,
orderid VARCHAR,
customerid VARCHAR,
customername VARCHAR,
PRIMARY KEY (`order`,orderid) NOT ENFORCED
) WITH (
'connector'='ots',
'endPoint'='<yourEndpoint>',
'instanceName'='flink-sink',
'tableName'='flink_sink_table',
'accessId'='${ak_id}',
'accessKey'='${ak_secret}',
'valueColumns'='customerid,customername',
'autoIncrementKey'='${auto_increment_primary_key_name}'
);
INSERT INTO ots_sink
SELECT `order`, orderid, customerid, customername FROM tablestore_stream;