本文為您介紹大數據計算服務MaxCompute連接器的語法結構、WITH參數和使用示例等。
背景信息
大數據計算服務MaxCompute(原名ODPS)是一種快速、完全托管的EB級數據倉庫解決方案,致力于批量結構化數據的存儲和計算,提供海量數據倉庫的解決方案及分析建模服務。MaxCompute的詳情請參見什么是MaxCompute。
MaxCompute連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不支持 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支持更新或刪除結果表數據 | Batch Tunnel和Stream Tunnel模式僅支持插入數據,Upsert Tunnel模式支持插入、更新和刪除數據。 |
前提條件
已創建MaxCompute表,詳情請參見創建表。
使用限制
僅實時計算引擎VVR 2.0.0及以上版本支持MaxCompute連接器。
MaxCompute連接器僅支持At Least Once語義。
說明At Least Once語義會保證數據不缺失,但在少部分情況下,可能會將重復數據寫入MaxCompute。不同的MaxCompute Tunnel出現重復數據的情況不同,MaxCompute Tunnel詳情請參見如何選擇數據通道?。
默認情況下源表為全量模式,僅會讀取partition參數中指定的分區,在讀完所有數據后結束運行,狀態轉換為finished,不會監控是否有新分區產生。
如果您需要持續監控新分區,請通過WITH參數中指定startPartition使用增量源表模式。
說明維表每次更新時都會檢查最新分區,不受這一限制。
在源表開始運行后,向分區里添加的新數據不會被讀取,請在分區數據完整的情況下運行作業。
語法結構
CREATE TABLE odps_source(
id INT,
user_name VARCHAR,
content VARCHAR
) WITH (
'connector' = 'odps',
'endpoint' = '<yourEndpoint>',
'tunnelEndpoint' = '<yourTunnelEndpoint>',
'project' = '<yourProjectName>',
'schemaName' = '<yourSchemaName>',
'tableName' = '<yourTableName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'partition' = 'ds=2018****'
);
WITH參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為odps。
endpoint
MaxCompute服務地址。
String
是
無
請參見Endpoint。
tunnelEndpoint
MaxCompute Tunnel服務的連接地址。
String
否
無
請參見Endpoint。
說明如果未填寫,MaxCompute會根據內部的負載均衡服務分配Tunnel的連接。
project
MaxCompute項目名稱。
String
是
無
無。
schemaName
MaxCompute Schema名稱。
String
否
無
僅當MaxCompute項目開啟Schema功能時,需填寫該值為MaxCompute表所屬Schema名,詳情請參見 Schema操作。
說明僅實時計算引擎VVR 8.0.6及以上版本支持該參數。
tableName
MaxCompute表名。
String
是
無
無。
accessId
MaxCompute AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。
accessKey
MaxCompute AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。
partition
MaxCompute分區名。
String
否
無
對于非分區表和增量源表無需填寫。
說明分區表詳情請參見在讀取或寫入分區時,如何填寫Partition參數?。
compressAlgorithm
MaxCompute Tunnel使用的壓縮算法。
String
否
VVR 4.0.13及以上版本:ZLIB
VVR 6.0.1及以上版本:SNAPPY
參數取值如下:
RAW(無壓縮)
ZLIB
SNAPPY
SNAPPY相比ZLIB能帶來明顯的吞吐提升。在測試場景下,吞吐提升約50%。
說明僅實時計算引擎VVR 4.0.13及以上版本支持該參數。
quotaName
MaxCompute獨享數據傳輸服務的quota名稱。
String
否
無
設置該值來使用獨享的MaxCompute數據傳輸服務。
重要僅實時計算引擎VVR 8.0.3及以上版本支持該參數。
設置該值時,必須刪除tunnelEndpoint參數,否則仍將使用tunnelEndpoint中指定的數據通道。
說明MaxCompute獨享數據傳輸參見購買與使用獨享數據傳輸服務資源組。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
maxPartitionCount
可以讀取的最大分區數量。
Integer
否
100
如果讀取的分區數量超過了該參數,則會出現報錯
The number of matched partitions exceeds the default limit
。重要由于一次性讀取大量分區會給MaxCompute服務帶來一定壓力,同時也會讓作業啟動速度變慢,因此您需要確認是否需要讀取這么多分區(而不是誤填partition參數)。如果確實需要,需要手動調大maxPartitionCount參數。
useArrow
是否使用Arrow格式讀取數據。
Boolean
否
false
使用Arrow格式能夠調用MaxCompute的Storage API,詳情請參見什么是MaxCompute中用戶接口與開放性一節。
重要僅在批作業中生效。
僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
splitSize
在使用Arrow格式讀取數據時,一次拉取的數據大小。
MemorySize
否
256 MB
僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
重要僅在批作業中生效。
compressCodec
在使用Arrow格式讀取數據時,采用的壓縮算法。
String
否
""
參數取值如下:
"" (無壓縮)
ZSTD
LZ4_FRAME
指定壓縮算法相比無壓縮能帶來一定的吞吐提升。
重要僅在批作業中生效。
僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
dynamicLoadBalance
是否允許動態分配分片。
Boolean
否
false
參數取值如下:
true:允許
false:不允許
允許動態分配分片能夠發揮Flink不同節點的處理性能,減少源表整體讀取時間,但也會導致不同節點讀取總數據量不一致,出現數據傾斜情況。
重要僅在批作業中生效。
僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
增量源表獨有
增量源表通過間歇輪詢MaxCompute服務器獲取所有的分區信息來發現新增的分區,讀取新分區時要求分區內數據已寫入完畢,詳情參見增量MaxCompute源表監聽到新分區時,如果該分區還有數據沒有寫完,如何處理?。通過startPartition可以指定起始點位,但注意只讀取字典序大于等于起始點位的分區,例如分區
year=2023,month=10
字典序小于分區year=2023,month=9
,對于這種類型的分區聲明可以通過加0補齊的方式來保證字典序正確,例如year=2023,month=09
。參數
說明
數據類型
是否必填
默認值
備注
startPartition
增量讀取的起始MaxCompute分區點位(包含)。
String
是
無
使用該參數后啟用增量源表模式,將忽略partition參數。
多級分區必須按分區級別從大到小聲明每個分區列的值。
說明startPartition參數詳情,請參見如何填寫增量MaxCompute的startPartition參數?。
subscribeIntervalInSec
輪詢MaxCompute獲取分區列表的時間間隔。
Integer
否
30
單位為秒。
modifiedTableOperation
讀取分區過程中遇到分區數據被修改時的處理。
Enum (NONE, SKIP)
否
NONE
由于下載session被保存在檢查點中,每次從檢查點恢復時嘗試從該session恢復讀取進度,而該session由于分區數據被修改不可用,Flink任務會陷入不斷重啟。此時您可以設置該參數,參數取值如下:
NONE:需要您修改startPartition參數使其大于不可用分區,并從無狀態啟動作業。
SKIP:若不希望無狀態啟動,可將模式修改為SKIP,Flink嘗試從檢查點恢復session時將跳過不可用的分區。
重要NONE和SKIP模式下,被修改分區中已讀取的數據不會被撤回,未讀取的數據將不會被讀取。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
useStreamTunnel
是否使用MaxCompute Stream Tunnel上傳數據。
Boolean
否
false
參數取值如下:
true:使用MaxCompute Stream Tunnel上傳數據。
false:使用MaxCompute Batch Tunnel上傳數據。
說明僅實時計算引擎VVR 4.0.13及以上版本支持該參數。
數據通道選擇詳情請參見如何選擇數據通道?。
flushIntervalMs
MaxCompute Tunnel Writer緩沖區flush間隔。
Long
否
30000(30秒)
MaxCompute Sink寫入記錄時,先將數據存儲到MaxCompute的緩沖區中,等緩沖區溢出或者每隔一段時間(flushIntervalMs),再把緩沖區里的數據寫到目標MaxCompute表。
對于Stream Tunnel,flush的數據立即可見;對于Batch Tunnel,數據flush后仍需要等待checkpoint完成后才可見,建議設置該參數為0來關閉定時flush。
單位為毫秒。
說明本參數可以與batchSize一同使用,滿足任一條件即會Flush數據。
batchSize
MaxCompute Tunnel Writer緩沖區flush的大小。
Long
否
67108864(64 MB)
MaxCompute Sink寫入記錄時,先將數據存儲到MaxCompute的緩沖區中,等緩沖區達到一定大小(batchSize),再把緩沖區里的數據寫到目標MaxCompute表。
單位為字節。
說明僅實時計算引擎VVR 4.0.14及以上版本支持該參數。
本參數可以與flushIntervalMs一同使用,滿足任一條件即會Flush數據。
numFlushThreads
MaxCompute Tunnel Writer緩沖區flush的線程數。
Integer
否
1
每個MaxCompute Sink并發將創建numFlushThreads個線程用于flush數據。當該值大于1時,將允許不同分區的數據并發Flush,提升Flush的效率。
說明僅實時計算引擎VVR 4.0.14及以上版本支持該參數。
dynamicPartitionLimit
寫入動態分區的最大數量。
Integer
否
100
當結果表在兩次Checkpoint之間寫入的動態分區數量超過了dynamicPartitionLimit,則會出現報錯
Too many dynamic partitions
。重要由于一次性寫入大量分區會給MaxCompute服務帶來一定壓力,同時也會導致結果表flush和作業Checkpoint變慢。因此當報錯出現時,您需要確認是否需要寫入這么多分區。如果確實需要,需要手動調大dynamicPartitionLimit參數。
retryTimes
向MaxCompute服務器請求最大重試次數。
Integer
否
3
創建session、提交session、flush數據時可能存在短暫的MaxCompute服務不可用的情況,會根據該配置進行重試。
sleepMillis
重試間隔時間。
Integer
否
1000
單位為毫秒。
enableUpsert
是否使用MaxCompute Upsert Tunnel上傳數據。
Boolean
否
false
參數取值如下:
true:使用Upsert Tunnel,處理Flink中的INSERT、UPDATE_AFTER和DELETE數據。
false:根據useStreamTunnel參數使用Batch Tunnel或Stream Tunnel,處理Flink中的INSERT、UPDATE_AFTER數據。
重要若Upsert模式下MaxCompute sink提交時出現報錯、失敗、耗時長等情況,建議限制sink節點的并發數在10以內。
僅實時計算引擎VVR 8.0.6及以上版本支持該參數。
upsertAsyncCommit
Upsert模式下在提交session時是否使用異步模式。
Boolean
否
false
參數取值如下:
true:使用異步模式,提交耗時更短,但提交完成時寫入的數據非立即可讀。
false:默認為同步模式,提交時將等待服務側處理完session。
說明僅實時計算引擎VVR 8.0.6及以上版本支持該參數。
upsertCommitTimeoutMs
Upsert模式下提交session超時時間。
Integer
否
120000
(120秒)
僅實時計算引擎VVR 8.0.6及以上版本支持該參數。
維表獨有
MaxCompute維表在作業啟動時從指定的分區拉取全量數據,partition參數支持使用max_pt()等函數。當緩存過期重新加載時會重新解析partition參數拉取最新的分區,使用max_two_pt()時維表可拉取兩個分區,其他情況下只支持指定單個分區。
參數
說明
數據類型
是否必填
默認值
備注
cache
緩存策略。
String
是
無
目前MaxCompute維表僅支持
ALL
策略,必須顯式聲明。 適用于遠程表數據量小且MISS KEY(源表數據和維表JOIN時,ON條件無法關聯)特別多的場景。ALL策略:緩存維表里的所有數據。在Job運行前,系統會將維表中所有數據加載到Cache中,之后所有的維表查詢都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在,并在Cache過期后重新加載一遍全量Cache。
說明因為系統會異步加載維表數據,所以在使用CACHE ALL時,需要增加維表JOIN節點的內存,增加的內存大小為遠程表數據量的至少4倍,具體值與MaxCompute存儲壓縮算法有關。
如果MaxCompute維表數據量較大,可以考慮使用SHUFFLE_HASH注解將維表數據均勻分散到各個并發中。詳情請參見如何使用維表SHUFFLE_HASH注解?。
在使用超大MaxCompute維表時,如果JVM頻繁GC導致作業異常,且在增加維表JOIN節點的內存仍無改善的情況下,建議改為支持LRU Cache策略的KV型維表,例如云數據庫Hbase版維表。
cacheSize
最多緩存的數據條數。
Long
否
100000
如果維表數據量超過了cacheSize,則會出現報錯
Row count of table <table-name> partition <partition-name> exceeds maxRowCount limit
。重要由于維表數據量太大會占用大量JVM堆內存,同時也會讓作業啟動和維表更新變慢,因此您需要確認是否需要緩存這么多數據,如果確實需要,需要手動調大該參數。
cacheTTLMs
緩存超時時間,也就是緩存更新的間隔時間。
Long
否
Long.MAX_VALUE(相當于永不更新)
單位為毫秒。
cacheReloadTimeBlackList
更新時間黑名單。在該參數規定的時間段內不會更新緩存。
String
否
無
用于防止緩存在關鍵時間段(例如活動流量峰值期間)更新導致作業不穩定。填寫方式詳情請參見如何填寫CacheReloadTimeBlackList參數?。
maxLoadRetries
緩存更新時(包含作業啟動時初次拉取數據)最多嘗試次數,超過該次數后作業運行失敗。
Integer
否
10
無。
類型映射
MaxCompute支持的類型參見2.0數據類型版本。
MaxCompute類型 | Flink類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
當MaxCompute物理表中同時存在嵌套的復合類型字段(ARRAY、MAP或STRUCT)和JSON類型字段時,需要在創建MaxCompute物理表時指定tblproperties('columnar.nested.type'='true')
,才能被Flink正確讀寫。
使用示例
SQL
源表示例
全量讀取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=201809*' ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
增量讀取
CREATE TEMPORARY TABLE odps_source ( cid VARCHAR, rt DOUBLE ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpointName>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'startPartition' = 'yyyy=2018,MM=09,dd=05' -- 從20180905對應分區開始讀取 ); CREATE TEMPORARY TABLE blackhole_sink ( cid VARCHAR, invoke_count BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT cid, COUNT(*) AS invoke_count FROM odps_source GROUP BY cid;
結果表示例
寫入固定分區
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905' -- 寫入固定分區ds=20180905。 ); INSERT INTO odps_sink SELECT id, len, content FROM datagen_source;
寫入動態分區
CREATE TEMPORARY TABLE datagen_source ( id INT, len INT, content VARCHAR, c TIMESTAMP ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_sink ( id INT, len INT, content VARCHAR, ds VARCHAR --需要顯式聲明動態分區列。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds' --不寫分區的值,表示根據ds字段的值寫入不同分區。 ); INSERT INTO odps_sink SELECT id, len, content, DATE_FORMAT(c, 'yyMMdd') as ds FROM datagen_source;
維表示例
一對一維表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR, PRIMARY KEY (k) NOT ENFORCED -- 一對一維表需要聲明主鍵。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
一對多維表
CREATE TEMPORARY TABLE datagen_source ( k INT, v VARCHAR ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE odps_dim ( k INT, v VARCHAR -- 一對多維表無需聲明主鍵。 ) WITH ( 'connector' = 'odps', 'endpoint' = '<yourEndpoint>', 'project' = '<yourProjectName>', 'tableName' = '<yourTableName>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'partition' = 'ds=20180905', 'cache' = 'ALL' ); CREATE TEMPORARY TABLE blackhole_sink ( k VARCHAR, v1 VARCHAR, v2 VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT k, s.v, d.v FROM datagen_source AS s INNER JOIN odps_dim FOR SYSTEM_TIME AS OF PROCTIME() AS d ON s.k = d.k;
DataStream
通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink,DataStream連接器設置方法請參見DataStream連接器使用方法。
為了保護知識產權,從實時計算引擎VVR6.0.6版本起,此連接器在本地調試單次運行作業的時間為30分鐘,30分鐘后作業會報錯并退出。本地運行和調試包含MaxCompute連接器的作業參見本地運行和調試包含連接器的作業。
若您在Flink開發控制臺提交作業后,出現本地運行和調試包含連接器的作業中類似的MaxCompute相關類ClassNotFound問題,請下載Maven中央庫中對應版本中后綴為uber.jar的文件,添加為作業的附加依賴。以1.15-vvr-6.0.6版本為例,需下載的文件為該倉庫目錄下的verveica-connector-odps-1.15-vvr-6.0.6-uber.jar。
在DataStream中使用MaxCompute連接器推薦使用SQL聲明MaxCompute表,通過Table/DataStream相互轉換來連接MaxCompute表和數據流
連接源表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_source (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=201809*'",
")");
DataStream<Row> source = tEnv.toDataStream(tEnv.from("odps_source"));
source.print();
env.execute("odps source");
連接結果表
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(String.join(
"\n",
"CREATE TEMPORARY TABLE IF NOT EXISTS odps_sink (",
" cid VARCHAR,",
" rt DOUBLE",
") WITH (",
" 'connector' = 'odps',",
" 'endpoint' = '<yourEndpointName>',",
" 'project' = '<yourProjectName>',",
" 'tableName' = '<yourTableName>',",
" 'accessId' = '<yourAccessId>',",
" 'accessKey' = '<yourAccessPassword>',",
" 'partition' = 'ds=20180905'",
")");
DataStream<Row> data = env.fromElements(
Row.of("id0", 3.),
Row.of("id1", 4.));
tEnv.fromDataStream(data).insertInto("odps_sink").execute();
XML
MaxCompute連接器的Maven依賴包含了構建全量源表、增量源表、結果表和維表的所需要的類。Maven中央庫中已經放置了MaxCompute DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-odps</artifactId>
<version>${vvr-version}</version>
</dependency>