當前MaxCompute為您提供了新版的Flink Connector插件,新版插件支持將Flink數據寫入至MaxCompute的普通表和Delta Table類型表,提高了Flink數據寫入MaxCompute的便捷性。本文為您介紹新版Flink Connector寫入MaxCompute的能力支持情況與主要操作流程。
背景信息
支持的寫入模式:
使用新版Flink Connector將數據寫入MaxCompute時,支持通過Upsert和Insert寫入方式。其中使用Upsert時支持以下兩種數據寫入流:
按照Primary Key進行分組
按照分區字段進行分組
若分區數量過多,您可以按照分區字段進行分組,但使用該流程可能導致數據傾斜。
Upsert模式下,通過Flink Connector進行數據寫入流程和參數配置建議,詳情請參見數據實時入倉實踐。
您可在配置Flink數據寫入MaxCompute時,通過設置Flink Connector參數指定使用哪種寫入方式,全量Connector參數介紹請參見下文的附錄:新版Flink Connector全量參數。
Flink Upsert寫入任務的Checkpoint間隔建議設置3分鐘以上,設置太小的話,寫入效率得不到保障,并且可能引入大量小文件。
MaxCompute與實時計算Flink版的字段類型對照關系如下:
Flink 數據類型
MaxCompute 數據類型
CHAR(p)
CHAR(p)
VARCHAR(p)
VARCHAR(p)
STRING
STRING
BOOLEAN
BOOLEAN
TINYINT
TINYINT
SMALLINT
SMALLINT
INT
INT
BIGINT
LONG
FLOAT
FLOAT
DOUBLE
DOUBLE
DECIMAL(p, s)
DECIMAL(p, s)
DATE
DATE
TIMESTAMP(9) WITHOUT TIME ZONE、TIMESTAMP_LTZ(9)
TIMESTAMP
TIMESTAMP(3) WITHOUT TIME ZONE、TIMESTAMP_LTZ(3)
DATETIME
BYTES
BINARY
ARRAY<T>
LIST<T>
MAP<K, V>
MAP<K, V>
ROW
STRUCT
說明Flink的TIMESTAMP數據類型不含時區,MaxCompute TIMESTAMP數據類型含時區。此差異會導致8小時的時間差。其通過使用TIMESTAMP_LTZ(9)來對齊時間戳。
--FlinkSQL CREATE TEMPORARY TABLE odps_source( id BIGINT NOT NULL COMMENT 'id', created_time TIMESTAMP NOT NULL COMMENT '創建時間', updated_time TIMESTAMP_LTZ(9) NOT NULL COMMENT '更新時間', PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', ... );
Flink數據寫入MaxCompute流程:自建開源Flink
準備工作:創建MaxCompute表。
您需先創建好MaxCompute表,用于后續Flink數據寫入。以下以創建兩張表(Delta Table非分區表和分區表)作為示例,為您演示Flink數據寫入MaxCompute的主要流程,其中表屬性設置請參考Delta Table表參數。
--創建Delta Table非分區表 CREATE TABLE mf_flink_tt ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ; --創建Delta Table分區表 CREATE TABLE mf_flink_tt_part ( id BIGINT not null, name STRING, age INT, status BOOLEAN, primary key (id) ) partitioned by (dd string, hh string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
搭建開源Flink集群。當前支持1.13、1.15、1.16和1.17版本的開源Flink,您可以選擇對應版本的Flink:
說明Flink 1.17版本可復用1.16版本。
本文以Flink Connector 1.13為例,將包下載至本地環境,下載完成后進行解壓。
下載Flink Connector并添加至Flink集群包中。
將Flink Connector Jar包下載至本地環境。
將Flink Connector Jar包添加至解壓后的Flink安裝包的lib目錄中。
mv flink-connector-odps-1.13-shaded.jar $FLINK_HOME/lib/flink-connector-odps-1.13-shaded.jar
啟動Flink實例服務。
cd $FLINK_HOME/bin ./start-cluster.sh
啟動Flink客戶端。
cd $FLINK_HOME/bin ./sql-client.sh
創建Flink表,并配置Flink Connector參數。
當前支持直接使用Flink SQL創建Flink表并配置參數,也支持使用Flink的DataStream API進行相關操作。兩種操作的核心示例如下。
使用Flink SQL
進入Flink SQL的編輯界面,執行以下命令完成建表與參數配置。
-- 在 Flink SQL中注冊一張對應的非分區表 CREATE TABLE mf_flink ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs****', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' ); -- 在 Flink SQL 中注冊一張對應的分區表 CREATE TABLE mf_flink_part ( id BIGINT, name STRING, age INT, status BOOLEAN, dd STRING, hh STRING, PRIMARY KEY(id) NOT ENFORCED ) PARTITIONED BY (`dd`,`hh`) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_tt_part', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyLZKT****', 'odps.access.key'='gJwKaF3hK9MDAQgbO0zs*******', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj' );
向Flink表中寫入數據,并在MaxCompute表中查詢,驗證Flink數據寫入MaxCompute的結果。
--在flink Sql客戶端中往非分區表里插入數據 INSERT INTO mf_flink VALUES (1,'Danny',27, false); --在Maxcompute中查詢返回 SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 27 | false | +------------+------+------+--------+ --在flink Sql客戶端中往非分區表里插入數據 INSERT INTO mf_flink VALUES (1,'Danny',28, false); --在Maxcompute中查詢返回 SELECT * FROM mf_flink_tt; +------------+------+------+--------+ | id | name | age | status | +------------+------+------+--------+ | 1 | Danny | 28 | false | +------------+------+------+--------+ --在flink Sql客戶端中往分區表里插入數據 INSERT INTO mf_flink_part VALUES (1,'Danny',27, false, '01','01'); --在Maxcompute中查詢返回 SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 27 | false | 01 | 01 | +------------+------+------+--------+----+----+ --在flink Sql客戶端中分區表里插入數據 INSERT INTO mf_flink_part VALUES (1,'Danny',30, false, '01','01'); --在Maxcompute中查詢返回 SELECT * FROM mf_flink_tt_part WHERE dd=01 AND hh=01; +------------+------+------+--------+----+----+ | id | name | age | status | dd | hh | +------------+------+------+--------+----+----+ | 1 | Danny | 30 | false | 01 | 01 | +------------+------+------+--------+----+----+
使用DataStream API
使用DataStream接口時,需先添加以下依賴。
<dependency> <groupId>com.aliyun.odps</groupId> <artifactId>flink-connector-maxcompute</artifactId> <version>xxx</version> <scope>system</scope> <systemPath>${mvn_project.basedir}/lib/flink-connector-maxcompute-xxx-shaded.jar</systemPath> </dependency>
說明使用時請將“xxx”改為相應的版本號。
建表與參數配置的示例代碼如下。
package com.aliyun.odps.flink.examples; import org.apache.flink.configuration.Configuration; import org.apache.flink.odps.table.OdpsOptions; import org.apache.flink.odps.util.OdpsConf; import org.apache.flink.odps.util.OdpsPipeline; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; public class Examples { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(120 * 1000); StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env); Table source = streamTableEnvironment.sqlQuery("SELECT * FROM source_table"); DataStream<RowData> input = streamTableEnvironment.toAppendStream(source, RowData.class); Configuration config = new Configuration(); config.set(OdpsOptions.SINK_OPERATION, "upsert"); config.set(OdpsOptions.UPSERT_COMMIT_THREAD_NUM, 8); config.set(OdpsOptions.UPSERT_MAJOR_COMPACT_MIN_COMMITS, 100); OdpsConf odpsConfig = new OdpsConf("accessid", "accesskey", "endpoint", "project", "tunnel endpoint"); OdpsPipeline.Builder builder = OdpsPipeline.builder(); builder.projectName("sql2_isolation_2a") .tableName("user_ledger_portfolio") .partition("") .configuration(config) .odpsConf(odpsConfig) .sink(input, false); env.execute(); } }
Flink數據寫入MaxCompute流程:阿里云全托管Flink
準備工作:創建MaxCompute表。
您需先創建好MaxCompute表,用于后續Flink數據寫入。以下以創建一張Delta Table表為例。
SET odps.sql.type.system.odps2=true; DROP TABLE mf_flink_upsert; CREATE TABLE mf_flink_upsert ( c1 int not null, c2 string, gt timestamp, primary key (c1) ) PARTITIONED BY (ds string) tblproperties ("transactional"="true", "write.bucket.num" = "64", "acid.data.retain.hours"="12") ;
登錄實時計算控制臺,查看Flink Connector信息,Flink Connector已經加載到阿里云全托管Flink VVP上。
通過Flink SQL作業創建Flink表,并構造Flink實時數據,完成作業開發后進行作業部署。
在Flink的作業開發頁面,創建并編輯Flink SQL作業,以下示例為新建一張Flink數據源表、一張Flink臨時結果表,并自動構建實時數據生成邏輯寫入源表,通過計算邏輯將源表數據寫入臨時結果表。SQL作業開發詳細操作請參見SQL作業開發。
--創建flink數據源表, CREATE TEMPORARY TABLE fake_src_table ( c1 int, c2 VARCHAR, gt AS CURRENT_TIMESTAMP ) WITH ( 'connector' = 'faker', 'fields.c2.expression' = '#{superhero.name}', 'rows-per-second' = '100', 'fields.c1.expression' = '#{number.numberBetween ''0'',''1000''}' ); --flink創建臨時結果表 CREATE TEMPORARY TABLE test_c_d_g ( c1 int, c2 VARCHAR, gt TIMESTAMP, ds varchar, PRIMARY KEY(c1) NOT ENFORCED ) PARTITIONED BY(ds) WITH ( 'connector' = 'maxcompute', 'table.name' = 'mf_flink_upsert', 'sink.operation' = 'upsert', 'odps.access.id'='LTAI5tRzd4W8cTyL****', 'odps.access.key'='gJwKaF3hK9MDAQgb**********', 'odps.end.point'='http://service.cn-beijing.maxcompute.aliyun.com/api', 'odps.project.name'='mf_mc_bj', 'upsert.write.bucket.num'='64' ); --flink 計算邏輯 INSERT INTO test_c_d_g SELECT c1 AS c1, c2 AS c2, gt AS gt, date_format(gt, 'yyyyMMddHH') AS ds FROM fake_src_table;
其中:
odps.end.point
:使用對應Region的云產品互聯網絡Endpoint。upsert.write.bucket.num
:與MaxCompute中創建的Delta Table表的write.bucket.num屬性值保持一致。在MaxCompute中查詢數據,驗證Flink數據寫入MaxCompute的結果。
SELECT * FROM mf_flink_upsert WHERE ds=2023061517; --返回,由于Flink中的數據為隨機生成,實際MaxCompute查詢結果與本示例不一定完全一致 +------+----+------+----+ | c1 | c2 | gt | ds | +------+----+------+----+ | 0 | Skaar | 2023-06-16 01:59:41.116 | 2023061517 | | 21 | Supah Century | 2023-06-16 01:59:59.117 | 2023061517 | | 104 | Dark Gorilla Grodd | 2023-06-16 01:59:57.117 | 2023061517 | | 126 | Leader | 2023-06-16 01:59:39.116 | 2023061517 |
附錄:新版Flink Connector全量參數
基礎參數
參數
是否必填
默認值
說明
connector
是
無默認值
Connector類型,需設置為
MaxCompute
。odps.project.name
是
無默認值
MaxCompute的Project名稱。
odps.access.id
是
無默認值
您的阿里云賬號AccessKey ID。您可以在訪問憑證頁面查看對應信息。
odps.access.key
是
無默認值
您的阿里云賬號AccessKey Secret。您可以在訪問憑證頁面查看對應信息。
odps.end.point
是
無默認值
MaxCompute的Endpoint信息。各地域的MaxCompute Endpoint請參見Endpoint。
odps.tunnel.end.point
否
Tunnel服務的公網訪問鏈接。如果您未配置Tunnel Endpoint,Tunnel會自動路由到MaxCompute服務所在網絡對應的Tunnel Endpoint。如果您配置了Tunnel Endpoint,則以配置為準,不進行自動路由。
各地域及網絡對應的Tunnel Endpoint值,請參見Endpoint。
odps.tunnel.quota.name
否
無默認值
訪問MaxCompute使用的Tunnel Quota名稱。
table.name
是
無默認值
MaxCompute表名稱,格式為
[project.][schema.]table
。odps.namespace.schema
否
false
是否使用三層模型。關于三層模型介紹,請參見Schema操作。
sink.operation
是
insert
寫入類型,取值為
insert
或upsert
說明僅MaxCompute Delta Table支持Upsert寫入。
sink.parallelism
否
無默認值
寫入的并行度,如果不設置,則默認使用上游數據并行度。
說明請務必確保表屬性 write.bucket.num 是該配置值的整數倍,這樣可以獲得最佳的寫入性能,并且能夠最有效地節省 Sink 節點內存。
sink.meta.cache.time
否
400
元數據緩存大小。
sink.meta.cache.expire.time
否
1200
元數據緩存超時時間,單位:秒(s)。
sink.coordinator.enable
否
是
是否開啟Coordinator模式。
分區參數
參數
是否必填
默認值
說明
sink.partition
否
無默認值
待寫入的分區名稱。
若您使用的是動態分區,則為動態分區的上級分區名稱。
sink.partition.default-value
否
__DEFAULT_PARTITION__
使用動態分區時的默認分區名稱。
sink.dynamic-partition.limit
否
100
動態分區寫入時,單個Checkpoint可同時導入的最大分區數量。
說明建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致Sink節點內存溢出(OOM),且當并發寫入分區數超過閾值,寫入任務會報錯。
sink.group-partition.enable
否
false
動態分區寫入時,是否按照分區進行分組。
sink.partition.assigner.class
否
無默認值
PartitionAssigner實現類。
FileCached模式寫入參數
當動態分區數量過多時,可以使用文件緩存模式,您可以通過以下參數配置數據寫入時緩存文件信息。
參數
是否必配
默認值
說明
sink.file-cached.enable
否
false
是否開啟FileCached寫入。取值說明:
false:不開啟。
true:開啟。
說明當動態分區數量過多時,可以使用文件緩存模式。
sink.file-cached.tmp.dirs
否
./local
文件緩存模式下,默認文件緩存目錄。
sink.file-cached.writer.num
否
16
文件緩存模式下,單個Task上傳數據的并發數。
說明建議不要大幅提升該參數值,因為當同時寫入的分區數量過多時,容易導致內存溢出(OOM)。
sink.bucket.check-interval
否
60000
文件緩存模式下,檢查文件大小的周期,單位:毫秒(ms)。
sink.file-cached.rolling.max-size
否
16 M
文件緩存模式下,單個緩存文件的最大值。
若文件大小超過該值,會將該文件數據上傳到服務端。
sink.file-cached.memory
否
64 M
文件緩存模式下,寫入文件使用的最大堆外內存大小。
sink.file-cached.memory.segment-size
否
128 KB
文件緩存模式下,寫入文件的使用的buffer大小。
sink.file-cached.flush.always
否
true
文件緩存模式下,寫入文件是否使用緩存。
sink.file-cached.write.max-retries
否
3
文件緩存模式下,上傳數據的重試次數。
Insert
或Upsert
寫入參數Upsert寫入參數
參數
是否必填
默認值
說明
upsert.writer.max-retries
否
3
Upsert Writer寫入Bucket失敗后,重試次數。
upsert.writer.buffer-size
否
64 m
單個Upsert Writer數據在Flink中的緩存大小。
說明當所有Bucket的緩沖區大小總和達到預設閾值時,系統將自動觸發刷新操作,將數據更新到服務器端。
一個upsert writer里會同時寫入多個Bucket,建議提高該值,以提升寫入效率。
若寫入分區較多時,會存在引發內存OOM風險,可考慮降低該參數值。
upsert.writer.bucket.buffer-size
否
1 m
單個Bucket數據在Flink中的緩存大小,當Flink服務器使用內存資源緊張時,可以減小該參數值。
upsert.write.bucket.num
是
無
寫入表的bucket數量,必須與寫入表
write.bucket.num
值一致。upsert.write.slot-num
否
1
單個Session使用Tunnel slot數量。
upsert.commit.max-retries
否
3
Upsert Session Commit重試次數。
upsert.commit.thread-num
否
16
Upsert Session Commit的并行度。
不建議將此參數值調整得過大,因為當同時進行的提交并發數越多時,會導致資源消耗增加,可能導致性能問題或資源過度消耗。
upsert.major-compact.min-commits
否
100
發起Major Compact的最小Commit次數。
upsert.commit.timeout
否
600
Upsert Session Commit等待超時時間,單位:秒(s)。
upsert.major-compact.enable
否
false
是否開啟Major Compact。
upsert.flush.concurrent
否
2
限制單個分區允許同時寫入的最大Bucket數。
說明每當一個bucket的數據刷新時,將會占用一個Tunnel Slot資源。
說明Upsert寫入時,參數配置建議詳情,請參見Upsert寫入參數配置建議。
Insert寫入參數
參數
是否必配
默認值
說明
insert.commit.thread-num
否
16
Commit Session的并行度。
insert.arrow-writer.enable
否
false
是否使用Arrow格式。
insert.arrow-writer.batch-size
否
512
Arrow Batch的最大行數。
insert.arrow-writer.flush-interval
否
100000
Writer Flush間隔,單位毫秒(ms)。
insert.writer.buffer-size
否
64 M
使用Buffered Writer的緩存大小。