Apache Airflow是一個強大的工作流程自動化和調度工具,它允許開發者編排、計劃和監控數據管道的執行。EMR Serverless Spark為處理大規模數據處理任務提供了一個無服務器計算環境。本文為您介紹如何通過Apache Airflow的Livy Operator實現自動化地向EMR Serverless Spark提交任務,以實現任務調度和執行的自動化,幫助您更有效地管理數據處理任務。
背景信息
Apache Livy通過REST接口與Spark進行交互,極大簡化了Spark和應用程序服務器之間的通信復雜度。關于Livy API,請參見REST API。
前提條件
已安裝并啟動Airflow服務,詳情請參見Installation of Airflow。
已創建工作空間,詳情請參見創建工作空間。
操作步驟
步驟一:創建Gateway及訪問Token
創建Gateway。
進入Gateway頁面。
在左側導航欄,選擇
。在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導航欄中的
。
在Livy Gateway頁面,單擊創建Livy Gateway。
在創建Gateway頁面,輸入名稱(例如,Livy-gateway),單擊創建。
其余參數請根據具體情況進行調整,更多參數信息請參見管理Gateway。
創建Token。
在Gateway頁面,單擊Livy-gateway操作列的Token管理。
單擊創建Token。
在創建Token對話框中,輸入名稱(例如,Livy-token),單擊確定。
復制Token信息。
重要Token創建完成后,請務必立即復制新Token的信息,后續不支持查看。如果您的Token過期或遺失,請選擇新建Token或重置Token。
步驟二:配置Apache Airflow
執行以下命令,在Apache Airflow環境中安裝Apache Livy。
pip install apache-airflow-providers-apache-livy
添加Connection。
UI方式
在Airflow中找到默認為livy_default的Connection,并對其信息進行修改;或者您也可以在Airflow Web頁面手動添加Connection,詳情請參見創建Connection。
涉及以下信息:
Host:填寫為Gateway中的Endpoint信息。
Schema:填寫為https。
Extra:填寫JSON字符串,
x-acs-spark-livy-token
為您前一個步驟中復制的Token信息。{ "x-acs-spark-livy-token": "6ac**********kfu" }
CLI方式
通過Airflow CLI執行相應命令來建立Connection,詳情請參見創建Connection。
airflow connections add 'livy_default' \ --conn-json '{ "conn_type": "livy", "host": "pre-emr-spark-livy-gateway-cn-hangzhou.data.aliyun.com/api/v1/workspace/w-xxxxxxx/livycompute/lc-xxxxxxx", # Gateway中的Endpoint信息。 "schema": "https", "extra": { "x-acs-spark-livy-token": "6ac**********kfu" # 為您前一個步驟中復制的Token信息。 } }'
步驟三: 使用Livy Operator提交Spark任務
Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用Livy Operator執行Spark任務的示例。
從阿里云OSS獲取并執行Python腳本文件。
from datetime import timedelta, datetime
from airflow import DAG
from airflow.providers.apache.livy.operators.livy import LivyOperator
default_args = {
'owner': 'aliyun',
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Initiate DAG
livy_operator_sparkpi_dag = DAG(
dag_id="livy_operator_sparkpi_dag",
default_args=default_args,
schedule_interval=None,
start_date=datetime(2024, 5, 20),
tags=['example', 'spark', 'livy'],
catchup=False
)
# define livy task with LivyOperator
# 請根據實際情況替換file內容。
livy_sparkpi_submit_task = LivyOperator(
file="oss://<YourBucket>/jars/spark-examples_2.12-3.3.1.jar",
class_name="org.apache.spark.examples.SparkPi",
args=['1000'],
driver_memory="1g",
driver_cores=1,
executor_memory="1g",
executor_cores=2,
num_executors=1,
name="LivyOperator SparkPi",
task_id="livy_sparkpi_submit_task",
dag=livy_operator_sparkpi_dag,
)
livy_sparkpi_submit_task
file
為您的Spark任務對應的文件路徑,本文示例為上傳至阿里云OSS上的JAR包spark-examples_2.12-3.3.1.jar的路徑,請您根據實際情況替換。上傳操作可參見簡單上傳。
步驟四:查看提交至EMR的任務
在EMR Serverless Spark頁面,單擊左側導航欄中的任務歷史。
在任務歷史的開發任務頁簽,您可以查看提交的任務。
相關文檔
在Apache Airflow中,您也可以選擇使用EMR提供的EmrServerlessSparkStartJobRunOperator
接口來提交EMR Serverless Spark任務,提供了一種除了Livy之外的便捷途徑。更多詳情,請參見通過Apache Airflow向EMR Serverless Spark提交任務。