日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Apache Airflow使用Livy Operator提交任務

Apache Airflow是一個強大的工作流程自動化和調度工具,它允許開發者編排、計劃和監控數據管道的執行。EMR Serverless Spark為處理大規模數據處理任務提供了一個無服務器計算環境。本文為您介紹如何通過Apache Airflow的Livy Operator實現自動化地向EMR Serverless Spark提交任務,以實現任務調度和執行的自動化,幫助您更有效地管理數據處理任務。

背景信息

Apache Livy通過REST接口與Spark進行交互,極大簡化了Spark和應用程序服務器之間的通信復雜度。關于Livy API,請參見REST API

前提條件

操作步驟

步驟一:創建Gateway及訪問Token

  1. 創建Gateway。

    1. 進入Gateway頁面。

      1. 登錄E-MapReduce控制臺

      2. 在左側導航欄,選擇EMR Serverless > Spark

      3. Spark頁面,單擊目標工作空間名稱。

      4. EMR Serverless Spark頁面,單擊左側導航欄中的運維中心 > Gateway

    2. Livy Gateway頁面,單擊創建Livy Gateway

    3. 在創建Gateway頁面,輸入名稱(例如,Livy-gateway),單擊創建

      其余參數請根據具體情況進行調整,更多參數信息請參見管理Gateway

  2. 創建Token。

    1. Gateway頁面,單擊Livy-gateway操作列的Token管理

    2. 單擊創建Token

    3. 創建Token對話框中,輸入名稱(例如,Livy-token),單擊確定

    4. 復制Token信息。

      重要

      Token創建完成后,請務必立即復制新Token的信息,后續不支持查看。如果您的Token過期或遺失,請選擇新建Token或重置Token。

步驟二:配置Apache Airflow

  1. 執行以下命令,在Apache Airflow環境中安裝Apache Livy。

    pip install apache-airflow-providers-apache-livy
  2. 添加Connection。

    UI方式

    在Airflow中找到默認為livy_default的Connection,并對其信息進行修改;或者您也可以在Airflow Web頁面手動添加Connection,詳情請參見創建Connection

    涉及以下信息:

    • Host:填寫為Gateway中的Endpoint信息。

    • Schema:填寫為https

    • Extra:填寫JSON字符串,x-acs-spark-livy-token為您前一個步驟中復制的Token信息。

      {
        "x-acs-spark-livy-token": "6ac**********kfu"
      }

    CLI方式

    通過Airflow CLI執行相應命令來建立Connection,詳情請參見創建Connection

    airflow connections add 'livy_default' \
        --conn-json '{
            "conn_type": "livy",
            "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx",   # Gateway中的Endpoint信息。
            "schema": "https",
            "extra": {
                "x-acs-spark-livy-token": "6ac**********kfu"  # 為您前一個步驟中復制的Token信息。
            }
        }'

步驟三: 使用Livy Operator提交Spark任務

Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用Livy Operator執行Spark任務的示例。

從阿里云OSS獲取并執行Python腳本文件。

from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator

default_args = {
    'owner': 'aliyun',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Initiate DAG
livy_operator_sparkpi_dag = DAG(
    dag_id="livy_operator_sparkpi_dag",
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2024, 5, 20),
    tags=['example', 'spark', 'livy'],
    catchup=False
)

# define livy task with LivyOperator
# 請根據實際情況替換file內容。
livy_sparkpi_submit_task = LivyOperator(
    file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
    class_name="org.apache.spark.examples.SparkPi",
    args=['1000'],
    driver_memory="1g",
    driver_cores=1,
    executor_memory="1g",
    executor_cores=2,
    num_executors=1,
    name="LivyOperator SparkPi",
    task_id="livy_sparkpi_submit_task",
    dag=livy_operator_sparkpi_dag,
)

livy_sparkpi_submit_task
說明

file為您的Spark任務對應的文件路徑,本文示例為上傳至阿里云OSS上的JAR包spark-examples_2.12-3.3.1.jar的路徑,請您根據實際情況替換。上傳操作可參見簡單上傳

步驟四:查看提交至EMR的任務

  1. EMR Serverless Spark頁面,單擊左側導航欄中的任務歷史

  2. 任務歷史開發任務頁簽,您可以查看提交的任務。

    image

相關文檔

在Apache Airflow中,您也可以選擇使用EMR提供的EmrServerlessSparkStartJobRunOperator接口來提交EMR Serverless Spark任務,提供了一種除了Livy之外的便捷途徑。更多詳情,請參見通過Apache Airflow向EMR Serverless Spark提交任務