哈希聚簇(Hash Clustering)表通過設置表的Shuffle和Sort屬性,進而MaxCompute根據數據已有的存儲特性,優化執行計劃,提高效率,節省資源消耗。本文為您介紹在MaxCompute中如何使用Hash Clustering表。

背景信息

在MaxCompute查詢中,連接(Join)表是很常見的場景。例如以下一個簡單的Inner Join表Query示例:將t1表和t2表通過id列連接起來。
SELECT t1.a, t2.b FROM t1 JOIN t2 ON t1.id = t2.id;
Join在MaxCompute內部主要有三種實現方法:
  • Broadcast Hash Join

    當Join表中存在一個很小的表時,MaxCompute采用此方式,即把小表廣播傳遞到所有的Join Task Instance上面,然后直接和大表做Hash Join。

  • Shuffle Hash Join

    如果Join表比較大,就不能直接廣播了。這時候把兩個表按照Join Key做Hash Shuffle,由于相同的鍵值Hash結果也是一樣的,這就保證了相同的Key的記錄會收集到同一個Join Task Instance上面。然后每個Instance對數據量小的一路建Hash表,數據量大的順序讀取Join。

  • Sort Merge Join
    如果Join的表數據更大一些,Shuffle Hash Join方法也用不了,因為內存已經不足以容納建立一個Hash Table。這時的實現方法是:先按照Join Key做Hash Shuffle,然后再按照Join Key做排序(Sort),最后對Join雙方做一個歸并,具體流程如下圖所示:流程實際上對于MaxCompute目前數據量和規模,絕大多數情況下都是使用的Sort Merge Join,但這其實是非常昂貴的操作。從上圖可以看到,Shuffle的時候需要一次計算,并且中間結果需要落盤,后續Reducer讀取的時候,又需要讀取和排序的過程。對于M個Mapper和R個Reducer的場景,將產生M x R次的IO讀取。對應的Fuxi物理執行計劃如下所示,需要兩個Mapper Stage,一個Join Stage,其中紅色部分為Shuffle和Sort操作:fuxiplan與此同時,有些Join是可能反復發生的,比如將Query改為:
    SELECT t1.c, t2.d FROM t1 JOIN t2 ON t1.id = t2.id;
    雖然選擇的列不一樣了,但是Join是完全一樣的,整個Shuffle和Sort的過程也是完全一樣的。
    又或者將Query改為:
    SELECT t1.c, t3.d FROM t1 JOIN t3 ON t1.id = t3.id;
    這個時候對表t1和t3來Join,但實際上對于t1而言,整個Shuffle和Sort過程還是完全一樣。
    于是,考慮如果初始表數據生成時,按照Hash Shuffle和Sort的方式存儲,那么后續查詢中將避免對數據的再次Shuffle和Sort。這樣做的好處是,雖然建表時付出了一次性的代價,卻節省了將來可能產生的反復的Shuffle和Join。這時Join的Fuxi物理執行計劃變成了如下,不僅節省了Shuffle和Sort的操作,并且查詢從3個Stage變成了1個Stage完成:hashshuffle

使用說明

創建Hash Clustering表

您可以使用以下語句創建Hash Clustering表。您需要指定Cluster Key(即Hash Key),以及Hash分片(Bucket)的數目。Sort是可選項,但在大多數情況下,建議和Cluster Key一致,以便取得最佳的優化效果。
  • 命令語法
    CREATE TABLE [IF NOT EXISTS] <table_name>
                 [(<col_name> <data_type> [comment <col_comment>], ...)]
                 [comment <table_comment>]
                 [PARTITIONED BY (<col_name> <data_type> [comment <col_comment>], ...)]
                 [CLUSTERED BY (<col_name> [, <col_name>, ...])
                 [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])]
                 INTO <number_of_buckets> BUCKETS] [AS <select_statement>]
  • 使用示例。
    • 非分區表。
      CREATE TABLE T1 (a string, b string, c bigint)
                   CLUSTERED BY (c)
                   SORTED by (c) INTO 1024 BUCKETS;
    • 分區表。
      CREATE TABLE T1 (a string, b string, c bigint)
             PARTITIONED BY (dt string)
             CLUSTERED BY (c)
             SORTED by (c) INTO 1024 BUCKETS;
  • 屬性說明
    • CLUSTERED BY

      指定Hash Key,MaxCompute將對指定列進行Hash運算,按照Hash值分散到各個Bucket里面。為避免數據傾斜,避免熱點,取得較好的并行執行效果,CLUSTERED BY列適宜選擇取值范圍大、重復鍵值少的列。此外為了達到Join優化的目的,也應該考慮選取常用的Join或Aggregation Key,即類似于傳統數據庫中的主鍵。

    • SORTED BY

      指定在Bucket內字段的排序方式,建議Sorted By和Clustered By一致,以取得較好的性能。此外當SORTED BY子句指定之后,MaxCompute將自動生成索引,并且在查詢的時候利用索引來加速執行。

    • INTO number_of_buckets BUCKETS

      指定哈希桶的數目,這個數字必須提供,由數據量大小來決定。Bucket越多并發度越大,Job整體運行時間越短,但同時如果Bucket太多的話,可能導致小文件太多,另外并發度過高也會造成CPU時間的增加。目前推薦設置讓每個Bucket數據大小在500MB ~ 1GB之間,如果是特別大的表,這個數值可以適當增加。對于Join優化的場景,兩個表的Join要去掉Shuffle和Sort步驟,要求哈希桶數目成倍數關系,比如256512。目前建議桶的數目統一使用2的N次冪,比如512、1024、2048、4096,這樣系統可以自動進行哈希桶的分裂和合并時,也可以去除Shuffle和Sort的步驟。

更改表的Hash Clustering屬性

對于分區表支持通過ALTER TABLE語句,來增加或者去除Hash Clustering屬性。
  • 命令語句
    --更改表為Hash Clustering表
    ALTER TABLE <table_name> [CLUSTERED BY (<col_name> [, <col_name>, ...])
                           [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])]
                           INTO <number_of_buckets> BUCKETS];
    --更改Hash Clustering表為非Hash Clustering表
    ALTER TABLE <table_name> NOT CLUSTERED;
  • 注意事項
    • ALTER TABLE語句改變聚集屬性,只對于分區表有效,非分區表一旦聚集屬性建立就無法改變。
    • ALTER TABLE語句只會影響分區表的新建分區(包括insert overwrite生成的),新分區將按新的聚集屬性存儲,老的數據分區保持不變。
    • 由于ALTER TABLE語句只影響新分區,所以該語句不可以再指定PARTITION。
ALTER TABLE語句適用于存量表,在增加了新的聚簇屬性之后,新的分區將做Hash Clustering存儲。

表屬性顯式驗證

在創建Hash Clustering Table之后,可以通過如下命令來查看表屬性,Hash Clustering屬性將顯示在Extended Info里面。
DESC EXTENDED <table_name>;
返回結果示例如下圖所示。表屬性驗證對于分區表,除了使用以上命令查看表屬性之后,還需要通過以下命令查看分區的屬性。
DESC EXTENDED <table_name> partition(<pt_spec>);
返回結果示例如下圖所示。分區表hash屬性驗證

Hash Clustering優勢

Bucket Pruning和Index優化

考慮以下查詢:
CREATE TABLE t1 (id bigint, 
                 a string, 
                 b string)
             CLUSTERED BY (id)
             SORTED BY (id) into 1000 BUCKETS; 
... 
SELECT t1.a, t1.b FROM t1 WHERE t1.id=12345;
對于普通表,這個通常意味著全表掃描操作,如果表非常大的情況下,資源消耗量是非常大的。但是因為我們已經對id做Hash Shuffle,并且對id做排序,查詢可以極大簡化:
  1. 通過查詢值12345找到對應的Hash Bucket,這時候我們只需要在1個Bucket里面掃描,而不是全部1000個Bucket里面掃描。稱之為Bucket Pruning。
  2. 因為Bucket內數據按id排序存放,MaxCompute會自動創建Index,利用Index lookup直接定位到相關記錄。
可以看出來,查詢不僅大大減少了Mapper的個數,并且由于利用了Index,Mapper可以直接定位到數據所在Page,加載讀入的數據量也大大減少。

例如一個大數據任務,一共起了1111個Mapper,讀取了427億條記錄,最后找符合條件記錄26條,總共耗時1分48秒。同樣的數據、同樣的查詢,使用Hash Clustering表來做,可以直接定位到單個Bucket,并利用Index只讀取包含查詢數據的Page,只用4個Mapper,讀取10000條記錄,總共耗時只需要6秒。

Aggregation優化

對于以下查詢:
SELECT department, SUM(salary) FROM employee GROUP BY (department);
通常情況下會對department列數據進行Shuffle和Sort,然后做Stream Aggregate,統計每一個departmentGroup。但是如果表數據已經CLUSTERED BY (department) SORTED BY (department),那么這個Shuffle和Sort的操作,也就相應節省掉了。

存儲優化

即便不考慮以上所述的各種計算上的優化,單單是把表Shuffle并排序存儲,都會對于存儲空間節省上有很大幫助。因為MaxCompute底層使用列存儲,通過排序,將鍵值相同或相近的記錄存放到一起,對于壓縮、編碼都會更加友好,從而使得壓縮效率更高。在實際測試中,某些極端情況下,排序存儲的表可以比無序表的存儲空間節省50%。對于生命周期很長的表,使用Hash Clustering存儲,是一個很值得的優化。

以下是一個簡單的實驗,使用TPC-H數據集中100GB的lineitem表,包含了intdoublestring等多種數據類型,在數據和壓縮方式等完全一樣的情況下,對比使用Hash Clustering和未使用Hash Clustering的表存儲大小,使用Hash Clustering的表存儲節省了約10%,如下圖所示。
  • 未使用Hash Clustering。存儲優化前
  • 使用Hash Clustering。存儲優化后

測試數據及分析

對于Hash Clustering整體帶來的性能收益,通過標準的TPC-H測試集進行衡量。測試使用1TB數據,統一使用500 Buckets,除了nationregion兩個極小的表以外,其余所有表均按照第一個列作為Cluster和Sort Key。整體測試結果表明,在使用了Hash Clustering之后,總CPU時間減少了約17.3%,總的Job運行時間減少了約12.8%

需要注意到是TPC-H里并不是所有的Query都可以利用到Clustering屬性,特別是兩個耗時最長的Query沒有辦法利用上,所以從總體上的效率提升并不是非常驚人。但如果單看可以利用上Clustering屬性的Query,收益還是非常明顯的,比如Q4快了約68%、Q12快了約62%、Q10快了約47%等。

以下是TPC-H Q4在普通表的Fuxi執行計劃:fuxiplan而下面則是使用Hash Clustering之后的執行計劃,可以看到,此DAG被大大簡化,這也是性能得到大幅提升的關鍵原因。優化后fuxiplan