日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Apache Airflow向EMR Serverless Spark提交任務

Apache Airflow是一個強大的工作流程自動化和調度工具,它允許開發者編排、計劃和監控數據管道的執行。EMR Serverless Spark為處理大規模數據處理任務提供了一個無服務器計算環境。本文為您介紹如何通過Apache Airflow實現自動化地向EMR Serverless Spark提交任務,以實現作業調度和執行的自動化,幫助您更有效地管理數據處理任務。

前提條件

注意事項

當前EmrServerlessSparkStartJobRunOperator未輸出實際作業的日志。如果您需要查看詳細的作業日志,請登錄EMR Serverless Spark控制臺,通過任務運行ID找到對應的任務實例,然后,您可以在日志探查頁簽或者Spark UI中進一步檢查和分析任務日志。

操作步驟

步驟一:配置Apache Airflow

  1. 下載airflow_alibaba_provider-0.0.3-py3-none-any.whl

  2. 在Airflow的每個節點上安裝airflow-alibaba-provider插件。

    airflow-alibaba-provider插件是由EMR Serverless Spark團隊提供,包含了一個專門用于提交EMR Serverless Spark任務的EmrServerlessSparkStartJobRunOperator組件。

    pip install airflow_alibaba_provider-0.0.3-py3-none-any.whl
  3. 添加Connection。

    CLI方式

    通過Airflow CLI執行相應命令來建立Connection,詳情請參見創建Connection

    airflow connections add 'emr-serverless-spark-id' \
        --conn-json '{
            "conn_type": "emr_serverless_spark",
            "extra": {
                "auth_type": "AK",  #指定使用阿里云的AccessKey(AK)方式認證。
                "access_key_id": "<yourAccesskeyId>",  # 阿里云賬號的AccessKey ID。
                "access_key_secret": "<yourAccesskeyKey>",  # 阿里云賬號的AccessKey Secret。
                "region": "<yourRegion>"
            }
        }'

    UI方式

    通過在Airflow Web頁面手動添加Connection,詳情請參見創建Connection

    Add Connection頁面,配置以下信息。

    image

    涉及參數如下表所示。

    參數

    說明

    Connection Id

    本文示例為emr-serverless-spark-id。

    Connection Type

    選擇Generic。如果沒有該類型,您也可以選擇Email

    Extra

    填寫內容如下。

    {
                "auth_type": "AK",  #指定使用阿里云的AccessKey(AK)方式認證。
                "access_key_id": "<yourAccesskeyId>",  # 阿里云賬號的AccessKey ID。
                "access_key_secret": "<yourAccesskeyKey>",  # 阿里云賬號的AccessKey Secret。
                "region": "<yourRegion>"
            }

步驟二:DAG示例

Airflow的DAG(Directed Acyclic Graph)定義允許您聲明任務執行的方式,以下是通過Airflow使用EmrServerlessSparkStartJobRunOperator執行不同類型的Spark作業的示例。

提交JAR包

此場景涉及使用Airflow任務提交一個預編譯的Spark JAR作業到阿里云EMR Serverless Spark。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_jar"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_jar = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_jar",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="airflow-emr-spark-jar",
        entry_point="oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        entry_point_args=["1"],
        spark_submit_parameters="--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_jar

提交SQL文件

在Airflow DAG中直接執行SQL命令。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "emr_spark_sql"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-7e2f1750c6b3****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql",
        entry_point=None,
        entry_point_args=["-e","show tables;show tables;"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None,
    )

    emr_spark_sql

從OSS提交SQL文件

從阿里云OSS獲取并執行SQL腳本文件。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_sql_2"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_sql_2 = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_sql_2",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-emr-spark-sql-2",
        entry_point="",
        entry_point_args=["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        spark_submit_parameters="--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_sql_2

從OSS提交Python腳本

從阿里云OSS獲取并執行Python腳本文件。

from __future__ import annotations

from datetime import datetime

from airflow.models.dag import DAG
from airflow_alibaba_provider.alibaba.cloud.operators.emr import EmrServerlessSparkStartJobRunOperator

# Ignore missing args provided by default_args
# mypy: disable-error-code="call-arg"

DAG_ID = "emr_spark_python"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2024, 5, 1),
    default_args={},
    max_active_runs=1,
    catchup=False,
) as dag:
    emr_spark_python = EmrServerlessSparkStartJobRunOperator(
        task_id="emr_spark_python",
        emr_serverless_spark_conn_id="emr-serverless-spark-id",
        region="cn-hangzhou",
        polling_interval=5,
        workspace_id="w-ae42e9c92927****",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="airflow-emr-spark-python",
        entry_point="oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        entry_point_args=["1"],
        spark_submit_parameters="--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1",
        is_prod=True,
        engine_release_version=None
    )

    emr_spark_python

涉及參數如下表所示。

參數

參數類型

描述

task_id

str

指定Airflow任務的唯一標識符。

emr_serverless_spark_conn_id

str

指定Airflow用于連接EMR Serverless Spark的Connection ID。

region

str

指定EMR Spark所處的區域。

polling_interval

int

設置Airflow輪詢任務狀態的時間間隔,單位為秒。

workspace_id

str

EMR Spark工作區的唯一標識符。

resource_queue_id

str

用于指定EMR Spark任務所使用資源隊列的ID。

code_type

str

任務類型,可以是SQL、Python或JAR,根據任務類型,entry_point參數將有不同的含義。

name

str

EMR Spark任務的名稱。

entry_point

str

指定啟動任務的文件位置,例如JAR、SQL或Python文件。根據code_type的不同,此參數代表的含義不同。

entry_point_args

List

傳遞給Spark程序的參數列表。

spark_submit_parameters

str

包含用于spark-submit命令的額外參數。

is_prod

bool

指定任務運行的環境。當設置為True時,則表明任務將在生產環境中執行,resource_queue_id應指定對應生產環境的資源隊列ID,例如root_queue。

engine_release_version

str

設定EMR Spark引擎的版本。默認值是"esr-2.1-native",對應Spark版本為3.3.1和Scala版本為2.12,使用原生運行時。