哈希聚簇(Hash Clustering)表通過設置表的Shuffle和Sort屬性,進而MaxCompute根據數據已有的存儲特性,優化執行計劃,提高效率,節省資源消耗。本文為您介紹在MaxCompute中如何使用Hash Clustering表。
背景信息
id
列連接起來。SELECT t1.a, t2.b FROM t1 JOIN t2 ON t1.id = t2.id;
Join在MaxCompute內部主要有三種實現方法:- Broadcast Hash Join
當Join表中存在一個很小的表時,MaxCompute采用此方式,即把小表廣播傳遞到所有的Join Task Instance上面,然后直接和大表做Hash Join。
- Shuffle Hash Join
如果Join表比較大,就不能直接廣播了。這時候把兩個表按照Join Key做Hash Shuffle,由于相同的鍵值Hash結果也是一樣的,這就保證了相同的Key的記錄會收集到同一個Join Task Instance上面。然后每個Instance對數據量小的一路建Hash表,數據量大的順序讀取Join。
- Sort Merge Join如果Join的表數據更大一些,Shuffle Hash Join方法也用不了,因為內存已經不足以容納建立一個Hash Table。這時的實現方法是:先按照Join Key做Hash Shuffle,然后再按照Join Key做排序(Sort),最后對Join雙方做一個歸并,具體流程如下圖所示:實際上對于MaxCompute目前數據量和規模,絕大多數情況下都是使用的Sort Merge Join,但這其實是非常昂貴的操作。從上圖可以看到,Shuffle的時候需要一次計算,并且中間結果需要落盤,后續Reducer讀取的時候,又需要讀取和排序的過程。對于
M
個Mapper和R
個Reducer的場景,將產生M x R
次的IO讀取。對應的Fuxi物理執行計劃如下所示,需要兩個Mapper Stage,一個Join Stage,其中紅色部分為Shuffle和Sort操作:與此同時,有些Join是可能反復發生的,比如將Query改為:
雖然選擇的列不一樣了,但是Join是完全一樣的,整個Shuffle和Sort的過程也是完全一樣的。SELECT t1.c, t2.d FROM t1 JOIN t2 ON t1.id = t2.id;
又或者將Query改為:
這個時候對表t1和t3來Join,但實際上對于t1而言,整個Shuffle和Sort過程還是完全一樣。SELECT t1.c, t3.d FROM t1 JOIN t3 ON t1.id = t3.id;
于是,考慮如果初始表數據生成時,按照Hash Shuffle和Sort的方式存儲,那么后續查詢中將避免對數據的再次Shuffle和Sort。這樣做的好處是,雖然建表時付出了一次性的代價,卻節省了將來可能產生的反復的Shuffle和Join。這時Join的Fuxi物理執行計劃變成了如下,不僅節省了Shuffle和Sort的操作,并且查詢從3個Stage變成了1個Stage完成:
使用說明
創建Hash Clustering表
- 命令語法
CREATE TABLE [IF NOT EXISTS] <table_name> [(<col_name> <data_type> [comment <col_comment>], ...)] [comment <table_comment>] [PARTITIONED BY (<col_name> <data_type> [comment <col_comment>], ...)] [CLUSTERED BY (<col_name> [, <col_name>, ...]) [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])] INTO <number_of_buckets> BUCKETS] [AS <select_statement>]
- 使用示例。
- 非分區表。
CREATE TABLE T1 (a string, b string, c bigint) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
- 分區表。
CREATE TABLE T1 (a string, b string, c bigint) PARTITIONED BY (dt string) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
- 非分區表。
- 屬性說明
- CLUSTERED BY
指定Hash Key,MaxCompute將對指定列進行Hash運算,按照Hash值分散到各個Bucket里面。為避免數據傾斜,避免熱點,取得較好的并行執行效果,CLUSTERED BY列適宜選擇取值范圍大、重復鍵值少的列。此外為了達到Join優化的目的,也應該考慮選取常用的Join或Aggregation Key,即類似于傳統數據庫中的主鍵。
- SORTED BY
指定在Bucket內字段的排序方式,建議Sorted By和Clustered By一致,以取得較好的性能。此外當SORTED BY子句指定之后,MaxCompute將自動生成索引,并且在查詢的時候利用索引來加速執行。
- INTO number_of_buckets BUCKETS
指定哈希桶的數目,這個數字必須提供,由數據量大小來決定。Bucket越多并發度越大,Job整體運行時間越短,但同時如果Bucket太多的話,可能導致小文件太多,另外并發度過高也會造成CPU時間的增加。目前推薦設置讓每個Bucket數據大小在500MB ~ 1GB之間,如果是特別大的表,這個數值可以適當增加。對于Join優化的場景,兩個表的Join要去掉Shuffle和Sort步驟,要求哈希桶數目成倍數關系,比如
256
和512
。目前建議桶的數目統一使用2的N次冪,比如512、1024、2048、4096,這樣系統可以自動進行哈希桶的分裂和合并時,也可以去除Shuffle和Sort的步驟。
- CLUSTERED BY
更改表的Hash Clustering屬性
- 命令語句
--更改表為Hash Clustering表 ALTER TABLE <table_name> [CLUSTERED BY (<col_name> [, <col_name>, ...]) [SORTED BY (<col_name> [ASC | DESC] [, <col_name> [ASC | DESC] ...])] INTO <number_of_buckets> BUCKETS]; --更改Hash Clustering表為非Hash Clustering表 ALTER TABLE <table_name> NOT CLUSTERED;
- 注意事項
- ALTER TABLE語句改變聚集屬性,只對于分區表有效,非分區表一旦聚集屬性建立就無法改變。
- ALTER TABLE語句只會影響分區表的新建分區(包括insert overwrite生成的),新分區將按新的聚集屬性存儲,老的數據分區保持不變。
- 由于ALTER TABLE語句只影響新分區,所以該語句不可以再指定PARTITION。
表屬性顯式驗證
Extended Info
里面。DESC EXTENDED <table_name>;
返回結果示例如下圖所示。對于分區表,除了使用以上命令查看表屬性之后,還需要通過以下命令查看分區的屬性。DESC EXTENDED <table_name> partition(<pt_spec>);
返回結果示例如下圖所示。Hash Clustering優勢
Bucket Pruning和Index優化
CREATE TABLE t1 (id bigint,
a string,
b string)
CLUSTERED BY (id)
SORTED BY (id) into 1000 BUCKETS;
...
SELECT t1.a, t1.b FROM t1 WHERE t1.id=12345;
對于普通表,這個通常意味著全表掃描操作,如果表非常大的情況下,資源消耗量是非常大的。但是因為我們已經對id
做Hash Shuffle,并且對id
做排序,查詢可以極大簡化:- 通過查詢值
12345
找到對應的Hash Bucket,這時候我們只需要在1個Bucket里面掃描,而不是全部1000個Bucket里面掃描。稱之為Bucket Pruning。 - 因為Bucket內數據按
id
排序存放,MaxCompute會自動創建Index,利用Index lookup直接定位到相關記錄。
例如一個大數據任務,一共起了1111個Mapper,讀取了427億條記錄,最后找符合條件記錄26條,總共耗時1分48秒。同樣的數據、同樣的查詢,使用Hash Clustering表來做,可以直接定位到單個Bucket,并利用Index只讀取包含查詢數據的Page,只用4個Mapper,讀取10000條記錄,總共耗時只需要6秒。
Aggregation優化
SELECT department, SUM(salary) FROM employee GROUP BY (department);
通常情況下會對department
列數據進行Shuffle和Sort,然后做Stream Aggregate,統計每一個department
Group。但是如果表數據已經CLUSTERED BY (department) SORTED BY (department),那么這個Shuffle和Sort的操作,也就相應節省掉了。存儲優化
即便不考慮以上所述的各種計算上的優化,單單是把表Shuffle并排序存儲,都會對于存儲空間節省上有很大幫助。因為MaxCompute底層使用列存儲,通過排序,將鍵值相同或相近的記錄存放到一起,對于壓縮、編碼都會更加友好,從而使得壓縮效率更高。在實際測試中,某些極端情況下,排序存儲的表可以比無序表的存儲空間節省50%。對于生命周期很長的表,使用Hash Clustering存儲,是一個很值得的優化。
lineitem
表,包含了int
、double
、string
等多種數據類型,在數據和壓縮方式等完全一樣的情況下,對比使用Hash Clustering和未使用Hash Clustering的表存儲大小,使用Hash Clustering的表存儲節省了約10%
,如下圖所示。- 未使用Hash Clustering。
- 使用Hash Clustering。
測試數據及分析
對于Hash Clustering整體帶來的性能收益,通過標準的TPC-H測試集進行衡量。測試使用1TB數據,統一使用500 Buckets,除了nation
和region
兩個極小的表以外,其余所有表均按照第一個列作為Cluster和Sort Key。整體測試結果表明,在使用了Hash Clustering之后,總CPU時間減少了約17.3%
,總的Job運行時間減少了約12.8%
。
需要注意到是TPC-H里并不是所有的Query都可以利用到Clustering屬性,特別是兩個耗時最長的Query沒有辦法利用上,所以從總體上的效率提升并不是非常驚人。但如果單看可以利用上Clustering屬性的Query,收益還是非常明顯的,比如Q4快了約68%
、Q12快了約62%
、Q10快了約47%
等。