本文介紹SQL開發中涉及的重要參數說明和使用示例。
table.exec.sink.keyed-shuffle
為解決向帶有主鍵的表中寫入數據時出現的分布式亂序問題,您可以通過table.exec.sink.keyed-shuffle參數來進行Hash Shuffle操作,這將確保相同主鍵的數據被發送到算子的同一個并發,減少分布式亂序問題。
注意事項
僅在上游算子能夠確保更新記錄在主鍵字段上的順序性時,Hash Shuffle操作才起作用;否則,Hash Shuffle操作不能解決問題。
在作業專家模式時,修改算子并發度,不適用下面的并發度判定規則。
取值說明
AUTO(默認值):表示在Sink的并發度不為1,且Sink的并發度與上游算子不同時,當數據流向Sink時,Flink會自動對主鍵字段進行Hash Shuffle操作。
FORCE:表示在Sink并發度不為1時,當數據流向Sink時,Flink會強制對主鍵字段進行Hash Shuffle操作。
NONE:表示Flink不會根據Sink和上游算子的并發度信息進行Hash Shuffle操作。
使用示例
參數值為AUTO
新建SQL流作業,復制如下測試SQL(顯式指定Sink并發度為2),部署作業。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print', --您可以通過sink.parallelism參數直接指定Sink并發度。 'sink.parallelism'='2' ); INSERT INTO sink SELECT * FROM s1; --您也可以通過動態表選項的方式指定Sink并發度。 --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
在作業運維頁面的部署詳情頁簽資源配置區域,將并發度設置為1,在運行參數配置區域其他配置中,不設置
table.exec.sink.keyed-shuffle
參數或顯式添加table.exec.sink.keyed-shuffle: AUTO
(兩者效果一致)。啟動作業。在狀態總覽頁簽下,您可以看到Sink節點和上游的數據連接方式為HASH。
參數值為FORCE
新建SQL流作業,復制如下測試SQL(不再顯式指定Sink并發度),部署作業。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3) ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random','fields.ts.max-past'='5s', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT * FROM s1;
在作業運維頁面的部署詳情頁簽資源配置區域,將并發度設置為2。在運行參數配置區域其他配置中添加
table.exec.sink.keyed-shuffle: FORCE
。啟動作業后,在狀態總覽頁簽下,您可以看到Sink節點和上游節點的并發度都為2,并且數據連接方式變成了HASH。
table.exec.mini-batch.size
該參數控制了相關的計算節點進行微批操作所緩存的最大數據條數,達到該值后觸發最終的計算和數據輸出。該參數只有與table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency同時使用時才會生效。有關MiniBatch相關的優化請參見MiniBatch Aggregation和MiniBatch 雙流Join。
注意事項
在作業啟動前,如果未在運行參數配置區域顯式設置該參數,在MiniBatch處理模式下,將使用Managed Memory緩存數據,在以下幾種條件下都會觸發最終計算和數據輸出:
收到MiniBatchAssigner節點發送的watermark消息
Managed Memory已滿
進行Checkpoint前
作業停止時
取值說明
-1(默認值):表示使用Managed Memory緩存數據。
其他Long類型的負值:同默認設置。
其他Long類型的正值:表示使用Heap Memory來緩存數據。當緩存的數據量達到N條時,會自動觸發輸出操作。
使用示例
新建SQL流作業,復制如下測試SQL,部署作業。
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts TIMESTAMP(3), PRIMARY KEY (a) NOT ENFORCED, WATERMARK FOR ts AS ts - INTERVAL '1' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.ts.kind'='random', 'fields.ts.max-past'='5s', 'fields.b.kind'='random', 'fields.b.min'='0', 'fields.b.max'='10' ); CREATE TEMPORARY TABLE sink ( a INT, b BIGINT, PRIMARY KEY (a) NOT ENFORCED ) WITH ( 'connector'='print' ); INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
在作業運維頁面的部署詳情頁運行參數配置區域其他配置中,設置
table.exec.mini-batch.enabled: true
和table.exec.mini-batch.allow-latency: 2s
參數,不設置table.exec.mini-batch.size
(取默認值-1)。啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MiniBatchAssigner節點、LocalGroupAggregate節點和GlobalGroupAggregate節點。
table.exec.agg.mini-batch.output-identical-enabled
在開啟State TTL的情況下,MinibatchGlobalAgg節點和MinibatchAgg節點在消費數據后,如果聚合結果未發生變化,默認將不會向下游發送重復的數據,這可能導致下游的有狀態節點由于長時間未收到上游發送的數據,自身State過期的問題。該參數控制了開啟StateTTL且聚合結果未發生變化的情況下,是否仍然向下游發送重復數據。您可以設置該參數為true
,使得MinibatchGlobalAgg和MinibatchAgg兩個節點在這種情況下,仍然下發數據。如果您的作業聚合結果變化周期小于State TTL設置時間,則無需手動設置此參數。具體社區Issues詳情請參考FLINK-33936。
注意事項
該參數僅在VVR-8.0.8及以上版本生效,在VVR-8.0.8前的版本,其行為等同于取值為false的行為。
當取值從false修改為true時,可能會導致MinibatchGlobalAgg節點和MinibatchAgg節點向下游發送的數據量增加,對下游算子造成壓力。
取值說明
false(默認值):表示在開啟State TTL的情況下,MinibatchGlobalAgg節點和MinibatchAgg節點在消費數據后,如果聚合結果未發生變化,就不向下游下發數據。
true:表示在開啟State TTL的情況下,MinibatchGlobalAgg節點和MinibatchAgg節點在消費數據后,如果聚合結果未發生變化,仍然向下游發送更新的(重復的)數據。
使用示例
新建SQL流作業,復制如下測試SQL,部署作業。
create temporary table src( a int, b string ) with ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.a.min' = '1', 'fields.a.max' = '1', 'fields.b.length' = '3' ); create temporary table snk( a int, max_length_b bigint ) with ( 'connector' = 'blackhole' ); insert into snk select a, max(CHAR_LENGTH(b)) from src group by a;
在作業運維頁面的部署詳情頁運行參數配置區域其他配置中,設置
table.exec.mini-batch.enabled: true
和table.exec.mini-batch.allow-latency: 2s
參數,啟用Minibatch Aggregate優化。啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MinibatchGlobalAggregate節點,點擊該節點上的“+”號,可以觀察到GlobalGroupAggregate節點在聚合結果不變的情況下,不向下游發送數據。
停止該作業,在作業運維頁面的部署詳情頁運行參數配置區域其他配置中,添加參數
table.exec.agg.mini-batch.output-identical-enabled: true
啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MinibatchGlobalAggregate節點,點擊該節點上的“+”號,可以觀察到GlobalGroupAggregate節點在聚合結果不變的情況下,仍然向下游發送數據。
table.exec.async-lookup.key-ordered-enabled
業務場景在通過維表Join做數據打寬時,通過開啟異步模式通常可以獲得更好的吞吐性能。當前在維表Join中設置table.exec.async-lookup.output-mode參數和處理的輸入是否為更新流最終會對應到異步I/O如下結果順序:
output-mode\處理的輸入 | 更新流 | 非更新流 |
ORDERED | 有序模式 | 有序模式 |
ALLOW_UNORDERED | 有序模式 | 無序模式 |
表格中更新流和ALLOW_UNORDERED的組合通過有序模式確保了正確性,但在一定程度上犧牲了吞吐性能。為優化該場景,推出了table.exec.async-lookup.key-ordered-enabled參數,既兼顧更新流的正確性語義,又保證異步I/O的吞吐性能。對于流中具備相同更新鍵(可視為變更日志主鍵)的消息,將按照消息進入算子的先后順序進行處理。
有序(Ordered)模式:這種模式保持了流的順序,發出結果消息的順序與觸發異步請求的順序(消息進入算子的順序)相同。
無序(Unordered)模式:異步請求一結束就立刻發出結果消息。流中消息的順序在經過異步I/O算子之后發生了改變。詳情請詳見異步 I/O | Apache Flink。
應用場景
一段時間內流上相同更新鍵的消息量較少(比如更新鍵為主鍵的場景,相同主鍵的數據更新頻率不高),同時在維表join時對基于更新鍵的處理有處理順序的需求。該優化可以保證基于更新鍵的數據處理順序。
在包含主鍵的CDC流中,通過維表join打寬寫入Sink(Sink的主鍵與Source的主鍵保持一致),且維表join的join key和主鍵不一致,維表側join key為主鍵。該優化會根據CDC主鍵(被推導為更新鍵)進行shuffle。相比同場景開啟SHUFFLE_HASH優化,在多并發的情況下,可以避免在Sink前產生SinkMaterializer 節點,從而消除因該節點引起的潛在性能問題,尤其可以消除長期運行時該節點產生的大state。有關SinkUpsertMaterializer請參見使用建議。
維表join的join key和主鍵不一致,維表側join key為主鍵,且之后存在rank節點,該優化會根據CDC主鍵(被推導為更新鍵)進行shuffle。相比同場景開啟SHUFFLE_HASH優化,可以避免UpdateFastRank退化為RetractRank。RetractRank如何能優化成UpdateFastRank請參見TopN優化技巧。
注意事項
當流上不存在更新鍵時,會將整行數據作為key。
在短時間內同一個更新鍵存在較頻繁的更新時吞吐量會降低,因為針對同一個更新鍵的數據是嚴格按照順序處理的。
Key-Ordered模式相比原有異步維表Join新增了Keyed State,開啟或關閉該模式會影響狀態兼容性。
僅適用于VVR 8.0.10及以上版本且維表Join的輸入是非更新流,配置
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'
、table.exec.async-lookup.key-ordered-enabled='true'
時才會生效。
取值說明
false(默認值):表示不開啟Key-Ordered模式。
true:表示開啟Key-Ordered模式。
使用示例
以使用Hologres異步維表Join為例,新建SQL流作業,復制如下測試SQL,部署作業。
Hologres連接器詳情請參見實時數倉Hologres。
create TEMPORARY table bid_source( auction BIGINT, bidder BIGINT, price BIGINT, channel VARCHAR, url VARCHAR, dateTime TIMESTAMP(3), extra VARCHAR, proc_time as proctime(), WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND ) with ( 'connector' = 'kafka', -- 非insert only流連接器 'topic' = 'user_behavior', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE users ( user_id STRING PRIMARY KEY NOT ENFORCED, -- 定義主鍵 user_name VARCHAR(255) NOT NULL, age INT NOT NULL ) WITH ( 'connector' = 'hologres', -- 支持異步lookup功能連機器 'async' = 'true', 'dbname' = 'holo db name', --Hologres的數據庫名稱 'tablename' = 'schema_name.table_name', --Hologres用于接收數據的表名稱 'username' = 'access id', --當前阿里云賬號的AccessKey ID 'password' = 'access key', --當前阿里云賬號的AccessKey Secret 'endpoint' = 'holo vpc endpoint', --當前Hologres實例VPC網絡的Endpoint ); CREATE TEMPORARY TABLE bh ( auction BIGINT, age int ) WITH ( 'connector' = 'blackhole' ); insert into bh SELECT bid_source.auction, u.age FROM bid_source JOIN users FOR SYSTEM_TIME AS OF bid_source.proc_time AS u ON bid_source.channel = u.user_id;
在作業運維頁面的部署詳情頁簽運行參數配置區域其他配置中,設置
table.exec.async-lookup.output-mode='ALLOW_UNORDERED'
和table.exec.async-lookup.key-ordered-enabled='true'
參數。啟動作業。在狀態總覽頁簽下,您可以看到作業async屬性KEY_ORDERED:true。