Celeborn是一個處理中間數據的服務,能夠提升大數據引擎的穩定性、靈活性和性能。本文為您介紹如何使用Celeborn服務。
背景信息
目前Shuffle方案的缺點如下:
Shuffle Write在大數據量場景下會溢出,導致寫放大。
Shuffle Read過程中存在大量的網絡小包導致的Connection reset問題。
Shuffle Read過程中存在大量小數據量的IO請求和隨機讀,對磁盤和CPU造成高負載。
對于M*N次的連接數,在M和N數千的規模下,作業基本無法完成。
NodeManager和Spark Shuffle Service是同一進程,當Shuffle的數據量特別大時,通常會導致NodeManager重啟,從而影響YARN調度的穩定性。
Celeborn服務可以優化目前Shuffle方案的問題。Celeborn優勢如下:
使用Push-Style Shuffle代替Pull-Style,減少Mapper的內存壓力。
支持IO聚合,Shuffle Read的連接數從M*N降到N,同時將隨機讀更改為順序讀。
支持兩副本機制,降低Fetch Fail概率。
支持計算與存儲分離架構,可以部署Shuffle Service至特殊硬件環境中,與計算集群分離。
解決Spark on Kubernetes時對本地磁盤的依賴。
Celeborn設計架構圖如下。
前提條件
已創建E-MapReduce的DataLake集群或自定義集群,并選擇Celeborn服務。創建集群詳情請參見創建集群。
使用限制
此文檔僅適用于以下版本的集群。
集群 | 版本 |
DataLake集群 | EMR-3.45.0及后續版本,EMR-5.11.0及后續版本。 |
自定義集群 | EMR-3.45.0及后續版本,EMR-5.11.0及后續版本。 |
操作步驟
Spark配置
參數 | 描述 |
spark.shuffle.manager |
|
spark.serializer | 固定值為org.apache.spark.serializer.KryoSerializer。 |
spark.celeborn.push.replicate.enabled | 是否開啟兩副本。取值如下:
|
spark.shuffle.service.enabled | 需修改為false,才會使用Celeborn。 使用Celeborn時需要關閉原有的External Shuffle Service。在使用Celeborn的情況下,是可以正常使用Spark的Dynamic Allocation的。 說明
|
spark.celeborn.shuffle.writer | Celeborn的wirter支持的模式:
|
spark.celeborn.master.endpoints | 填寫格式<celeborn-master-ip>:<celeborn-master-port>。 涉及參數如下:
高可用集群時配置所有Master節點的IP地址。 |
spark.sql.adaptive.enabled | Celeborn支持Adaptive Execution,關閉Local Shuffle Reader可以獲得最佳的Shuffle性能。 各參數值需要修改為true、false和true。 |
spark.sql.adaptive.localShuffleReader.enabled | |
spark.sql.adaptive.skewJoin.enabled |
Spark服務支持一鍵配置使用Celeborn服務。
EMR-5.11.1及之后版本,EMR-3.45.1及之后版本
可以在Spark服務狀態頁面的服務概述區域,打開或關閉enableCeleborn開關。
EMR-5.11.0版本,EMR-3.45.0版本
可以在Spark服務狀態頁面的組件列表區域,選擇SparkThriftServer操作列的spark-defaults.conf和spark-thriftserver.conf兩個配置文件。
或 。選擇后會自動修改上文表格中的Spark配置項并重啟SparkThriftServer,同時會修改選擇
,所有的Spark任務都使用Celeborn服務。選擇
,所有的Spark任務都不使用Celeborn服務。
Celeborn配置
您可以在Celeborn服務配置頁面,修改或查看Celeborn所有的配置項。
針對不同的節點組(例如CORE或TASK)各配置項的值是不同的。
參數 | 描述 | 默認值 |
celeborn.worker.flusher.threads | 磁盤(HDD或者SSD)的刷盤線程數。 |
|
CELEBORN_WORKER_OFFHEAP_MEMORY | Worker堆外內存大小。 | 根據集群配置自動計算。 |
celeborn.application.heartbeat.timeout | Application心跳超時時間,超時會清理Application相關資源。 | 120s |
celeborn.worker.flusher.buffer.size | Flush buffer大小,超過最大值會觸發刷盤。 | 256K |
celeborn.metrics.enabled | 是否打開監控。取值如下:
| true |
CELEBORN_WORKER_MEMORY | Worker堆內內存大小。 | 1g |
CELEBORN_MASTER_MEMORY | Master堆內內存大小。 | 2g |
重啟Celeborn組件
在Celeborn服務的狀態頁面,選擇CelebornMaster組件操作列的
。說明如果是非高可用集群,您也可以單擊CelebornMaster組件操作列的重啟。
在彈出的對話框中,關閉滾動執行開關,輸入執行原因,單擊確定。
在彈出的對話框中,單擊確定。