Apache Airflow是一個強大的工作流程自動化和調度工具,它允許開發者編排、計劃和監控數據管道的執行。EMR Serverless Spark為處理大規模數據處理任務提供了一個無服務器計算環境。本文為您介紹如何通過Apache Airflow實現自動化地向EMR Serverless Spark提交任務,以實現作業調度和執行的自動化,幫助您更有效地管理數據處理任務。
前提條件
已安裝并啟動Airflow服務,詳情請參見Installation of Airflow。
已創建工作空間,詳情請參見創建工作空間。
注意事項
當前EmrServerlessSparkStartJobRunOperator未輸出實際作業的日志。如果您需要查看詳細的作業日志,請登錄EMR Serverless Spark控制臺,通過任務運行ID找到對應的任務實例,然后,您可以在日志探查頁簽或者Spark UI中進一步檢查和分析任務日志。
操作步驟
步驟一:配置Apache Airflow
在Airflow的每個節點上安裝airflow-alibaba-provider插件。
airflow-alibaba-provider插件是由EMR Serverless Spark團隊提供,包含了一個專門用于提交EMR Serverless Spark任務的EmrServerlessSparkStartJobRunOperator組件。
pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
添加Connection。
CLI方式
通過Airflow CLI執行相應命令來建立Connection,詳情請參見創建Connection。
airflow connections add 'emr-serverless-spark-id' \ --conn-json '{ "conn_type": "emr_serverless_spark", "extra": { "auth_type": "AK", #指定使用阿里云的AccessKey(AK)方式認證。 "access_key_id": "<yourAccesskeyId>", # 阿里云賬號的AccessKey ID。 "access_key_secret": "<yourAccesskeyKey>", # 阿里云賬號的AccessKey Secret。 "region": "<yourRegion>" } }'
UI方式
通過在Airflow Web頁面手動添加Connection,詳情請參見創建Connection。
在Add Connection頁面,配置以下信息。
涉及參數如下表所示。
參數
說明
Connection Id
本文示例為emr-serverless-spark-id。
Connection Type
選擇Generic。如果沒有該類型,您也可以選擇Email。
Extra
填寫內容如下。
{ "auth_type": "AK", #指定使用阿里云的AccessKey(AK)方式認證。 "access_key_id": "<yourAccesskeyId>", # 阿里云賬號的AccessKey ID。 "access_key_secret": "<yourAccesskeyKey>", # 阿里云賬號的AccessKey Secret。 "region": "<yourRegion>" }
步驟二:DAG示例
Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用EmrServerlessSparkStartJobRunOperator執行不同類型的Spark作業的示例。
提交JAR包
此場景涉及使用Airflow任務提交一個預編譯的Spark JAR作業到阿里云EMR Serverless Spark。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_jar"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_jar",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="JAR",
name="airflow-emr-spark-jar",
entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
entry_point_args=["1"],
spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_jar
提交SQL文件
在Airflow DAG中直接執行SQL命令。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-7e2f1750c6b3****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql",
entry_point=None,
entry_point_args=["-e","show tables;show tables;"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None,
)
emr_spark_sql
從OSS提交SQL文件
從阿里云OSS獲取并執行SQL腳本文件。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_sql_2"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_sql_2",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="SQL",
name="airflow-emr-spark-sql-2",
entry_point="",
entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_sql_2
從OSS提交Python腳本
從阿里云OSS獲取并執行Python腳本文件。
from __future__ import annotations
from datetime import datetime
from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator
# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"
DAG_ID = "emr_spark_python"
with DAG(
dag_id=DAG_ID,
start_date=datetime(2024, 5, 1),
default_args={},
max_active_runs=1,
catchup=False,
) as dag:
emr_spark_python = EmrServerlessSparkStartJobRunOperator(
task_id="emr_spark_python",
emr_serverless_spark_conn_id="emr-serverless-spark-id",
region="cn-hangzhou",
polling_interval=5,
workspace_id="w-ae42e9c92927****",
resource_queue_id="root_queue",
code_type="PYTHON",
name="airflow-emr-spark-python",
entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
entry_point_args=["1"],
spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
is_prod=True,
engine_release_version=None
)
emr_spark_python
涉及參數如下表所示。
參數 | 參數類型 | 描述 |
|
| 指定Airflow任務的唯一標識符。 |
|
| 指定Airflow用于連接EMR Serverless Spark的Connection ID。 |
|
| 指定EMR Spark所處的區域。 |
|
| 設置Airflow輪詢任務狀態的時間間隔,單位為秒。 |
|
| EMR Spark工作區的唯一標識符。 |
|
| 用于指定EMR Spark任務所使用資源隊列的ID。 |
|
| 任務類型,可以是SQL、Python或JAR,根據任務類型,entry_point參數將有不同的含義。 |
|
| EMR Spark任務的名稱。 |
|
| 指定啟動任務的文件位置,例如JAR、SQL或Python文件。根據 |
|
| 傳遞給Spark程序的參數列表。 |
|
| 包含用于 |
|
| 指定任務運行的環境。當設置為True時,則表明任務將在生產環境中執行, |
|
| 設定EMR Spark引擎的版本。默認值是"esr-2.1-native",對應Spark版本為3.3.1和Scala版本為2.12,使用原生運行時。 |