本文為您介紹如何使用云數據庫Tair連接器(Redis開源版)。
背景信息
阿里云數據庫Tair是兼容開源Redis協議標準、提供內存加硬盤混合存儲的數據庫服務,基于高可靠雙機熱備架構及可平滑擴展的集群架構,充分滿足高吞吐、低延遲及彈性變配的業務需求,更多內容詳情請參見什么是云數據庫 Tair(兼容 Redis)。
Redis連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 維表和結果表 |
支持模式 | 流模式 |
數據格式 | String |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API 種類 | SQL |
是否支持更新或刪除結果表數據 | 是 |
前提條件
使用限制
目前Redis連接器是僅提供Best Effort語義,無法保證數據的Exactly Once,需要您自行保證語義上的冪等性。
維表使用限制有:
僅支持讀取Redis數據存儲中STRING和HASHMAP類型的數據。
維表的字段必須為STRING,且必須聲明且只能聲明一個主鍵。
維表JOIN時,ON條件必須包含主鍵的等值條件。
已知缺陷及解決方案
實時計算引擎VVR 8.0.9版本緩存功能存在問題,需要在結果表WITH參數中添加 'sink.buffer-flush.max-rows' = '0' 禁用。
語法結構
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- 必填。
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- 結果表時必填。
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為redis。
host
Redis Server連接地址。
String
是
無
推薦您使用內網地址。
說明由于網絡延遲和帶寬限制等因素,連接公網地址時可能會出現不穩定的情況。
port
Redis Server連接端口。
Int
否
6379
無。
password
Redis數據庫密碼。
String
否
空字符串,表示不進行校驗。
無。
dbNum
選擇操作的數據庫編號。
Int
否
0
無。
clusterMode
Redis集群是否為集群模式。
Boolean
否
false
無。
hostAndPorts
Redis集群的主機和端口號。
說明如果啟用了集群模式,且不需要連接高可用,可以通過host和port配置項只配置其中一臺主機,也可以只配置該項。該配置項的優先級高于獨立的host和port配置項。
String
否
空
如果
ClusterMode = true
,同時需要支持Jedis到自建Redis集群連接的高可用,必須配置該項。配置格式為字符串:"host1:port1,host2:port2"
。key-prefix
表主鍵值的前綴。
String
否
無
配置后,Redis維表和結果表的主鍵字段值在查詢或者寫入Redis時會被自動添加前綴,該前綴是由鍵前綴(key-prefix)和其后的前綴分隔符(key-prefix-delimiter)組成。
說明僅實時計算引擎VVR 8.0.7及以上版本支持該參數。
key-prefix-delimiter
表主鍵值與表主鍵值前綴之間的分隔符。
String
否
無
connection.pool.max-total
連接池可以分配的最大連接數。
Int
否
8
說明僅實時計算引擎VVR 8.0.9及以上版本支持該參數。
connection.pool.max-idle
連接池中最大空閑連接數。
Int
否
8
connection.pool.min-idle
連接池中最小空閑連接數。
Int
否
0
connect.timeout
建立連接的超時時間。
Duration
否
3000ms
socket.timeout
從Redis服務器接收數據的超時時間(即套接字超時)。
Duration
否
3000ms
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
mode
對應Redis的數據結構。
String
是
無
云數據庫Tair結果表支持5種Redis數據結構,其DDL必須按指定格式定義且主鍵必須被定義。詳情請參見Redis結果表數據結構格式。
flattenHash
是否按照多值模式寫入HASHMAP類型數據。
Boolean
否
false
參數取值如下:
true:按照多值模式寫入。此時,您需要在DDL中聲明多個非主鍵字段,主鍵字段值對應key,每個非主鍵字段的字段名對應一個field,字段值對應該field的value。
false:按照單值模式寫入。此時您需要在DDL中聲明三個字段,第一個主鍵字段的字段值對應key,第二個非主鍵字段的字段值對應field,第三個非主鍵字段的字段值對應value。
說明該參數僅在mode參數取值為HASHMAP時生效。
僅實時計算引擎VVR 8.0.7及以上版本支持該參數。
ignoreDelete
是否忽略Retraction消息。
Boolean
否
false
參數取值如下:
true:收到Retraction消息時,忽略Retraction消息。
false:收到Retraction消息時,同時刪除數據對應的key及已插入的數據。
expiration
為寫入數據對應的Key設置TTL。
Long
否
0,代表不設置TTL。
如果該參數的值大于0,則寫入數據對應的Key會被設置相應的TTL,單位為毫秒。
說明僅實時計算引擎VVR 4.0.13及以上版本支持該參數。
sink.buffer-flush.max-rows
緩存可保存的最大記錄數。
Int
否
200
緩存記錄包括所有追加、修改和刪除的事件,超過最大記錄數時將刷寫緩存。
說明僅實時計算引擎VVR 8.0.9及以上版本支持該參數。
僅適用于非Redis集群實例,可以設置為
0
禁用該參數。
sink.buffer-flush.interval
緩存刷寫時間間隔。
Duration
否
1000ms
異步刷寫緩存。
說明僅實時計算引擎VVR 8.0.9及以上版本支持該參數。
僅適用于非Redis集群實例,可以設置為
0
禁用該參數。
維表獨有
參數
說明
數據類型
是否必填
默認值
備注
mode
讀取Redis的數據類型。
String
否
STRING
參數取值如下:
STRING:不指定時,默認以STRING類型讀取。
HASHMAP:當指定mode為HASHMAP時,將按照多值模式讀取HASHMAP類型數據。
此時DDL需要聲明多個非主鍵字段,主鍵字段值對應key,每個非主鍵字段的字段名對應field,字段值對應value。
說明僅實時計算引擎VVR 8.0.7及以上版本支持該參數。
如果您需要以單值模式讀取HASHMAP類型數據時,請配置hashName參數。
hashName
單值模式讀取HASHMAP類型數據時使用的key。
String
否
無
如果您未指定mode參數,還希望以單值模式讀取HASHMAP類型數據。此時,您需要配置hashName。
此時DDL僅需要聲明兩個字段,第一個主鍵字段的字段值對應field,第二個非主鍵字段的字段值對應value。
cache
緩存策略。
String
否
None
云數據庫Tair維表支持以下緩存策略:
None(默認值):無緩存。
LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據,如果沒有找到,則去物理維表中查找。
ALL:緩存維表里的所有數據。在Job運行前,系統會將維表中所有數據加載到Cache中,之后所有的維表查找數據都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在。全量的Cache有一個過期時間,過期后會重新加載一遍全量Cache。
重要僅實時計算引擎VVR 8.0.3及以上版本支持ALL緩存策略。
ALL緩存策略目前僅支持單值模式讀取hashmap類型數據(即hashName參數不為空,mode參數為空時)。
需要配置相關參數:緩存大小(cacheSize)和緩存更新時間間隔(cacheTTLMs)。
cacheSize
緩存大小。
Long
否
10000
當選擇LRU緩存策略時,需要設置緩存大小。
cacheTTLMs
緩存超時時長,單位為毫秒。
Long
否
無
cacheTTLMs配置和cache有關:
如果cache配置為None,則cacheTTLMs可以不配置,表示緩存不超時。
如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認不過期。
如果cache配置為ALL,則cacheTTLMs為緩存加載時間。默認不重新加載。
cacheEmpty
是否緩存空結果。
Boolean
否
true
無。
cacheReloadTimeBlackList
更新時間黑名單。在緩存策略選擇為ALL時,啟用更新時間黑名單,防止在此時間內做Cache更新(例如雙11場景)。
String
否
無
格式為2017-10-24 14:00 -> 2017-10-24 15:00,2017-11-10 23:30 -> 2017-11-11 08:00。分隔符的使用情況如下所示:
用英文逗號(,)來分隔多個黑名單。
用箭頭(->)來分割黑名單的起始結束時間。
Redis結果表數據結構格式
類型 | 格式 | Redis插入數據的命令 |
STRING類型 | DDL為兩列:
|
|
LIST類型 | DDL為兩列:
|
|
SET類型 | DDL為兩列:
|
|
HASHMAP類型 | 默認情況下,DDL為三列:
|
|
flattenHash參數配置為true時,DDL支持多列,以4列的情況為例:
|
| |
SORTEDSET類型 | DDL為三列:
|
|
類型映射
類型 | Redis字段類型 | Flink字段類型 |
通用 | STRING | STRING |
結果表獨有 | SCORE | DOUBLE |
因為Redis的SCORE類型應用于SORTEDSET(有序集合),所以需要手動為每個Value設置一個DOUBLE類型的SCORE,Value才能按照該SCORE從小到大進行排序。
使用示例
結果表
寫入STRING類型數據:在代碼示例中,
redis_sink
結果表中col1
列的值會作為key,col2
列的值會作為value寫入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
單值模式寫入HASHMAP類型數據:在代碼示例中,
redis_sink
結果表中的col1
列的值會作為key,col2
列的值會作為field,col3
列的值會作為value寫入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
多值模式寫入HASHMAP類型數據:在代碼示例中,
redis_sink
結果表中的col1
列的值會作為key,col2
列的值會作為field為col2的value,col3
列的值會作為field為col3的value,col4
列的值會作為field為col4的value,寫入到Redis中。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'flattenHash' = 'true', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); INSERT INTO redis_sink SELECT * FROM datagen_source;
維表
讀取STRING類型數據:在代碼示例中,
redis_dim
維表中的col1
列的值對應key,col2
列的值對應value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
單值模式讀取HASHMAP類型數據:在代碼示例中,
hashName
參數的值testKey為key,redis_dim
維表中的col1
列的值對應field,col2
列的值對應value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'hashName' = 'testkey' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col1, t2.col2 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;
多值模式讀取HASHMAP類型數據:在代碼示例中,
redis_dim
維表中的col1
列的值對應key,col2
列的值對應field為col2的value,col3
列的值對應field為col3的value,col4
列的值對應field為col4的value。CREATE TEMPORARY TABLE datagen_source ( col1 STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '100' ); CREATE TEMPORARY TABLE redis_dim ( col1 STRING, col2 STRING, col3 STRING, col4 STRING PRIMARY KEY (col1) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = '<yourHost>', 'port' = '<yourPort>', 'password' = '<yourPassword>', 'mode' = 'HASHMAP' ); CREATE TEMPORARY TABLE blackhole_sink ( col1 STRING, col2 STRING, col3 STRING, col4 STRING ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.col1, t2.col2, t2.col3, t2.col4 FROM datagen_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.col1 = t2.col1;