日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

重要參數說明

本文介紹SQL開發中涉及的重要參數說明和使用示例。

table.exec.sink.keyed-shuffle

為解決向帶有主鍵的表中寫入數據時出現的分布式亂序問題,您可以通過table.exec.sink.keyed-shuffle參數來進行Hash Shuffle操作,這將確保相同主鍵的數據被發送到算子的同一個并發,減少分布式亂序問題。

注意事項

  • 僅在上游算子能夠確保更新記錄在主鍵字段上的順序性時,Hash Shuffle操作才起作用;否則,Hash Shuffle操作不能解決問題。

  • 在作業專家模式時,修改算子并發度,不適用下面的并發度判定規則。

取值說明

  • AUTO(默認值):表示在Sink的并發度不為1,且Sink的并發度與上游算子不同時,當數據流向Sink時,Flink會自動對主鍵字段進行Hash Shuffle操作。

  • FORCE:表示在Sink并發度不為1時,當數據流向Sink時,Flink會強制對主鍵字段進行Hash Shuffle操作。

  • NONE:表示Flink不會根據Sink和上游算子的并發度信息進行Hash Shuffle操作。

使用示例

  • 參數值為AUTO

    1. 新建SQL流作業,復制如下測試SQL(顯式指定Sink并發度為2),部署作業。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print',
         --您可以通過sink.parallelism參數直接指定Sink并發度。
        'sink.parallelism'='2'
      );
      
      INSERT INTO sink SELECT * FROM s1;
      --您也可以通過動態表選項的方式指定Sink并發度。
      --INSERT INTO sink /*+ OPTIONS('sink.parallelism' = '2') */ SELECT * FROM s1;
    2. 作業運維頁面的部署詳情頁簽資源配置區域,將并發度設置為1,在運行參數配置區域其他配置中,不設置table.exec.sink.keyed-shuffle參數或顯式添加table.exec.sink.keyed-shuffle: AUTO(兩者效果一致)。

      image

    3. 啟動作業。在狀態總覽頁簽下,您可以看到Sink節點和上游的數據連接方式為HASH。

      image

  • 參數值為FORCE

    1. 新建SQL流作業,復制如下測試SQL(不再顯式指定Sink并發度),部署作業。

      CREATE TEMPORARY TABLE s1 (
        a INT,
        b INT,
        ts TIMESTAMP(3)
      ) WITH (
        'connector'='datagen',
        'rows-per-second'='1',
        'fields.ts.kind'='random','fields.ts.max-past'='5s',
        'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
      );
      
      CREATE TEMPORARY TABLE sink (
        a INT,
        b INT,
        ts TIMESTAMP(3),
        PRIMARY KEY (a) NOT ENFORCED
      ) WITH (
        'connector'='print'
      );
      
      INSERT INTO sink
      SELECT * FROM s1;
    2. 作業運維頁面的部署詳情頁簽資源配置區域,將并發度設置為2。在運行參數配置區域其他配置中添加table.exec.sink.keyed-shuffle: FORCE

      image

    3. 啟動作業后,在狀態總覽頁簽下,您可以看到Sink節點和上游節點的并發度都為2,并且數據連接方式變成了HASH。

      image

table.exec.mini-batch.size

該參數控制了相關的計算節點進行微批操作所緩存的最大數據條數,達到該值后觸發最終的計算和數據輸出。該參數只有與table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency同時使用時才會生效。有關MiniBatch相關的優化請參見MiniBatch AggregationMiniBatch 雙流Join

注意事項

在作業啟動前,如果未在運行參數配置區域顯式設置該參數,在MiniBatch處理模式下,將使用Managed Memory緩存數據,在以下幾種條件下都會觸發最終計算和數據輸出:

  • 收到MiniBatchAssigner節點發送的watermark消息

  • Managed Memory已滿

  • 進行Checkpoint前

  • 作業停止時

取值說明

  • -1(默認值):表示使用Managed Memory緩存數據。

  • 其他Long類型的負值:同默認設置。

  • 其他Long類型的正值:表示使用Heap Memory來緩存數據。當緩存的數據量達到N條時,會自動觸發輸出操作。

使用示例

  1. 新建SQL流作業,復制如下測試SQL,部署作業。

    CREATE TEMPORARY TABLE s1 (
      a INT,
      b INT,
      ts TIMESTAMP(3),
      PRIMARY KEY (a) NOT ENFORCED,
      WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
      'connector'='datagen',
      'rows-per-second'='1',
      'fields.ts.kind'='random',
      'fields.ts.max-past'='5s',
      'fields.b.kind'='random',
      'fields.b.min'='0',
      'fields.b.max'='10'
    );
    
    CREATE TEMPORARY TABLE sink (
      a INT,
      b BIGINT,
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      'connector'='print'
    );
    
    INSERT INTO sink SELECT a, sum(b) FROM s1 GROUP BY a;
  2. 作業運維頁面的部署詳情運行參數配置區域其他配置中,設置table.exec.mini-batch.enabled: truetable.exec.mini-batch.allow-latency: 2s參數,不設置table.exec.mini-batch.size(取默認值-1)。

  3. 啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MiniBatchAssigner節點、LocalGroupAggregate節點和GlobalGroupAggregate節點。

    image

table.exec.agg.mini-batch.output-identical-enabled

開啟State TTL的情況下,MinibatchGlobalAgg節點和MinibatchAgg節點在消費數據后,如果聚合結果未發生變化,默認將不會向下游發送重復的數據,這可能導致下游的有狀態節點由于長時間未收到上游發送的數據,自身State過期的問題。該參數控制了開啟StateTTL且聚合結果未發生變化的情況下,是否仍然向下游發送重復數據。您可以設置該參數為true,使得MinibatchGlobalAgg和MinibatchAgg兩個節點在這種情況下,仍然下發數據。如果您的作業聚合結果變化周期小于State TTL設置時間,則無需手動設置此參數。具體社區Issues詳情請參考FLINK-33936

注意事項

  • 該參數僅在VVR-8.0.8及以上版本生效,在VVR-8.0.8前的版本,其行為等同于取值為false的行為。

  • 當取值從false修改為true時,可能會導致MinibatchGlobalAgg節點和MinibatchAgg節點向下游發送的數據量增加,對下游算子造成壓力。

取值說明

  • false(默認值):表示在開啟State TTL的情況下,MinibatchGlobalAgg節點和MinibatchAgg節點在消費數據后,如果聚合結果未發生變化,就不向下游下發數據。

  • true:表示在開啟State TTL的情況下,MinibatchGlobalAgg節點和MinibatchAgg節點在消費數據后,如果聚合結果未發生變化,仍然向下游發送更新的(重復的)數據。

使用示例

  1. 新建SQL流作業,復制如下測試SQL,部署作業。

    create temporary table src(
        a int,
        b string
    ) with (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.a.min' = '1',
        'fields.a.max' = '1',
        'fields.b.length' = '3'
    );
    
    create temporary table snk(
        a int,
        max_length_b bigint
    ) with (
        'connector' = 'blackhole'
    );
    
    insert into snk select a, max(CHAR_LENGTH(b)) from src group by a; 
  2. 作業運維頁面的部署詳情運行參數配置區域其他配置中,設置table.exec.mini-batch.enabled: truetable.exec.mini-batch.allow-latency: 2s參數,啟用Minibatch Aggregate優化。

  3. 啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MinibatchGlobalAggregate節點,點擊該節點上的“+”號,可以觀察到GlobalGroupAggregate節點在聚合結果不變的情況下,不向下游發送數據。

    image

  4. 停止該作業,在作業運維頁面的部署詳情運行參數配置區域其他配置中,添加參數table.exec.agg.mini-batch.output-identical-enabled: true

  5. 啟動作業。在狀態總覽頁簽下,您可以看到作業包含了MinibatchGlobalAggregate節點,點擊該節點上的“+”號,可以觀察到GlobalGroupAggregate節點在聚合結果不變的情況下,仍然向下游發送數據。image

table.exec.async-lookup.key-ordered-enabled

業務場景在通過維表Join做數據打寬時,通過開啟異步模式通常可以獲得更好的吞吐性能。當前在維表Join中設置table.exec.async-lookup.output-mode參數和處理的輸入是否為更新流最終會對應到異步I/O如下結果順序:

output-mode\處理的輸入

更新流

非更新流

ORDERED

有序模式

有序模式

ALLOW_UNORDERED

有序模式

無序模式

表格中更新流和ALLOW_UNORDERED的組合通過有序模式確保了正確性,但在一定程度上犧牲了吞吐性能。為優化該場景,推出了table.exec.async-lookup.key-ordered-enabled參數,既兼顧更新流的正確性語義,又保證異步I/O的吞吐性能。對于流中具備相同更新鍵(可視為變更日志主鍵)的消息,將按照消息進入算子的先后順序進行處理。

說明
  • 有序(Ordered)模式:這種模式保持了流的順序,發出結果消息的順序與觸發異步請求的順序(消息進入算子的順序)相同。

  • 無序(Unordered)模式:異步請求一結束就立刻發出結果消息。流中消息的順序在經過異步I/O算子之后發生了改變。詳情請詳見異步 I/O | Apache Flink

應用場景

  • 一段時間內流上相同更新鍵的消息量較少(比如更新鍵為主鍵的場景,相同主鍵的數據更新頻率不高),同時在維表join時對基于更新鍵的處理有處理順序的需求。該優化可以保證基于更新鍵的數據處理順序。

  • 在包含主鍵的CDC流中,通過維表join打寬寫入Sink(Sink的主鍵與Source的主鍵保持一致),且維表join的join key和主鍵不一致,維表側join key為主鍵。該優化會根據CDC主鍵(被推導為更新鍵)進行shuffle。相比同場景開啟SHUFFLE_HASH優化,在多并發的情況下,可以避免在Sink前產生SinkMaterializer 節點,從而消除因該節點引起的潛在性能問題,尤其可以消除長期運行時該節點產生的大state。有關SinkUpsertMaterializer請參見使用建議

  • 維表join的join key和主鍵不一致,維表側join key為主鍵,且之后存在rank節點,該優化會根據CDC主鍵(被推導為更新鍵)進行shuffle。相比同場景開啟SHUFFLE_HASH優化,可以避免UpdateFastRank退化為RetractRank。RetractRank如何能優化成UpdateFastRank請參見TopN優化技巧

注意事項

  • 當流上不存在更新鍵時,會將整行數據作為key。

  • 在短時間內同一個更新鍵存在較頻繁的更新時吞吐量會降低,因為針對同一個更新鍵的數據是嚴格按照順序處理的。

  • Key-Ordered模式相比原有異步維表Join新增了Keyed State,開啟或關閉該模式會影響狀態兼容性。

  • 僅適用于VVR 8.0.10及以上版本且維表Join的輸入是非更新流,配置table.exec.async-lookup.output-mode='ALLOW_UNORDERED'table.exec.async-lookup.key-ordered-enabled='true'時才會生效。

取值說明

  • false(默認值):表示不開啟Key-Ordered模式。

  • true:表示開啟Key-Ordered模式。

使用示例

  1. 以使用Hologres異步維表Join為例,新建SQL流作業,復制如下測試SQL,部署作業。

    Hologres連接器詳情請參見實時數倉Hologres

    create TEMPORARY table bid_source(
      auction  BIGINT,
      bidder  BIGINT,
      price  BIGINT,
      channel  VARCHAR,
      url  VARCHAR,
      dateTime  TIMESTAMP(3),
      extra  VARCHAR,
      proc_time as proctime(),
      WATERMARK FOR dateTime AS dateTime - INTERVAL '4' SECOND
    ) with (
      'connector' = 'kafka',  -- 非insert only流連接器
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE users (
        user_id STRING PRIMARY KEY NOT ENFORCED, -- 定義主鍵
        user_name VARCHAR(255) NOT NULL, 
        age INT NOT NULL
    ) WITH (
        'connector' = 'hologres',    -- 支持異步lookup功能連機器
        'async' = 'true',
        'dbname' = 'holo db name', --Hologres的數據庫名稱
        'tablename' = 'schema_name.table_name', --Hologres用于接收數據的表名稱
        'username' = 'access id', --當前阿里云賬號的AccessKey ID
        'password' = 'access key', --當前阿里云賬號的AccessKey Secret
        'endpoint' = 'holo vpc endpoint', --當前Hologres實例VPC網絡的Endpoint
    );
    
    CREATE TEMPORARY TABLE bh ( 
        auction  BIGINT,
        age int
    ) WITH (
        'connector' = 'blackhole'
    );
    
    insert into bh
    SELECT
        bid_source.auction,
        u.age
    FROM bid_source
        JOIN users FOR SYSTEM_TIME AS OF bid_source.proc_time AS u
        ON bid_source.channel = u.user_id;
    
  2. 作業運維頁面的部署詳情頁簽運行參數配置區域其他配置中,設置table.exec.async-lookup.output-mode='ALLOW_UNORDERED'table.exec.async-lookup.key-ordered-enabled='true'參數。

  3. 啟動作業。在狀態總覽頁簽下,您可以看到作業async屬性KEY_ORDERED:true。

    image

相關文檔

為什么數據在LocalGroupAggregate節點中長時間卡住,無輸出?