您可以通過調整內存、CPU和Task個數等,實現對Hive作業的調優。本文為您介紹如何調優Hive作業。
作業調優方案
作業調優方向 | 調優方案 |
參數調優 | |
代碼優化 |
代碼優化
數據清洗
讀取表時分區過濾,避免全表掃描。
數據過濾之后再JOIN。
重復使用數據時,避免重復計算,構建中間表,重復使用中間表。
多Distinct優化
優化前代碼
多個Distinct時,數據會出現膨脹。
select k,count(distinct case when a > 1 then user_id end) user1, count(distinct case when a > 2 then user_id end) user2, count(distinct case when a > 3 then user_id end) user3, count(distinct case when a > 4 then user_id end) user4 from t group by k;
優化后代碼
通過兩次Group By的方式代替Distinct操作,通過內層的Group By去重并降低數據量,通過外層的Group By取sum,即可實現Distinct的效果。
select k,sum(case when user1 > 0 then 1 else 0 end) as user1, sum(case when user2 > 0 then 1 else 0 end) as user2, sum(case when user3 > 0 then 1 else 0 end) as user3, sum(case when user4 > 0 then 1 else 0 end) as user4 from (select k,user_id,count(case when a > 1 then user_id end) user1, count(case when a > 2 then user_id end) user2, count(case when a > 3 then user_id end) user3, count(case when a > 4 then user_id end) user4 from t group by k,user_id ) tmp group by k;
數據傾斜
熱點Key處理方式如下:
如果是Group By出現熱點,請按照以下方法操作:
先開啟Map端聚合。
set hive.map.aggr=true; set hive.groupby.mapaggr.checkinterval=100000;(用于設定Map端進行聚合操作的條目數)
可以對Key隨機化打散,多次聚合,或者直接設置。
set hive.groupby.skewindata=true;
當選項設定為true時,生成的查詢計劃有兩個MapReduce任務。在第一個MapReduce中,Map的輸出結果集合會隨機分布到Reduce中, 每個部分進行聚合操作,并輸出結果。這樣處理的結果是,相同的Group By Key有可能分發到不同的Reduce中,從而達到負載均衡的目的;第二個MapReduce任務再根據預處理的數據結果按照Group By Key分布到Reduce中(這個過程可以保證相同的Group By Key分布到同一個Reduce中),最后完成最終的聚合操作。
如果兩個大表進行JOIN操作時,出現熱點,則使用熱點Key隨機化。
例如,log表存在大量user_id為null的記錄,但是表bmw_users中不會存在user_id為空,則可以把null隨機化再關聯,這樣就避免null值都分發到一個Reduce Task上。代碼示例如下。
SELECT * FROM log a LEFT OUTER JOIN bmw_users b ON CASE WHEN a.user_id IS NULL THEN CONCAT('dp_hive',RAND()) ELSE a.user_id=b.user_id END;
如果大表和小表進行JOIN操作時,出現熱點,則使用MAP JOIN。
內存參數
您可以通過設置以下參數,對Map和Reduce階段的內存進行調優:
Map階段
參數
描述
示例
mapreduce.map.java.opts
默認參數,表示JVM堆內存。
-Xmx2048m
mapreduce.map.memory.mb
默認參數,表示整個JVM進程占用的內存,計算方法為
堆內存+堆外內存=2048+256
。2304
Reduce階段
參數
描述
示例
mapreduce.reduce.java.opts
默認參數,表示JVM堆內存。
-Xmx2048m
mapreduce.reduce.memory.mb
默認參數,表示整個JVM進程占用的內存,計算方法為
堆內存+堆外內存=2048+256
。2304
CPU參數
您可以通過設置以下參數,對Map和Reduce任務的CPU進行調優。
參數 | 描述 |
mapreduce.map.cpu.vcores | 每個Map任務可用的最多的CPU Core數目。 |
mapreduce.reduce.cpu.vcores | 每個Reduce任務可用的最多的CPU Core數目。 說明 此設置在公平隊列是不生效的,通常vCores用于較大的集群,以限制不同用戶或應用程序的CPU。 |
Task數量優化
Map Task數量優化
在分布式計算系統中,決定Map數量的一個因素就是原始數據,在不加干預的情況下,原始數據有多少個塊,就可能有多少個起始的Task,因為每個Task對應讀取一個塊的數據;當然這個也不是絕對的,當文件數量特別多,并且每個文件的大小特別小時,您就可以限制減少初始Map對相應的Task的數量,以減少計算資源的浪費,如果文件數量較少,但是單個文件較大,您可以增加Map的Task的數量,以減小單個Task的壓力。
通常,Map Task數量是由mapred.map.tasks、mapred.min.split.size和dfs.block.size決定的。
Hive的文件基本上都是存儲在HDFS上,而HDFS上的文件,都是分塊的,所以具體的Hive數據文件在HDFS上分多少塊,可能對應的是默認Hive起始的Task的數量,使用default_mapper_num參數表示。使用數據總大小除以dfs默認的最大塊大小來決定初始默認數據分區數。
初始默認的Map Task數量,具體公式如下。
default_mapper_num = total_size/dfs.block.size
計算Split的size,具體公式如下。
default_split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
上述公式中的mapred.min.split.size和mapred.max.split.size,分別為Hive計算的時Split的最小值和最大值。
將數據按照計算出來的size劃分為數據塊,具體公式如下。
split_num = total_size/default_split_size;
計算的Map Task數量,具體公式如下。
map_task_num = min(split_num, max(mapred.map.tasks, default_mapper_num))
從上面的過程來看,Task的數量受各個方面的限制,不至于Task的數量太多,也不至于Task的數量太少。如果需要提高Task數量,就要降低mapred.min.split.size的數值,在一定的范圍內可以減小default_split_size的數值,從而增加split_num的數量,也可以增大mapred.map.tasks的數量。
重要Hive on TEZ和Hive on MR使用是有差異的。例如,在Hive中執行一個Query時,可以發現Hive的執行引擎在使用Tez與MR時,兩者生成的mapper數量差異較大。主要原因在于Tez中對inputSplit做了grouping操作,可以將多個inputSplit組合成更少的groups,然后為每個group生成一個mapper任務,而不是為每個inputSplit生成一個mapper任務。
Reduce Task數量優化
通過hive.exec.reducers.bytes.per.reducer參數控制單個Reduce處理的字節數。
Reduce的計算方法如下。
reducer_num = min(total_size/hive.exec.reducers.bytes.per.reducers, hive.exec.reducers.max)。
通過mapred.reduce.tasks參數來設置Reduce Task的數量。
說明在TEZ引擎模式下,通過命令
set hive.tez.auto.reducer.parallelism = true;
,TEZ將會根據vertice的輸出大小動態預估調整Reduce Task的數量。同Map一樣,啟動和初始化Reduce也會消耗時間和資源。另外,有多少個Reduce,就會有多少個輸出文件,如果生成了很多個小文件,并且這些小文件作為下一個任務的輸入,則會出現小文件過多的問題。
并行運行
并行運行表示同步執行Hive的多個階段。Hive在執行過程中,將一個查詢轉化成一個或者多個階段。某個特定的Job可能包含多個階段,而這些階段可能并非完全相互依賴的,也就是可以并行運行的,這樣可以使得整個Job的運行時間縮短。
您可以通過設置以下參數,控制不同的作業是否可以同時運行。
參數 | 描述 |
hive.exec.parallel | 默認值為false。設置true時,表示允許任務并行運行。 |
hive.exec.parallel.thread.number | 默認值為8。表示允許同時運行線程的最大值。 |
Fetch task
您可以通過設置以下參數,在執行查詢等語句時,不執行MapReduce程序,以減少等待時間。
參數 | 描述 |
hive.fetch.task.conversion | 默認值為none。參數取值如下:
|
開啟向量化
您可以通過設置以下參數,在執行查詢時啟用向量化執行,以提升查詢性能。
參數 | 描述 |
hive.vectorized.execution.enabled | 默認值為true。開啟向量化查詢的開關。 |
hive.vectorized.execution.reduce.enabled | 默認值為true。表示是否啟用Reduce任務的向量化執行模式。 |
合并小文件
大量小文件容易在文件存儲端造成瓶頸,影響處理效率。對此,您可以通過合并Map和Reduce的結果文件來處理。
您可以通過設置以下參數,合并小文件。
參數 | 描述 |
hive.merge.mapfiles | 默認值為true。表示是否合并Map輸出文件。 |
hive.merge.mapredfiles | 默認值為false。表示是否合并Reduce輸出文件。 |
hive.merge.size.per.task | 默認值為256000000,單位字節。表示合并文件的大小。 |