日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark作業讀寫OSS數據

本文以Spark自帶的PageRank作業為例,介紹如何在ACK集群中運行Spark作業,并配置讀寫位于阿里云OSS(對象存儲服務)中的數據。

前提條件

流程概述

本文將引導您完成如下步驟,幫助您了解如何在ACK集群中運行Spark作業并配置讀寫OSS數據。

  1. 準備測試數據并上傳至OSS:生成用于PageRank的測試數據集并將其上傳至OSS。

  2. 構建Spark容器鏡像:構建包含了訪問OSS相關Jar包依賴的Spark容器鏡像。

  3. 創建Secret存儲OSS訪問憑據:為Spark作業創建指定的OSS訪問憑據,以確保安全訪問OSS。

  4. 提交示例Spark作業:創建并提交一個Spark作業的配置文件,實現對OSS數據的讀寫。

  5. (可選)環境清理:在完成測試后,清理無需使用的Spark作業和資源,避免產生預期外的費用。

步驟一:準備測試數據并上傳至OSS

首先,您需生成測試數據集并將其上傳到指定的OSS中,便于后續的Spark作業使用。以下是生成測試數據集的腳本和上傳OSS的操作步驟。

  1. 根據以下內容創建名為generate_pagerank_dataset.sh的腳本,用于生成測試數據集。

    #!/bin/bash
    
    # 檢查參數數量
    if [ "$#" -ne 2 ]; then
        echo "Usage: $0 M N"
        echo "M: Number of web pages"
        echo "N: Number of records to generate"
        exit 1
    fi
    
    M=$1
    N=$2
    
    # 檢查 M 和 N 是否為正整數
    if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then
        echo "Both M and N must be positive integers."
        exit 1
    fi
    
    # 生成數據集
    
    for ((i=1; i<=$N; i++)); do
        # 保證源頁面和目標頁面不相同
        while true; do
            src=$((RANDOM % M + 1))
            dst=$((RANDOM % M + 1))
            if [ "$src" -ne "$dst" ]; then
                echo "$src $dst"
                break
            fi
        done
    done
  2. 執行如下命令,生成測試數據集。

    M=100000    # 網頁數量
    
    N=10000000  # 記錄數量
    
    # 隨機生成數據集并保存至 pagerank_dataset.txt
    bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
  3. 執行如下命令,將生成的數據集上傳至OSS Bucket中的 data/ 路徑下:

    ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/

步驟二:構建Spark容器鏡像

為了在Spark作業中訪問數據,首先需要構建一個包含了訪問OSS相關Jar包依賴的容器鏡像。您可以選擇使用Hadoop OSS SDK或JindoSDK來訪問OSS,本文演示用的容器鏡像根據如下示例Dockerfile構建。關于容器鏡像服務構建鏡像請參見使用企業版實例構建鏡像

說明
  • 本文涉及到的容器鏡像僅用于演示,不建議在生產環境中使用。

  • 您需要根據使用的Spark版本,選擇相應的Hadoop OSS SDK版本或者Jindo SDK版本。

使用Hadoop OSS SDK

以Spark 3.5.2版本和Hadoop OSS SDK 3.3.4版本為例,創建如下示例Dockerfile文件。

ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

使用JindoSDK

以Spark 3.5.2版本和JindoSDK 6.4.0版本為例,創建如下示例Dockerfile文件。

ARG SPARK_IMAGE=registry-cn-hangzhou.ack.aliyuncs.com/dev/spark:3.5.2

FROM ${SPARK_IMAGE}

# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars

步驟三:創建Secret存儲OSS訪問憑據

在Spark作業中訪問OSS數據時,需要配置OSS訪問憑證,為了避免在作業中硬編碼訪問憑據,需要創建一個Secret用于存儲敏感信息,并以環境變量的形式注入到容器中。

  1. 根據以下內容創建Secret清單文件,并保存為spark-oss-secret.yaml。

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-oss-secret
      namespace: default
    stringData:
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>            # 阿里云AccessKey ID。
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>    # 阿里云AccessKey Secret。
  2. 執行如下命令創建Secret。

    kubectl apply -f spark-oss-secret.yaml

    預期輸出如下。

    secret/spark-oss-secret created

步驟四:運行示例Spark作業

在ACK集群上提交Spark作業,實現OSS數據的讀寫。

使用Hadoop OSS SDK

創建如下SparkApplication清單文件并保存為spark-pagerank.yaml。關于OSS完整的配置參數列表,請參見Hadoop-Aliyun module

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-oss
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # 指定輸入測試數據集,將<OSS_BUCKET>替換成OSS Buckt名稱。
  - "10"                                                   # 迭代次數。
  sparkVersion: 3.5.2
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # OSS 訪問端點
    fs.oss.endpoint: <OSS_ENDPOINT>                        # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。 
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret               #指定訪問OSS的安全憑據。
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-oss-secret               #指定訪問OSS的安全憑據。
  restartPolicy:
    type: Never

使用JindoSDK

創建如下SparkApplication清單文件并保存為spark-pagerank.yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2-jindosdk
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt    # 指定輸入測試數據集,將<OSS_BUCKET>替換成OSS Buckt名稱。
  - "10"                                            # 迭代次數。
  sparkVersion: 3.5.2
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                 # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定訪問OSS的安全憑據。
  executor: 
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定訪問OSS的安全憑據。
  restartPolicy:
    type: Never
  1. 執行如下命令提交Spark作業。

    kubectl apply -f spark-pagerank.yaml
  2. 執行如下命令查看Spark作業執行狀態并等待作業執行結果。

    kubectl get sparkapplications spark-pagerank

    預期輸出。

    NAME             STATUS      ATTEMPTS   START                  FINISH                 AGE
    spark-pagerank   COMPLETED   1          2024-10-09T12:54:25Z   2024-10-09T12:55:46Z   90s
  3. 執行如下命令查看Driver pod日志輸出的最后20行。

    kubectl logs spark-pagerank-driver --tail=20

    預期輸出。

    使用Hadoop OSS SDK

    從日志中可以看到,Spark作業已經成功運行結束。

    30024 has rank:  1.0709659078941967 .
    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:48:36 INFO BlockManager: BlockManager stopped
    24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5

    使用JindoSDK

    從日志中可以看到,Spark作業已經成功運行結束。

    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:55:45 INFO BlockManager: BlockManager stopped
    24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168

(可選)步驟五:環境清理

如果您已體驗完本教程,相關資源如不再需要,可以通過執行以下命令進行刪除。

執行如下命令刪除Spark作業。

kubectl delete -f spark-pagerank.yaml

執行如下命令刪除Secret資源。

kubectl delete -f oss-secret.yaml