使用Spark Operator運行Spark作業(yè)
Apache Spark是一種專門用于大規(guī)模數(shù)據(jù)處理的計算引擎,廣泛應用于數(shù)據(jù)分析和機器學習等場景。Spark Operator提供了一種在Kubernetes集群中自動化部署Spark作業(yè)和管理其生命周期的能力。本文介紹如何在ACK集群中使用Spark Operator運行Spark作業(yè),幫助數(shù)據(jù)工程師快速高效地運行和管理大數(shù)據(jù)處理作業(yè)。
前提條件
已創(chuàng)建1.24及以上的ACK集群Pro版、ACK Serverless集群Pro版。相關(guān)操作,請參見創(chuàng)建ACK托管集群、創(chuàng)建ACK Serverless集群、手動升級集群。
已通過kubectl連接kubernetes集群。具體操作,請參見獲取集群KubeConfig并通過kubectl工具連接集群。
Spark Operator介紹
Spark Operator專為在Kubernetes集群中運行Spark工作負載而設(shè)計,旨在自動化管理Spark作業(yè)的生命周期。通過SparkApplication
和ScheduledSparkApplication
等CRD資源,您可以靈活提交和管理Spark作業(yè)。利用Kubernetes的自動擴展、健康檢查和資源管理等特性,Spark Operator可以更有效地監(jiān)控和優(yōu)化Spark作業(yè)的運行。ACK基于社區(qū)組件kubeflow/spark-operator提供了ack-spark-operator組件,更多信息,請參見Spark Operator | Kubeflow。
使用優(yōu)勢:
簡化管理:通過Kubernetes的聲明式作業(yè)配置,自動化部署Spark作業(yè)并管理作業(yè)的生命周期。
支持多租戶:可利用Kubernetes的命名空間機制和資源配額機制進行用戶粒度資源隔離和資源分配,并利用Kubernetes的節(jié)點選擇機制保證 Spark工作負載可以獲得專用的資源。
彈性資源供給:利用ECI彈性容器實例或彈性節(jié)點池等彈性資源,可在業(yè)務(wù)高峰期快速獲得大量彈性資源,平衡性能和成本。
適用場景:
數(shù)據(jù)分析:數(shù)據(jù)科學家可以利用Spark進行交互式數(shù)據(jù)分析和數(shù)據(jù)清洗等。
批量數(shù)據(jù)計算:運行定時批處理作業(yè),處理大規(guī)模數(shù)據(jù)集。
實時數(shù)據(jù)處理:Spark Streaming庫提供了對實時數(shù)據(jù)進行流式處理的能力。
流程概述
本文將引導您完成以下步驟,幫助您了解如何使用Spark Operator在ACK集群上運行和管理Spark作業(yè),從而有效地進行大數(shù)據(jù)處理。
部署ack-spark-operator組件:在ACK集群中安裝Spark Operator,使其能夠管理和運行Spark作業(yè)。
提交Spark作業(yè):創(chuàng)建并提交一個Spark作業(yè)的配置文件,實現(xiàn)對特定數(shù)據(jù)處理任務(wù)的執(zhí)行。
查看Spark作業(yè):監(jiān)控作業(yè)的運行狀態(tài),獲取詳細的執(zhí)行信息和日志。
訪問Spark Web UI:通過Web界面更直觀地了解Spark作業(yè)執(zhí)行情況。
更新Spark作業(yè):根據(jù)需求調(diào)整作業(yè)配置,支持動態(tài)更新參數(shù)。
刪除Spark作業(yè):清理已完成或不再需要的Spark作業(yè),避免產(chǎn)生預期外的費用。
步驟一:部署ack-spark-operator組件
登錄容器服務(wù)管理控制臺,在左側(cè)導航欄選擇 。
在應用市場頁面單擊應用目錄頁簽,然后搜索并選中ack-spark-operator。
在ack-spark-operator頁面,單擊一鍵部署。
在創(chuàng)建面板中,選擇集群和命名空間,然后單擊下一步。
在參數(shù)配置頁面,設(shè)置相應參數(shù),然后單擊確定。
下表列出了部分配置參數(shù)的說明。完整的參數(shù)配置詳情,您可以在ack-spark-operator頁面中的配置項查看。
參數(shù)
描述
示例值
controller.replicas
控制器副本數(shù)量。
1(默認值)
webhook.replicas
Webhook副本數(shù)量。
1(默認值)
spark.jobNamespaces
可運行Spark任務(wù)的命名空間列表。包含空字符串表示允許所有命名空間。多個命名空間使用英文半角逗號(,)隔開。
["default"]
(默認值)[""]
(所有命名空間)["ns1","ns2","ns3"]
(多個命名空間)
spark.serviceAccount.name
Spark作業(yè)會在
spark.jobNamespaces
指定的每個命名空間中自動創(chuàng)建名為spark-operator-spark
的ServiceAccount和RBAC資源并進行相關(guān)授權(quán)。您可以自定義ServiceAccount名稱,后續(xù)提交Spark作業(yè)時請指定自定義創(chuàng)建的ServiceAccount名稱。spark-operator-spark
(默認值)
步驟二:提交Spark作業(yè)
您可以通過創(chuàng)建SparkApplication清單文件,提交一個實際的Spark作業(yè)以進行數(shù)據(jù)處理。
創(chuàng)建如下SparkApplication清單文件,并保存為
spark-pi.yaml
。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi namespace: default # 需要確保命名空間在spark.jobNamespaces指定的命名空間列表中。 spec: type: Scala mode: cluster image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar arguments: - "1000" sparkVersion: 3.5.2 driver: cores: 1 coreLimit: 1200m memory: 512m serviceAccount: spark-operator-spark # 如果您自定義了ServiceAccount名稱,則需要進行相應修改。 executor: instances: 1 cores: 1 coreLimit: 1200m memory: 512m restartPolicy: type: Never
執(zhí)行以下命令,提交Spark作業(yè)。
kubectl apply -f spark-pi.yaml
預期輸出如下。
sparkapplication.sparkoperator.k8s.io/spark-pi created
步驟三:查看Spark作業(yè)
您可以通過以下命令獲取Spark作業(yè)的運行狀態(tài)、相關(guān)Pod信息及日志。
執(zhí)行以下命令,查看Spark作業(yè)運行狀態(tài)。
kubectl get sparkapplication spark-pi
預期輸出如下。
NAME STATUS ATTEMPTS START FINISH AGE spark-pi SUBMITTED 1 2024-06-04T03:17:11Z <no value> 15s
執(zhí)行以下命令,指定標簽
sparkoperator.k8s.io/app-name
為spark-pi
,查看Spark作業(yè)的Pod的運行狀態(tài)。kubectl get pod -l sparkoperator.k8s.io/app-name=spark-pi
預期輸出如下。
NAME READY STATUS RESTARTS AGE spark-pi-7272428fc8f5f392-exec-1 1/1 Running 0 13s spark-pi-7272428fc8f5f392-exec-2 1/1 Running 0 13s spark-pi-driver 1/1 Running 0 49s
當Spark作業(yè)運行結(jié)束后,所有Executor Pod都將被Driver自動刪除。
執(zhí)行以下命令,查看Spark作業(yè)詳細信息。
kubectl describe sparkapplication spark-pi
具體輸出內(nèi)容會根據(jù)當前作業(yè)運行狀態(tài)而有所不同。
Name: spark-pi Namespace: default Labels: <none> Annotations: <none> API Version: sparkoperator.k8s.io/v1beta2 Kind: SparkApplication Metadata: Creation Timestamp: 2024-06-04T03:16:59Z Generation: 1 Resource Version: 1350200 UID: 1a1f9160-5dbb-XXXX-XXXX-be1c1fda4859 Spec: Arguments: 1000 Driver: Core Limit: 1200m Cores: 1 Memory: 512m Service Account: spark Executor: Core Limit: 1200m Cores: 1 Instances: 1 Memory: 512m Image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2 Image Pull Policy: IfNotPresent Main Application File: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar Main Class: org.apache.spark.examples.SparkPi Mode: cluster Restart Policy: Type: Never Spark Version: 3.5.2 Type: Scala Status: Application State: State: COMPLETED Driver Info: Pod Name: spark-pi-driver Web UI Address: 172.XX.XX.92:0 Web UI Port: 4040 Web UI Service Name: spark-pi-ui-svc Execution Attempts: 1 Executor State: spark-pi-26c5XXXXX1408337-exec-1: COMPLETED Last Submission Attempt Time: 2024-06-04T03:17:11Z Spark Application Id: spark-0042dead12XXXXXX43675f09552a946 Submission Attempts: 1 Submission ID: 117ee161-3951-XXXX-XXXX-e7d24626c877 Termination Time: 2024-06-04T03:17:55Z Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal SparkApplicationAdded 91s spark-operator SparkApplication spark-pi was added, enqueuing it for submission Normal SparkApplicationSubmitted 79s spark-operator SparkApplication spark-pi was submitted successfully Normal SparkDriverRunning 61s spark-operator Driver spark-pi-driver is running Normal SparkExecutorPending 56s spark-operator Executor [spark-pi-26c5XXXXX1408337-exec-1] is pending Normal SparkExecutorRunning 53s spark-operator Executor [spark-pi-26c5XXXXX1408337-exec-1] is running Normal SparkDriverCompleted 35s spark-operator Driver spark-pi-driver completed Normal SparkApplicationCompleted 35s spark-operator SparkApplication spark-pi completed Normal SparkExecutorCompleted 35s spark-operator Executor [spark-pi-26c5XXXXX1408337-exec-1] completed
執(zhí)行以下命令,查看Driver Pod運行日志的最后20行。
kubectl logs --tail=20 spark-pi-driver
預期輸出如下:
24/05/30 10:05:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 24/05/30 10:05:30 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 7.942 s 24/05/30 10:05:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job 24/05/30 10:05:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished 24/05/30 10:05:30 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 8.043996 s Pi is roughly 3.1419522314195225 24/05/30 10:05:30 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/05/30 10:05:30 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1e18858fc8f56b14-driver-svc.default.svc:4040 24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/05/30 10:05:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/05/30 10:05:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/05/30 10:05:30 INFO MemoryStore: MemoryStore cleared 24/05/30 10:05:30 INFO BlockManager: BlockManager stopped 24/05/30 10:05:30 INFO BlockManagerMaster: BlockManagerMaster stopped 24/05/30 10:05:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/05/30 10:05:30 INFO SparkContext: Successfully stopped SparkContext 24/05/30 10:05:30 INFO ShutdownHookManager: Shutdown hook called 24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /var/data/spark-14ed60f1-82cd-4a33-b1b3-9e5d975c5b1e/spark-01120c89-5296-4c83-8a20-0799eef4e0ee 24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5f98ed73-576a-41be-855d-dabdcf7de189
步驟四:訪問Spark Web UI
Spark作業(yè)提供了Web UI,可以監(jiān)控Spark作業(yè)的執(zhí)行狀態(tài)。通過使用kubectl port-forward
命令,將端口轉(zhuǎn)發(fā)到本地,來訪問該Web UI界面。Web UI服務(wù)僅在Spark作業(yè)運行期間(即Driver Pod處于Running狀態(tài))可用,Spark作業(yè)結(jié)束后,Web UI將無法繼續(xù)訪問。
在部署ack-spark-operator組件時,controller.uiService.enable
默認為true
,會自動創(chuàng)建一個Service,您可以將其端口轉(zhuǎn)發(fā)來訪問Web UI,但如果在部署組件時將controller.uiService.enable
設(shè)置為false
,則不會創(chuàng)建Service,也可以通過轉(zhuǎn)發(fā)Pod的端口訪問Web UI。
kubectl port-forward
命令建立的端口轉(zhuǎn)發(fā)僅適用于測試環(huán)境下的快速驗證,不適合在生產(chǎn)環(huán)境中使用,使用時請注意安全風險。
根據(jù)情況選擇通過Service或Pod來轉(zhuǎn)發(fā)端口以訪問Web UI。以下是相關(guān)的命令:
執(zhí)行以下命令,通過Service端口轉(zhuǎn)發(fā)訪問Web UI。
kubectl port-forward services/spark-pi-ui-svc 4040
執(zhí)行以下命令,通過Pod端口轉(zhuǎn)發(fā)訪問Web UI。
kubectl port-forward pods/spark-pi-driver 4040
預期輸出如下。
Forwarding from 127.0.0.1:4040 -> 4040 Forwarding from [::1]:4040 -> 4040
通過http://127.0.0.1:4040訪問Web UI。
(可選)步驟五:更新Spark作業(yè)
如需修改Spark作業(yè)的參數(shù),您可以更新Spark作業(yè)的清單文件。
編輯資源清單文件
spark-pi.yaml
,例如將作業(yè)參數(shù)arguments
修改為10000
,executor
數(shù)量修改為2
。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi spec: type: Scala mode: cluster image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar arguments: - "10000" sparkVersion: 3.5.2 driver: cores: 1 coreLimit: 1200m memory: 512m serviceAccount: spark executor: instances: 2 cores: 1 coreLimit: 1200m memory: 512m restartPolicy: type: Never
執(zhí)行以下命令,更新Spark作業(yè)。
kubectl apply -f spark-pi.yaml
執(zhí)行以下命令,查看Spark作業(yè)狀態(tài)。
kubectl get sparkapplication spark-pi
Spark作業(yè)將再次開始運行。預期輸出如下。
NAME STATUS ATTEMPTS START FINISH AGE spark-pi RUNNING 1 2024-06-04T03:37:34Z <no value> 20m
(可選)步驟六:刪除Spark作業(yè)
如果您已體驗完本教程,Spark作業(yè)已無需使用,您可以通過以下命令操作釋放相關(guān)資源。
執(zhí)行以下命令,刪除上述步驟中創(chuàng)建的Spark作業(yè)。
kubectl delete -f spark-pi.yaml
您也可以執(zhí)行以下命令。
kubectl delete sparkapplication spark-pi