本文以Spark自帶的PageRank作業為例,介紹如何在ACK集群中運行Spark作業,并配置讀寫位于阿里云OSS(對象存儲服務)中的數據。
前提條件
已創建1.24及以上的ACK集群Pro版、ACK Serverless集群Pro版。相關操作,請參見創建ACK托管集群、創建ACK Serverless集群、手動升級集群。
已部署ack-spark-operator組件,請參見部署ack-spark-operator組件。
已通過kubectl工具連接集群。具體操作,請參見獲取集群KubeConfig并通過kubectl工具連接集群。
已創建OSS存儲空間。具體操作請參見創建存儲空間。
已安裝ossutil并配置ossutil。關于ossutil命令參考請參見命令行工具ossutil命令參考。
流程概述
本文將引導您完成如下步驟,幫助您了解如何在ACK集群中運行Spark作業并配置讀寫OSS數據。
準備測試數據并上傳至OSS:生成用于PageRank的測試數據集并將其上傳至OSS。
構建Spark容器鏡像:構建包含了訪問OSS相關Jar包依賴的Spark容器鏡像。
創建Secret存儲OSS訪問憑據:為Spark作業創建指定的OSS訪問憑據,以確保安全訪問OSS。
提交示例Spark作業:創建并提交一個Spark作業的配置文件,實現對OSS數據的讀寫。
(可選)環境清理:在完成測試后,清理無需使用的Spark作業和資源,避免產生預期外的費用。
步驟一:準備測試數據并上傳至OSS
首先,您需生成測試數據集并將其上傳到指定的OSS中,便于后續的Spark作業使用。以下是生成測試數據集的腳本和上傳OSS的操作步驟。
根據以下內容創建名為generate_pagerank_dataset.sh的腳本,用于生成測試數據集。
#!/bin/bash # 檢查參數數量 if [ "$#" -ne 2 ]; then echo "Usage: $0 M N" echo "M: Number of web pages" echo "N: Number of records to generate" exit 1 fi M=$1 N=$2 # 檢查 M 和 N 是否為正整數 if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then echo "Both M and N must be positive integers." exit 1 fi # 生成數據集 for ((i=1; i<=$N; i++)); do # 保證源頁面和目標頁面不相同 while true; do src=$((RANDOM % M + 1)) dst=$((RANDOM % M + 1)) if [ "$src" -ne "$dst" ]; then echo "$src $dst" break fi done done
執行如下命令,生成測試數據集。
M=100000 # 網頁數量 N=10000000 # 記錄數量 # 隨機生成數據集并保存至 pagerank_dataset.txt bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
執行如下命令,將生成的數據集上傳至OSS Bucket中的
data/
路徑下:ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/
步驟二:構建Spark容器鏡像
為了在Spark作業中訪問數據,首先需要構建一個包含了訪問OSS相關Jar包依賴的容器鏡像。您可以選擇使用Hadoop OSS SDK或JindoSDK來訪問OSS,本文演示用的容器鏡像根據如下示例Dockerfile構建。關于容器鏡像服務構建鏡像請參見使用企業版實例構建鏡像。
本文涉及到的容器鏡像僅用于演示,不建議在生產環境中使用。
您需要根據使用的Spark版本,選擇相應的Hadoop OSS SDK版本或者Jindo SDK版本。
使用Hadoop OSS SDK
以Spark 3.5.2版本和Hadoop OSS SDK 3.3.4版本為例,創建如下示例Dockerfile文件。
ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
使用JindoSDK
以Spark 3.5.2版本和JindoSDK 6.4.0版本為例,創建如下示例Dockerfile文件。
ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2
FROM ${SPARK_IMAGE}
# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
步驟三:創建Secret存儲OSS訪問憑據
在Spark作業中訪問OSS數據時,需要配置OSS訪問憑證,為了避免在作業中硬編碼訪問憑據,需要創建一個Secret用于存儲敏感信息,并以環境變量的形式注入到容器中。
根據以下內容創建Secret清單文件,并保存為spark-oss-secret.yaml。
apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 阿里云AccessKey ID。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET> # 阿里云AccessKey Secret。
執行如下命令創建Secret。
kubectl apply -f spark-oss-secret.yaml
預期輸出如下。
secret/spark-oss-secret created
步驟四:運行示例Spark作業
在ACK集群上提交Spark作業,實現OSS數據的讀寫。
使用Hadoop OSS SDK
創建如下SparkApplication清單文件并保存為spark-pagerank.yaml
。關于OSS完整的配置參數列表,請參見Hadoop-Aliyun module。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-oss
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定輸入測試數據集,將<OSS_BUCKET>替換成OSS Buckt名稱。
- "10" # 迭代次數。
sparkVersion: 3.5.2
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
# OSS 訪問端點
fs.oss.endpoint: <OSS_ENDPOINT> # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
restartPolicy:
type: Never
使用JindoSDK
創建如下SparkApplication清單文件并保存為spark-pagerank.yaml
。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-jindosdk
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定輸入測試數據集,將<OSS_BUCKET>替換成OSS Buckt名稱。
- "10" # 迭代次數。
sparkVersion: 3.5.2
hadoopConf:
fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
restartPolicy:
type: Never
執行如下命令提交Spark作業。
kubectl apply -f spark-pagerank.yaml
執行如下命令查看Spark作業執行狀態并等待作業執行結果。
kubectl get sparkapplications spark-pagerank
預期輸出。
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank COMPLETED 1 2024-10-09T12:54:25Z 2024-10-09T12:55:46Z 90s
執行如下命令查看Driver pod日志輸出的最后20行。
kubectl logs spark-pagerank-driver --tail=20
預期輸出。
使用Hadoop OSS SDK
從日志中可以看到,Spark作業已經成功運行結束。
30024 has rank: 1.0709659078941967 . 21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared 24/10/09 12:48:36 INFO BlockManager: BlockManager stopped 24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5
使用JindoSDK
從日志中可以看到,Spark作業已經成功運行結束。
21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared 24/10/09 12:55:45 INFO BlockManager: BlockManager stopped 24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168
(可選)步驟五:環境清理
如果您已體驗完本教程,相關資源如不再需要,可以通過執行以下命令進行刪除。
執行如下命令刪除Spark作業。
kubectl delete -f spark-pagerank.yaml
執行如下命令刪除Secret資源。
kubectl delete -f oss-secret.yaml