本文介紹如何優化器和執行器如何處理聚合(Group-by)與排序(Order-by)算子,以達到減少數據傳輸量和提高執行效率的效果。

基本概念

聚合操作(Aggregate,簡稱Agg)語義為按照GROUP BY指定列對輸入數據進行聚合的計算,或者不分組、對所有數據進行聚合的計算。PolarDB-X 1.0支持如下聚合函數:

  • COUNT
  • SUM
  • AVG
  • MAX
  • MIN
  • BIT_OR
  • BIT_XOR
  • GROUP_CONCAT

排序操作(Sort)語義為按照指定的ORDER BY列對輸入進行排序。

本文介紹均為不下推的Agg或Sort的算子的實現。如果已被下推到LogicalView中,則由存儲層MySQL來選擇執行方式。

聚合(Agg)

聚合(Agg)由兩種主要的算子HashAgg和SortAgg實現。

HashAgg

HashAgg利用哈希表實現聚合:

  1. 根據輸入行的分組列的值,通過Hash找到對應的分組。
  2. 按照指定的聚合函數,對該行進行聚合計算。
  3. 重復以上步驟直到處理完所有的輸入行,最后輸出聚合結果。
> explain select count(*) from t1 join t2 on t1.id = t2.id group by t1.name,t2.name;

Project(count(*)="count(*)")
  HashAgg(group="name,name0", count(*)="COUNT()")
    BKAJoin(condition="id = id", type="inner")
      Gather(concurrent=true)
        LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
      Gather(concurrent=true)
        LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

Explain結果中,HashAgg算子還包含以下關鍵信息:

  • group:表示GROUP BY字段,示例中為name,name0分別引用t1,t2表的name列,當存在相同別名會通過后綴數字區分 。
  • 聚合函數:等號(=) 前為聚合函數對應的輸出列名,其后為對應的計算方法。示例中 count(*)="COUNT()" ,第一個 count(*) 對應輸出的列名,隨后的COUNT()表示對其輸入數據進行計數。

HashAgg對應可以通過Hint來關閉:/*+TDDL:cmd_extra(ENABLE_HASH_AGG=false)*/

SortAgg

SortAgg在輸入數據已按分組列排序的情況,對各個分組依次完成聚合。

  1. 保證輸入按指定的分組列排序(例如,可能會看到 MergeSort 或 MemSort)。
  2. 逐行讀入輸入數據,如果分組與當前分組相同,則對其進行聚合計算。
  3. 否則,如果分組與當前分組不同,則輸出當前分組上的聚合結果。

相比 HashAgg,SortAgg 每次只要處理一個分組,內存消耗很小;相對的,HashAgg 需要把所有分組存儲在內存中,需要消耗較多的內存。

> explain select count(*) from t1 join t2 on t1.id = t2.id group by t1.name,t2.name order by t1.name, t2.name;

Project(count(*)="count(*)")
  MemSort(sort="name ASC,name0 ASC")
    HashAgg(group="name,name0", count(*)="COUNT()")
      BKAJoin(condition="id = id", type="inner")
        Gather(concurrent=true)
          LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
        Gather(concurrent=true)
          LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")

SortAgg對應可以通過Hint來關閉:/*+TDDL:cmd_extra(ENABLE_SORT_AGG=false)*/

兩階段聚合優化

兩階段聚合,即通過將Agg拆分為部分聚合(Partial Agg)和最終聚合(Final Agg)的兩個階段,先對部分結果集做聚合,然后將這些部分聚合結果匯總,得到整體聚合的結果。

例如下面的 SQL 中,HashAgg 中拆分出的部分聚合(PartialAgg)會被下推至MySQL上的各個分表,而其中的AVG函數也被拆分成 SUMCOUNT 以實現兩階段的計算:

> explain select avg(age) from t2 group by name

Project(avg(age)="sum_pushed_sum / sum_pushed_count")
  HashAgg(group="name", sum_pushed_sum="SUM(pushed_sum)", sum_pushed_count="SUM(pushed_count)")
    Gather(concurrent=true)
      LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `name`, SUM(`age`) AS `pushed_sum`, COUNT(`age`) AS `pushed_count` FROM `t2` AS `t2` GROUP BY `name`")

兩階段聚合的優化能大大減少數據傳輸量、提高執行效率。

排序(Sort)

PolarDB-X 1.0中的排序算子主要包括 MemSort、TopN,以及 MergeSort。

  • MemSort

    PolarDB-X 1.0中的通用的排序實現為MemSort算子,即內存中運行快速排序(Quick Sort)算法。

    下面是一個用到MemSort算子的例子:

    > explain select t1.name from t1 join t2 on t1.id = t2.id order by t1.name,t2.name;
    
    Project(name="name")
      MemSort(sort="name ASC,name0 ASC")
        Project(name="name", name0="name0")
          BKAJoin(condition="id = id", type="inner")
            Gather(concurrent=true)
              LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
            Gather(concurrent=true)
              LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")
  • TopN

    當SQL中ORDER BY和LIMIT一起出現時,Sort算子和Limit算子會合并成TopN算子。

    TopN算子維護一個最大或最小堆,按照排序鍵的值,堆中始終保留最大或最小的N行數據。當處理完全部的輸入數據時,堆中留下的N個行(或小于N個)就是需要的結果。

    下面是一個用到 TopN 算子的例子:

    > explain select t1.name from t1 join t2 on t1.id = t2.id order by t1.name,t2.name limit 10;
    
    Project(name="name")
      TopN(sort="name ASC,name0 ASC", offset=0, fetch=?0)
        Project(name="name", name0="name0")
          BKAJoin(condition="id = id", type="inner")
            Gather(concurrent=true)
              LogicalView(tables="t1", shardCount=2, sql="SELECT `id`, `name` FROM `t1` AS `t1`")
            Gather(concurrent=true)
              LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `id`, `name` FROM `t2` AS `t2` WHERE (`id` IN ('?'))")
  • MergeSort

    通常,只要語義允許,SQL中的排序操作會被下推到MySQL上執行,而PolarDB-X 1.0執行層只做最后的歸并操作,即MergeSort。嚴格來說,MergeSort 不僅僅是排序,更是一種數據重分布算子(類似 Gather)。

    下面的SQL是對t1表進行排序,經過PolarDB-X 1.0查詢優化器的優化,Sort算子被下推至各個MySQL分片中執行,最終只在上層做歸并操作。

    > explain select name from t1 order by name;
    
    MergeSort(sort="name ASC")
      LogicalView(tables="t1", shardCount=2, sql="SELECT `name` FROM `t1` AS `t1` ORDER BY `name`")

    相比 MemSort,MergeSort 算法可以減少PolarDB-X 1.0層的內存消耗,并充分利用 MySQL 層的計算能力。

優化組合的例子

下面是一個組合優化的例子,在這個例子中,用到了以下優化規則:

  • Agg下推穿過Join
  • Join算法選擇為SortMergeJoin
  • Agg算法選擇為SortAgg
  • SortMergeJoin中需要的排序利用了SortAgg輸出的有序
  • 兩階段Agg
  • Agg下推
  • Sort下推
>  explain select count(*) from t1 join t2 on t1.name = t2.name group by t1.name;

Project(count(*)="count(*) * count(*)0")
  SortMergeJoin(condition="name = name", type="inner")
    SortAgg(group="name", count(*)="SUM(count(*))")
      MergeSort(sort="name ASC")
        LogicalView(tables="t1", shardCount=2, sql="SELECT `name`, COUNT(*) AS `count(*)` FROM `t1` AS `t1` GROUP BY `name` ORDER BY `name`")
    SortAgg(group="name", count(*)="SUM(count(*))")
      MergeSort(sort="name ASC")
        LogicalView(tables="t2_[0-3]", shardCount=4, sql="SELECT `name`, COUNT(*) AS `count(*)` FROM `t2` AS `t2` GROUP BY `name` ORDER BY `name`")