日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

高性能Flink SQL優(yōu)化技巧

本文將從作業(yè)配置和Flink SQL優(yōu)化兩方面為您介紹如何提升Flink SQL作業(yè)性能。

作業(yè)配置優(yōu)化推薦方案

  • 資源優(yōu)化技巧

    VVP中限制了JobManager和TaskManager的CPU的實際使用大小,配置了多少個CPU,最大就只能使用多少個CPU。因此在資源優(yōu)化時,建議:

    • 作業(yè)并發(fā)大時:在作業(yè)的部署詳情頁簽的資源配置中,增加JobManager的資源,提高CPU和內(nèi)存的大小,例如:

      • Job Manager CPUs設(shè)置為4。

      • Job Manager Memory設(shè)置為8 GiB。

    • 作業(yè)拓撲較復雜時,在作業(yè)的部署詳情頁簽的資源配置中,增加TaskManager的資源,提高CPU和內(nèi)存的大小,例如:

      • Task Manager CPUs設(shè)置為2。

      • Task Manager Memory設(shè)置為4 GiB。

    • 不建議修改taskmanager.numberOfTaskSlots,保持默認值1。

  • 提升吞吐和解決數(shù)據(jù)熱點的推薦配置

    其他配置中添加以下代碼,具體操作請參見如何配置作業(yè)運行參數(shù)?Group Aggregate優(yōu)化技巧

    execution.checkpointing.interval: 180s
    table.exec.state.ttl: 129600000
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5s
    table.optimizer.distinct-agg.split.enabled: true

    參數(shù)解釋如下表所示。

    參數(shù)

    說明

    execution.checkpointing.interval

    Checkpoint間隔時間,單位為毫秒。

    state.backend

    State backend的配置。

    table.exec.state.ttl

    State數(shù)據(jù)的生命周期,單位為毫秒。

    table.exec.mini-batch.enabled

    是否開啟minibatch。

    table.exec.mini-batch.allow-latency

    批量輸出數(shù)據(jù)的時間間隔。

    table.exec.mini-batch.size

    微批操作所緩存的最大數(shù)據(jù)條數(shù)。

    說明

    實時計算引擎VVR已對Minibatch機制進行了優(yōu)化,建議不設(shè)置該參數(shù),具體參考重要參數(shù)說明

    table.optimizer.distinct-agg.split.enabled

    是否開啟PartialFinal優(yōu)化,解決COUNT DISTINCT熱點問題。

  • 提升雙流Join類型作業(yè)的性能配置

    流式SQL中雙流Join算子支持自動推導開啟KV分離優(yōu)化。在實時計算引擎VVR 6.0.1及以上版本中,SQL作業(yè)雙流Join算子會根據(jù)作業(yè)特點,自動推導并開啟State KV分離優(yōu)化功能,無需您額外配置。開啟State KV分離優(yōu)化功能后,可以顯著提升雙流Join類型作業(yè)的性能。在典型場景的性能測試中,有40%以上的性能提升。

    您可以通過table.exec.join.kv-separate配置項對該功能進行顯式控制,參數(shù)取值詳情如下:

    • AUTO(默認值):表示引擎內(nèi)部會根據(jù)雙流Join算子的State特點自動開啟。

    • FORCE:表示強制開啟KV分離優(yōu)化。

    • NONE:表示強制關(guān)閉KV分離優(yōu)化。

    說明

    該功能僅對Gemini StateBackend生效。

Flink SQL優(yōu)化推薦方案

Group Aggregate優(yōu)化技巧

  • 開啟MiniBatch(提升吞吐)

    MiniBatch是緩存一定的數(shù)據(jù)后再觸發(fā)處理,以減少對State的訪問,從而提升吞吐并減少數(shù)據(jù)的輸出量。

    MiniBatch主要基于事件消息來觸發(fā)微批處理,事件消息會按您指定的時間間隔在源頭插入。

    • 適用場景

      微批處理通過增加延遲換取高吞吐,如果您有超低延遲的要求,不建議開啟微批處理。對于一般聚合場景,微批處理可以顯著提升系統(tǒng)性能,建議開啟。

    • 開啟方式

      MiniBatch默認關(guān)閉,在其他配置中填寫以下代碼即可開啟,具體操作請參見如何配置作業(yè)運行參數(shù)?

      table.exec.mini-batch.enabled: true
      table.exec.mini-batch.allow-latency: 5s

      參數(shù)解釋如下表所示。

      參數(shù)

      說明

      table.exec.mini-batch.enabled

      是否開啟mini-batch。

      table.exec.mini-batch.allow-latency

      批量輸出數(shù)據(jù)的時間間隔。

      table.exec.mini-batch.size

      微批操作所緩存的最大數(shù)據(jù)條數(shù)。

      說明

      該參數(shù)需要同時與上述兩個參數(shù)一起使用才會生效。實時計算引擎VVR已對Minibatch機制進行了優(yōu)化,建議不設(shè)置該參數(shù),具體參考重要參數(shù)說明

  • 開啟LocalGlobal(解決常見數(shù)據(jù)熱點問題)

    LocalGlobal本質(zhì)上能夠靠LocalAgg的聚合篩除部分傾斜數(shù)據(jù),從而降低GlobalAgg的熱點,提升性能。

    LocalGlobal優(yōu)化將原先的Aggregate分成Local和Global兩階段聚合,即MapReduce模型中的Combine和Reduce兩階段處理模式。第一階段在上游節(jié)點積攢一批數(shù)據(jù)進行聚合(localAgg),并輸出這次微批的增量值(Accumulator)。第二階段再將收到的Accumulator合并(Merge),得到最終的結(jié)果(GlobalAgg)。

    • 適用場景

      提升普通聚合(例如SUM、COUNT、MAX、MIN和AVG)的性能,以及這些場景下的數(shù)據(jù)熱點問題。

    • 使用限制

      LocalGlobal是默認開啟的,但是有以下限制:

      • 在minibatch開啟的前提下才能生效。

      • 需要使用AggregateFunction實現(xiàn)Merge。

    • 判斷是否生效

      觀察最終生成的拓撲圖的節(jié)點名字中是否包含GlobalGroupAggregate或LocalGroupAggregate。

  • 開啟PartialFinal(解決COUNT DISTINCT熱點問題)

    為了解決COUNT DISTINCT的熱點問題,通常需要手動改寫為兩層聚合(增加按Distinct Key取模的打散層)。目前,實時計算提供了COUNT DISTINCT自動打散,即PartialFinal優(yōu)化,您無需自行改寫為兩層聚合。

    LocalGlobal優(yōu)化針對普通聚合(例如SUM、COUNT、MAX、MIN和AVG)有較好的效果,對于COUNT DISTINCT收效不明顯,因為COUNT DISTINCT在Local聚合時,對于DISTINCT KEY的去重率不高,導致在Global節(jié)點仍然存在熱點問題。

    • 適用場景

      使用COUNT DISTINCT,但無法滿足聚合節(jié)點性能要求。

      說明
      • 不能在包含UDAF的Flink SQL中使用PartialFinal優(yōu)化方法。

      • 數(shù)據(jù)量較少的情況,不建議使用PartialFinal優(yōu)化方法,浪費資源。因為PartialFinal優(yōu)化會自動打散成兩層聚合,引入額外的網(wǎng)絡(luò)Shuffle。

    • 開啟方式

      默認不開啟。如果您需要開啟,在其他配置中填寫以下代碼,具體操作請參見如何配置作業(yè)運行參數(shù)?

      table.optimizer.distinct-agg.split.enabled: true
    • 判斷是否生效

      觀察最終生成的拓撲圖,是否由原來一層的聚合變成了兩層的聚合。

  • AGG WITH CASE WHEN改寫為AGG WITH FILTER語法(提升大量COUNT DISTINCT場景性能)

    統(tǒng)計作業(yè)需要計算各種維度的UV,例如全網(wǎng)UV、來自手機客戶端的UV、來自PC的UV等等。建議使用標準的AGG WITH FILTER語法來代替CASE WHEN實現(xiàn)多維度統(tǒng)計的功能。實時計算目前的SQL優(yōu)化器能分析出Filter參數(shù),從而同一個字段上計算不同條件下的COUNT DISTINCT能共享State,減少對State的讀寫操作。性能測試中,使用AGG WITH FILTER語法來代替CASE WHEN能夠使性能提升1倍。

    • 適用場景

      對于同一個字段上計算不同條件下的COUNT DISTINCT結(jié)果的場景,性能提升很大。

    • 原始寫法

      COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2
    • 優(yōu)化寫法

      COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2

TopN優(yōu)化技巧

  • TopN算法

    當TopN的輸入是非更新流(例如SLS數(shù)據(jù)源),TopN只有1種算法AppendRank。當TopN的輸入是更新流時(例如經(jīng)過了AGG或JOIN計算),TopN有2種算法,性能從高到低分別是:UpdateFastRank和RetractRank。算法名字會顯示在拓撲圖的節(jié)點名字上。

    • AppendRank:對于非更新流,只支持該算法。

    • UpdateFastRank:對于更新流,最優(yōu)算法。

    • RetractRank:對于更新流,保底算法。性能不佳,在某些業(yè)務(wù)場景下可優(yōu)化成UpdateFastRank。

    下面介紹RetractRank如何能優(yōu)化成UpdateFastRank。使用UpdateFastRank算法需要具備3個條件:

    • 輸入流為更新流,但不能包含DELETE(D)、UPDATE_BEFORE(UB)類型的消息,否則會影響排序字段的單調(diào)性。關(guān)于輸入流的消息類型,可以通過執(zhí)行EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>命令來獲取對應(yīng)節(jié)點輸出的消息類型,語法詳情請參見EXPLAIN語句

    • 輸入流有Primary Key信息,例如上游做了GROUP BY聚合操作。

    • 排序字段的更新是單調(diào)的,且單調(diào)方向與排序方向相反。例如,ORDER BY COUNT/COUNT_DISTINCT/SUM(正數(shù))DESC。

    如果您要獲取到UpdateFastRank的優(yōu)化Plan,則您需要在使用ORDER BY SUM DESC時,添加SUM為正數(shù)的過濾條件,確保total_fee為正數(shù)。

    說明

    如下示例中的random_test表為非更新流,對應(yīng)的GROUP分組聚合的結(jié)果不會包含DELETE(D)、UPDATE_BEFORE(UB)消息,所以對應(yīng)的聚合結(jié)果字段才能保持單調(diào)性。

    可以優(yōu)化成UpdateFastRank的示例:

    insert
      into print_test
    SELECT
      cate_id,
      seller_id,
      stat_date,
      pay_ord_amt  -- 不輸出rownum字段,能減小結(jié)果表的輸出量。
    FROM (
        SELECT
          *,
          ROW_NUMBER () OVER (        
     -- 注意:PARTITION BY的列要被子查詢中的GROUP BY分組聚合字段包含;另外要有時間字段,否則State過期會導致數(shù)據(jù)錯亂。
            PARTITION BY cate_id,
            stat_date  
            ORDER
              BY pay_ord_amt DESC
          ) as rownum  --根據(jù)上游sum結(jié)果排序。
        FROM (
            SELECT
              cate_id,
              seller_id,
              stat_date,
              -- 重點:聲明Sum的參數(shù)都是正數(shù),所以Sum的結(jié)果是單調(diào)遞增的,因此TopN能使用優(yōu)化算法,只獲取前100個數(shù)據(jù)。
              sum (total_fee) filter (
                where
                  total_fee >= 0
              ) as pay_ord_amt
            FROM
              random_test
            WHERE
              total_fee >= 0
            GROUP
              BY seller_id,
              stat_date,
              cate_id
          ) a
        ) WHERE
          rownum <= 100;
  • TopN優(yōu)化方法

    • 無排名優(yōu)化

      TopN的輸出結(jié)果不需要顯示rownum值,僅需在最終前端顯示時進行1次排序,極大地減少輸入結(jié)果表的數(shù)據(jù)量。無排名優(yōu)化方法詳情請參見Top-N

    • 增加TopN的Cache大小

      TopN為了提升性能有一個State Cache層,Cache層能提升對State的訪問效率。TopN的Cache命中率的計算公式如下。

      cache_hit = cache_size*parallelism/top_n/partition_key_num

      例如,Top100配置緩存10000條,并發(fā)50,當您的PartitionBy的Key維度較大時,例如10萬級別時,Cache命中率只有10000*50/100/100000=5%,命中率會很低,導致大量的請求都會擊中State(磁盤),觀察state seek metric可能會有很多毛刺。性能會大幅下降。

      因此當partitionKey維度特別大時,可以適當加大TopN的cache size,相對應(yīng)的也建議適當加大TopN節(jié)點的heap memory,詳情請參見配置作業(yè)部署信息

      table.exec.rank.topn-cache-size: 200000

      默認10000條,調(diào)整TopN cache到200000,那么理論命中率能達到200000*50/100/100000 = 100%

    • PartitionBy的字段中要有時間類字段

      例如每天的排名,要帶上Day字段,否則TopN的最終結(jié)果會由于State TTL產(chǎn)生錯亂。

高效去重方案

實時計算的源數(shù)據(jù)在部分場景中存在重復數(shù)據(jù),去重成為了用戶經(jīng)常反饋的需求。實時計算有保留第一條(Deduplicate Keep FirstRow)和保留最后一條(Deduplicate Keep LastRow)2種去重方案。

  • 語法

    由于SQL上沒有直接支持去重的語法,還要靈活地保留第一條或保留最后一條。因此我們使用了SQL的ROW_NUMBER OVER WINDOW功能來實現(xiàn)去重語法。去重本質(zhì)上是一種特殊的TopN。

    SELECT *
    FROM (
       SELECT *,
        ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
         ORDER BY timeAttributeCol [asc|desc]) AS rownum
       FROM table_name)
    WHERE rownum = 1

    參數(shù)

    說明

    ROW_NUMBER()

    計算行號的OVER窗口函數(shù)。行號從1開始計算。

    PARTITION BY col1[, col2..]

    可選。指定分區(qū)的列,即去重的KEYS。

    ORDER BY timeAttributeCol [asc|desc])

    指定排序的列,必須是一個時間屬性的字段(即Proctime或Rowtime)。可以指定順序(Keep FirstRow)或者倒序 (Keep LastRow)。

    rownum

    僅支持rownum=1rownum<=1

    如上語法所示,去重需要兩層Query:

    1. 使用ROW_NUMBER() 窗口函數(shù)來對數(shù)據(jù)根據(jù)時間屬性列進行排序并標上排名。

      • 當排序字段是Proctime列時,F(xiàn)link就會按照系統(tǒng)時間去重,其每次運行的結(jié)果是不確定的。

      • 當排序字段是Rowtime列時,F(xiàn)link就會按照業(yè)務(wù)時間去重,其每次運行的結(jié)果是確定的。

    2. 對排名進行過濾,只取第一條,達到了去重的目的。

      排序方向可以是按照時間列的順序,也可以是倒序:

      • Deduplicate Keep FirstRow:順序并取第一條行數(shù)據(jù)。

      • Deduplicate Keep LastRow:倒序并取第一條行數(shù)據(jù)。

  • Deduplicate Keep FirstRow

    保留首行的去重策略:保留KEY下第一條出現(xiàn)的數(shù)據(jù),之后出現(xiàn)該KEY下的數(shù)據(jù)會被丟棄掉。因為STATE中只存儲了KEY數(shù)據(jù),所以性能較優(yōu),示例如下。

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
      FROM T
    )
    WHERE rowNum = 1

    以上示例是將表T按照字段b進行去重,并按照系統(tǒng)時間保留第一條數(shù)據(jù)。proctime在這里是源表T中的一個具有Processing Time屬性的字段。如果您按照系統(tǒng)時間去重,也可以將proctime字段簡化proctime()函數(shù)調(diào)用,可以省略proctime字段的聲明。

  • Deduplicate Keep LastRow

    保留末行的去重策略:保留KEY下最后一條出現(xiàn)的數(shù)據(jù)。保留末行的去重策略性能略優(yōu)于LAST_VALUE函數(shù),示例如下。

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
      FROM T
    )
    WHERE rowNum = 1

    以上示例是將T表按照b和d字段進行去重,并按照業(yè)務(wù)時間保留最后一條數(shù)據(jù)。rowtime在這里是源表T中的一個具有Event Time屬性的字段。

高效的內(nèi)置函數(shù)

在使用內(nèi)置函數(shù)時,您需要注意以下幾點:

  • 使用內(nèi)置函數(shù)替換自定義函數(shù)

    實時計算的內(nèi)置函數(shù)在持續(xù)的優(yōu)化當中,請盡量使用內(nèi)置函數(shù)替換自定義函數(shù)。實時計算對內(nèi)置函數(shù)主要進行了如下優(yōu)化:

    • 優(yōu)化數(shù)據(jù)序列化和反序列化的耗時。

    • 新增直接對字節(jié)單位進行操作的功能。

  • KEY VALUE函數(shù)使用單字符的分隔符

    KEY VALUE的簽名:KEYVALUE(content, keyValueSplit, keySplit, keyName),當keyValueSplit和KeySplit是單字符,例如,冒號(:)、逗號(,)時,系統(tǒng)會使用優(yōu)化算法,在二進制數(shù)據(jù)上直接尋找所需的keyName值,而不會將整個content進行切分,性能約提升30%。

  • LIKE操作注意事項

    • 如果需要進行StartWith操作,使用LIKE 'xxx%'

    • 如果需要進行EndWith操作,使用LIKE '%xxx'

    • 如果需要進行Contains操作,使用LIKE '%xxx%'

    • 如果需要進行Equals操作,使用LIKE 'xxx',等價于str = 'xxx'

    • 如果需要匹配下劃線(_),請注意要完成轉(zhuǎn)義LIKE '%seller/_id%' ESCAPE '/'。下劃線(_)在SQL中屬于單字符通配符,能匹配任何字符。如果聲明為 LIKE '%seller_id%',則不單會匹配seller_id,還會匹配seller#idsellerxidseller1id等,導致結(jié)果錯誤。

  • 慎用正則函數(shù)(REGEXP)

    正則表達式是非常耗時的操作,對比加減乘除通常有百倍的性能開銷,而且正則表達式在某些極端情況下可能會進入無限循環(huán),導致作業(yè)阻塞,因此建議使用LIKE。正則函數(shù)包括:

SQL Hints

為了更加靈活地提升引擎的優(yōu)化能力Flink支持了SQL提示(SQL Hints),SQL提示一般可以用于以下場景:

  • 修改執(zhí)行計劃:使用SQL提示,您可以更好地控制執(zhí)行計劃。

  • 增加元數(shù)據(jù)(或者統(tǒng)計信息):一些統(tǒng)計數(shù)據(jù)對于查詢來說是動態(tài)的,例如已掃描的表索引、一些shuffle keys的傾斜信息等,從planner獲得的計劃元數(shù)據(jù)可能不準確,此時可以使用提示來配置它們。

  • 動態(tài)表配置選項:動態(tài)表選項允許用戶動態(tài)地指定或覆蓋表選項,這些選項可以在每個查詢的每個表范圍內(nèi)靈活地指定。

查詢提示(Query Hints)是SQL提示的一種,用于為優(yōu)化器修改執(zhí)行計劃提供建議,該修改只能在當前查詢提示所在的查詢塊中生效(Query block)。 目前查詢提示只支持聯(lián)接提示(Join Hints)。

  • 語法

    Flink中的查詢提示語法與Apache Calcite的語法一致。

    # Query hints:
    SELECT /*+ hint [, hint ] */ ...
    
    hint:
            hintName '(' hintOption [, hintOption ]* ')'
    
    hintOption:
            simpleIdentifier
        |   numericLiteral
        |   stringLiteral
  • 聯(lián)接提示

    聯(lián)接提示(Join Hints)是查詢提示的一種,該提示允許您動態(tài)的優(yōu)化Join,目前支持維表JOIN Hints雙流JOIN hints