Apache Celeborn是一個專門用于處理大數據計算引擎中間數據(如Shuffle數據和溢寫數據)的服務,能夠提升大數據引擎的性能、穩定性和靈活性。Remote Shuffle Service(RSS)用于高效處理大規模數據集的Shuffle過程。本文介紹如何在ACK集群中部署Celeborn組件,并在Spark作業中使用Celeborn作為Remote Shuffle Service(RSS)。
在Spark作業中使用Celeborn的優勢
對于MapReduce、Spark和Flink等大數據處理框架,使用Celeborn作為RSS具有如下優勢:
推送式Shuffle寫入(Push-based shuffle write):Mapper節點不需要將數據存儲在本地磁盤,適合云端存算分離架構。
合并式Shuffle讀取(Merge-based shuffle read):數據在Worker節點進行合并,而非在Reducer節點,避免小文件的隨機讀寫及小數據量傳輸帶來的網絡開銷,提升數據處理效率。
高可用性:Celeborn的Master節點基于Raft協議實現高可用性,確保系統的穩定運行。
高容錯性:支持雙副本機制,顯著降低Fetch失敗的概率。
前提條件
已部署ack-spark-operator組件,請參見部署ack-spark-operator組件。
已通過kubectl工具連接集群。具體操作,請參見獲取集群KubeConfig并通過kubectl工具連接集群。
已創建OSS存儲空間。具體操作請參見創建存儲空間。
已安裝ossutil并配置ossutil。關于ossutil命令參考請參見命令行工具ossutil命令參考。
根據如下集群環境配置創建節點池。
集群環境
本示例中使用的ACK集群環境信息如下所示。
Master進程部署到節點池celeborn-master中,配置如下:
節點池名稱:celeborn-master
節點數:3
ECS實例規格類型:g8i.2xlarge
標簽:celeborn.apache.org/role=master
污點:celeborn.apache.org/role=master:NoSchedule
單節點數據存儲:/mnt/celeborn_ratis(1024GB)
Worker進程部署到節點池celeborn-worker中,配置如下:
節點池名稱:celeborn-worker
節點數:5
ECS實例規格類型:g8i.4xlarge
標簽:celeborn.apache.org/role=worker
污點:celeborn.apache.org/role=worker:NoSchedule
單節點數據存儲:
/mnt/disk1(1024GB)
/mnt/disk2(1024GB)
/mnt/disk3(1024GB)
/mnt/disk4(1024GB)
流程概述
本文將引導您完成以下步驟,幫助您了解如何在ACK集群中部署Celeborn。
構建Celeborn容器鏡像
根據所需的Celeborn版本下載相應的發行版,然后構建容器鏡像并將其推送至您的鏡像倉庫,以供部署ack-celeborn組件時使用。
部署ack-celeborn組件
通過ACK應用市場提供的ack-celeborn Helm Chart,使用已構建的Celeborn容器鏡像,一鍵部署Celeborn集群。
構建Spark容器鏡像
構建包含了Celeborn和訪問OSS相關Jar包依賴的Spark容器鏡像,并推送到您的鏡像倉庫中。
準備測試數據并上傳至OSS
生成PageRank作業的測試數據集并將其上傳至OSS。
運行示例Spark作業
運行示例PageRank作業并配置使用Celeborn作為RSS。
(可選)環境清理
體驗完本教程后,清理無需使用的Spark作業和資源,避免產生額外的費用。
步驟一:構建Celeborn容器鏡像
根據您所使用的Celeborn版本,從Celeborn 官網下載相應的發行版(如0.5.2版本)。在配置過程中,將<IMAGE-REGISTRY>
和<IMAGE-REPOSITORY>
替換為您自己的鏡像倉庫和鏡像名稱。同時,您可以通過修改PLATFORMS
變量來配置所需的鏡像架構。更多信息,請參見Deploy Celeborn on Kubernetes。docker buildx
命令需要Docker版本19.03或更高版本支持,升級詳情請參見安裝Docker。
CELEBORN_VERSION=0.5.2 # Celeborn版本號。
IMAGE_REGISTRY=<IMAGE-REGISTRY> # 鏡像倉庫,例如docker.io。
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # 鏡像名稱,例如apache/celeborn。
IMAGE_TAG=${CELEBORN_VERSION} # 鏡像標簽,這里使用Celeborn版本號作為標簽。
# 下載。
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# 解壓。
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# 切換工作目錄。
cd apache-celeborn-${CELEBORN_VERSION}-bin
# 使用Docker Buildkit構建鏡像并推送到鏡像倉庫中。
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.
步驟二:部署ack-celeborn組件
登錄容器服務管理控制臺,在左側導航欄選擇 。
在應用市場頁面,單擊應用目錄頁簽,然后搜索并選中ack-celeborn,然后在ack-celeborn頁面,單擊一鍵部署。
在創建面板中,選擇集群和命名空間,然后單擊下一步。
在參數配置頁面,設置相應參數,然后單擊確定。
image: # 需替換成步驟一中構建得到的Celeborn鏡像地址。 registry: docker.io # 鏡像倉庫。 repository: apache/celeborn # 鏡像名稱。 tag: 0.5.2 # 鏡像標簽。 celeborn: celeborn.client.push.stageEnd.timeout: 120s celeborn.master.ha.enabled: true celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis celeborn.master.heartbeat.application.timeout: 300s celeborn.master.heartbeat.worker.timeout: 120s celeborn.master.http.port: 9098 celeborn.metrics.enabled: true celeborn.metrics.prometheus.path: /metrics/prometheus celeborn.rpc.dispatcher.numThreads: 4 celeborn.rpc.io.clientThreads: 64 celeborn.rpc.io.numConnectionsPerPeer: 2 celeborn.rpc.io.serverThreads: 64 celeborn.shuffle.chunk.size: 8m celeborn.worker.fetch.io.threads: 32 celeborn.worker.flusher.buffer.size: 256K celeborn.worker.http.port: 9096 celeborn.worker.monitor.disk.enabled: false celeborn.worker.push.io.threads: 32 celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi master: replicas: 3 env: - name: CELEBORN_MASTER_MEMORY value: 28g - name: CELEBORN_MASTER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: celeborn-ratis mountPath: /mnt/celeborn_ratis resources: requests: cpu: 7 memory: 28Gi limits: cpu: 7 memory: 28Gi volumes: - name: celeborn-ratis hostPath: path: /mnt/celeborn_ratis type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: master tolerations: - key: celeborn.apache.org/role operator: Equal value: master effect: NoSchedule worker: replicas: 5 env: - name: CELEBORN_WORKER_MEMORY value: 28g - name: CELEBORN_WORKER_OFFHEAP_MEMORY value: 28g - name: CELEBORN_WORKER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: disk1 mountPath: /mnt/disk1 - name: disk2 mountPath: /mnt/disk2 - name: disk3 mountPath: /mnt/disk3 - name: disk4 mountPath: /mnt/disk4 resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi volumes: - name: disk1 hostPath: path: /mnt/disk1 type: DirectoryOrCreate - name: disk2 hostPath: path: /mnt/disk2 type: DirectoryOrCreate - name: disk3 hostPath: path: /mnt/disk3 type: DirectoryOrCreate - name: disk4 hostPath: path: /mnt/disk4 type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: worker tolerations: - key: celeborn.apache.org/role operator: Equal value: worker effect: NoSchedule
下表列出了部分配置參數的說明。完整的參數配置詳情,您可以在ack-celeborn頁面中的配置項查看。
參數
描述
示例值
image.registry
鏡像倉庫地址。
"
docker.io"
image.repository
鏡像名稱。
"apache/celeborn"
image.tag
鏡像標簽。
"0.5.1"
image.pullPolicy
鏡像拉取策略。
"IfNotPresent"
celeborn
Celeoborn配置項。
{ "celeborn.client.push.stageEnd.timeout": "120s", "celeborn.master.ha.enabled": true, "celeborn.master.ha.ratis.raft.server.storage.dir": "/mnt/celeborn_ratis", "celeborn.master.heartbeat.application.timeout": "300s", "celeborn.master.heartbeat.worker.timeout": "120s", "celeborn.master.http.port": 9098, "celeborn.metrics.enabled": true, "celeborn.metrics.prometheus.path": "/metrics/prometheus", "celeborn.rpc.dispatcher.numThreads": 4, "celeborn.rpc.io.clientThreads": 64, "celeborn.rpc.io.numConnectionsPerPeer": 2, "celeborn.rpc.io.serverThreads": 64, "celeborn.shuffle.chunk.size": "8m", "celeborn.worker.fetch.io.threads": 32, "celeborn.worker.flusher.buffer.size": "256K", "celeborn.worker.http.port": 9096, "celeborn.worker.monitor.disk.enabled": false, "celeborn.worker.push.io.threads": 32, "celeborn.worker.storage.dirs": "/mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi" }
master.replicas
Master Pod副本數量。
3
master.volumeMounts
Master 容器數據卷掛載。
[ { "mountPath": "/mnt/celeborn_ratis", "name": "celeborn-ratis" } ]
master.volumes
Master Pod數據卷。
目前數據卷類型僅支持
hostPath
和emptyDir
類型。[ { "hostPath": { "path": "/mnt/celeborn_ratis", "type": "DirectoryOrCreate" }, "name": "celeborn-ratis" } ]
master.nodeSelector
Master Pod節點選擇器。
{}
master.affinity
Master Pod親和性。
{ "podAntiAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": [ { "labelSelector": { "matchExpressions": [ { "key": "app.kubernetes.io/name", "operator": "In", "values": [ "celeborn" ] }, { "key": "app.kubernetes.io/role", "operator": "In", "values": [ "master" ] } ] }, "topologyKey": "kubernetes.io/hostname" } ] } }
master.tolerations
Master Pod污點容忍。
[]
worker.replicas
Worker Pod副本數量。
5
worker.volumeMounts
Worker 容器數據卷掛載。
[ { "mountPath": "/mnt/disk1", "name": "disk1" }, { "mountPath": "/mnt/disk2", "name": "disk2" }, { "mountPath": "/mnt/disk3", "name": "disk3" }, { "mountPath": "/mnt/disk4", "name": "disk4" } ]
worker.volumes
Worker Pod數據卷。
目前數據卷類型僅支持
hostPath
和emptyDir
類型;[ { "capacity": "100Gi", "diskType": "SSD", "hostPath": "/mnt/disk1", "mountPath": "/mnt/disk1", "type": "hostPath" }, { "capacity": "100Gi", "diskType": "SSD", "hostPath": "/mnt/disk2", "mountPath": "/mnt/disk2", "type": "hostPath" }, { "capacity": "100Gi", "diskType": "SSD", "hostPath": "/mnt/disk3", "mountPath": "/mnt/disk3", "type": "hostPath" }, { "capacity": "100Gi", "diskType": "SSD", "hostPath": "/mnt/disk4", "mountPath": "/mnt/disk4", "type": "hostPath" } ]
worker.nodeSelector
Worker Pod節點選擇器。
{}
worker.affinity
Worker Pod親和性。
{ "podAntiAffinity": { "requiredDuringSchedulingIgnoredDuringExecution": [ { "labelSelector": { "matchExpressions": [ { "key": "app.kubernetes.io/name", "operator": "In", "values": [ "celeborn" ] }, { "key": "app.kubernetes.io/role", "operator": "In", "values": [ "worker" ] } ] }, "topologyKey": "kubernetes.io/hostname" } ] } }
worker.tolerations
Worker Pod污點容忍。
[]
執行以下命令并耐心等待Celeborn部署完成。在組件部署期間,如遇到Pod異常問題請參見Pod異常問題排查。
kubectl get -n celeborn statefulset
預期輸出:
NAME READY AGE celeborn-master 3/3 68s celeborn-worker 5/5 68s
步驟三:構建Spark容器鏡像
以Spark 3.5.3版本為例,創建如下Dockerfile文件,構建并上傳至您的鏡像倉庫。
ARG SPARK_IMAGE=<SPARK_IMAGE> # 將<SPARK_IMAGE>替換成您的Spark基礎鏡像。
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars
步驟四:準備測試數據并上傳至 OSS
關于如何準備測試數據并上傳至OSS,參見步驟一:準備測試數據并上傳至OSS。
步驟五:創建Secret存儲OSS訪問憑據
關于如何創建Secret用于存儲OSS訪問憑據,參見步驟三:創建Secret存儲OSS訪問憑據。
步驟六:提交示例Spark作業
根據如下內容創建SparkApplication清單文件并保存為spark-pagerank.yaml
。將<SPARK_IMAGE>
替換為您在步驟三:構建Spark容器鏡像的倉庫地址,同時將<OSS_BUCKET>
和<OSS_ENDPOINT>
替換成您的OSS存儲桶名稱和訪問端點。關于如何在Spark作業中配置Celeborn的更多信息,請參見Celeborn使用文檔。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE> # Spark 鏡像,將<SPARK_IMAGE>替換成Spark鏡像名稱
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定輸入測試數據集,將<OSS_BUCKET>替換成OSS Buckt名稱。
- "10" # 迭代次數。
sparkVersion: 3.5.3
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConf:
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.executor.userClassPathFirst: "false"
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never
(可選)步驟七:環境清理
如果您已體驗完本教程,相關資源如不再需要,可以通過執行以下命令進行刪除。
執行如下命令刪除Spark作業。
kubectl delete sparkapplication spark-pagerank
執行如下命令刪除Secret資源。
kubectl delete secret spark-oss-secret
相關文檔
關于如何使用Spark Operator提交Spark作業,請參見使用Spark Operator運行Spark作業。
關于如何使用Spark History Server查看Spark作業信息,請參見使用Spark History Server查看Spark作業信息。
關于如何使用Celeborn,請參見Apache Celeborn 使用文檔。