Airflow調(diào)度Spark
Airflow是比較流行的開源調(diào)度工具,可以實(shí)現(xiàn)各類工作負(fù)載的DAG編排與調(diào)度。您可以通過AnalyticDB for MySQL Spark Airflow Operator、Spark-Submit命令行工具來實(shí)現(xiàn)Airflow調(diào)度Spark任務(wù)。本文介紹如何通過Airflow調(diào)度AnalyticDB for MySQL Spark作業(yè)。
注意事項(xiàng)
AnalyticDB for MySQL Spark支持的配置參數(shù),請參見Spark應(yīng)用配置參數(shù)說明。
如果您使用的是Apache Livy的調(diào)度方式,AnalyticDB for MySQL Spark Livy Proxy相關(guān)工具會在近期發(fā)布,可與維護(hù)團(tuán)隊(duì)聯(lián)系申請邀測使用。
Spark Airflow Operator命令行工具
準(zhǔn)備工作
安裝Airflow服務(wù)并啟動。具體操作,請參見Airflow社區(qū)文檔。
安裝Airflow Spark插件。執(zhí)行如下命令:
pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
操作步驟
準(zhǔn)備Connection,示例如下。具體操作,請參見創(chuàng)建Connection。
{ "auth_type": "AK", "access_key_id": "<your_access_key_ID>", "access_key_secret": "<your_access_key_secret>", "region": "<your_region>" }
創(chuàng)建DAG聲明Spark工作流,本文的DAG聲明文件為
spark_dags.py
。from datetime import datetime from airflow.models.dag import DAG from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator with DAG( dag_id="my_dag_name", start_date=datetime(2021, 1, 1), default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"}, max_active_runs=1, catchup=False, ) as dag: spark_batch = AnalyticDBSparkBatchOperator( task_id="task1", file="oss://<bucket_name>/tmp/spark-examples.jar", class_name="org.apache.spark.examples.SparkPi" ) spark_sql = AnalyticDBSparkSQLOperator( task_id="task2", sql="SHOW DATABASES;" ) spark_batch >> spark_sql
參數(shù)說明如下。
AnalyticDBSparkBatchOperator支持配置的參數(shù)。
參數(shù)
是否必填
說明
file
是
Spark應(yīng)用主文件的存儲路徑,文件路徑需為絕對路徑。主文件是入口類所在的JAR包或者Python的入口執(zhí)行文件。
重要Spark應(yīng)用主文件目前只支持存儲在OSS中。
OSS Bucket與AnalyticDB for MySQL集群需要在同一地域。
class_name
是
Java或Scala程序入口類名稱,必填參數(shù)。
Python不需要指定入口類,非必填參數(shù)。
args
否
Spark應(yīng)用參數(shù)。
conf
否
與開源Spark中的配置項(xiàng)基本一致,參數(shù)格式為key: value形式。與開源Spark用法不一致的配置參數(shù)及AnalyticDB for MySQL特有的配置參數(shù),請參見Spark應(yīng)用配置參數(shù)說明。
jars
否
Spark應(yīng)用依賴的JAR包。需填寫JAR包文件的絕對路徑。JAR包在運(yùn)行時會被加入到Driver和Executor JVM的ClassPath里面。
重要Spark應(yīng)用所依賴的所有JAR包必須存儲在OSS中。
OSS Bucket與AnalyticDB for MySQL集群需要在同一地域。
py_files
否
PySpark依賴的Python文件,后綴可以是ZIP、PY和EGG。如果依賴多個Python文件,建議使用ZIP或者EGG壓縮包。您可以在Python代碼中以module方式引用Python文件。
重要Spark應(yīng)用所依賴的所有Python文件須存儲在OSS中。
files
否
Spark應(yīng)用依賴的文件資源,文件會被下載到Driver和Executor進(jìn)程的當(dāng)前執(zhí)行目錄下。
支持配置文件別名,例如oss://<testBucketName>/test/test1.txt#test1,test1為文件別名,您可以使用./test1或者./test1.txt訪問文件。
說明files中包含名為log4j.properties的文件時,Spark會使用該log4j.properties文件作為日志配置。
Spark應(yīng)用所依賴的所有文件須存儲在OSS中。
driver_resource_spec
否
Spark driver的資源規(guī)格。默認(rèn)值為medium。
不同型號的取值對應(yīng)不同的規(guī)格,詳情請參見Spark資源規(guī)格列表的型號列。
說明spark.driver.resourceSpec與spark.executor.resourceSpec參數(shù)取值相同。
僅提交Spark離線應(yīng)用時,可使用開源Spark參數(shù),且取值需為Spark資源規(guī)格列表中的核數(shù)和內(nèi)存。
executor_resource_spec
否
Spark executor的資源規(guī)格。默認(rèn)值為medium。
不同型號的取值對應(yīng)不同的規(guī)格,詳情請參見Spark資源規(guī)格列表的型號列。
num_executors
否
Spark Executor個數(shù)。默認(rèn)值為3。
archives
否
Spark應(yīng)用依賴的壓縮包資源,目前支持.TAR.GZ后綴。壓縮包會被解壓到當(dāng)前Spark進(jìn)程的當(dāng)前目錄下。
支持配置文件別名,例如oss://testBucketName/test/test1.tar.gz#test1,test1為文件別名。假設(shè)test2.txt是test1.tar.gz壓縮包中的文件,您可以使用./test1/test2.txt或者./test1.tar.gz/test2.txt訪問解壓后的文件。
說明Spark應(yīng)用所依賴的所有壓縮包須存儲在OSS中。壓縮包解壓縮失敗,任務(wù)會失敗。
name
否
Spark應(yīng)用名稱。
cluster_id
是
AnalyticDB for MySQL企業(yè)版、基礎(chǔ)版及湖倉版集群ID。
rg_name
是
AnalyticDB for MySQL企業(yè)版、基礎(chǔ)版及湖倉版集群的Job型資源組名稱。
adb_spark_conn_id
是
AnalyticDB for MySQL Spark Airflow Connection ID。默認(rèn)值為
adb_spark_default
。region
是
AnalyticDB for MySQL企業(yè)版、基礎(chǔ)版及湖倉版集群所屬地域ID。
polling_interval
否
掃描Spark應(yīng)用狀態(tài)周期。
AnalyticDBSparkSQLOperator支持配置的參數(shù)。
參數(shù)
是否必填
說明
SQL
是
Spark SQL語句。
conf
否
與開源Spark中的配置項(xiàng)基本一致,參數(shù)格式為key: value形式。與開源Spark用法不一致的配置參數(shù)及AnalyticDB for MySQL特有的配置參數(shù),請參見Spark應(yīng)用配置參數(shù)說明。
driver_resource_spec
否
Spark driver的資源規(guī)格。默認(rèn)值為medium。
不同型號的取值對應(yīng)不同的規(guī)格,詳情請參見Spark資源規(guī)格列表的型號列。
說明spark.driver.resourceSpec與spark.executor.resourceSpec參數(shù)取值相同。
僅提交Spark離線應(yīng)用時,可使用開源Spark參數(shù),且取值需為Spark資源規(guī)格列表中的核數(shù)和內(nèi)存。
executor_resource_spec
否
Spark executor的資源規(guī)格。默認(rèn)值為medium。
不同型號的取值對應(yīng)不同的規(guī)格,詳情請參見Spark資源規(guī)格列表的型號列。
num_executors
否
Spark Executor個數(shù)。默認(rèn)值為3。
name
否
Spark應(yīng)用名稱。
cluster_id
是
AnalyticDB for MySQL企業(yè)版、基礎(chǔ)版及湖倉版集群ID。
rg_name
是
AnalyticDB for MySQL企業(yè)版、基礎(chǔ)版及湖倉版集群的Job型資源組名稱。
adb_spark_conn_id
是
AnalyticDB for MySQL Spark Airflow Connection ID。默認(rèn)值為
adb_spark_default
。region
是
AnalyticDB for MySQL企業(yè)版、基礎(chǔ)版及湖倉版集群所屬地域ID。
polling_interval
否
掃描Spark應(yīng)用狀態(tài)周期。
將
spark_dags.py
文件存放至Airflow Configuration聲明dags_folder所在的文件夾中。執(zhí)行DAG。具體操作請參見Airflow社區(qū)文檔。
Spark-Submit命令行工具
對于AnalyticDB for MySQL特有的配置項(xiàng),例如clusterId、regionId、keyId、secretId、ossUploadPath,您可以在AnalyticDB for MySQL Spark工具包的配置文件conf/spark-defaults.conf
中進(jìn)行配置,也可以通過Airflow參數(shù)來配置。詳情請參見Spark應(yīng)用配置參數(shù)。
準(zhǔn)備工作
準(zhǔn)備工作一:安裝Airflow服務(wù)
安裝Airflow服務(wù)并啟動。具體操作請參見Airflow社區(qū)文檔。
安裝Airflow Spark插件。執(zhí)行如下命令:
pip3 install apache-airflow-providers-apache-spark
重要您需要使用Python3來安裝Airflow Spark插件。
安裝apache-airflow-providers-apache-spark會默認(rèn)安裝社區(qū)版Pyspark,需要執(zhí)行如下命令將pyspark卸載。
pip3 uninstall pyspark
準(zhǔn)備工作二:下載并配置Spark-Submit命令行工具
下載Spark-Submit命令行工具包并進(jìn)行配置。具體操作請參見通過Spark-Submit命令行工具開發(fā)Spark應(yīng)用。
配置PATH路徑。執(zhí)行以下命令,將Spark-Submit命令行工具的地址加入Airflow執(zhí)行地址。
export PATH=$PATH:</your/adb/spark/path/bin>
重要在啟動Airflow之前需要將Spark-Submit加入到PATH中,否則調(diào)度任務(wù)可能會找不到Spark-Submit命令。
操作步驟
準(zhǔn)備DAG聲明文件。本文以創(chuàng)建Airflow DAG的demo.py文件為例。
from airflow.models import DAG from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from airflow.utils.dates import days_ago args = { 'owner': 'Aliyun ADB Spark', } with DAG( dag_id='example_spark_operator', default_args=args, schedule_interval=None, start_date=days_ago(2), tags=['example'], ) as dag: adb_spark_conf = { "spark.driver.resourceSpec": "medium", "spark.executor.resourceSpec": "medium" } # [START howto_operator_spark_submit] submit_job = SparkSubmitOperator( conf=adb_spark_conf, application="oss://<bucket_name>/jar/pi.py", task_id="submit_job", verbose=True ) # [END howto_operator_spark_submit] # [START howto_operator_spark_sql] sql_job = SparkSqlOperator( conn_id="spark_default", sql="SELECT * FROM yourdb.yourtable", conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]), task_id="sql_job", verbose=True ) # [END howto_operator_spark_sql] submit_job >> sql_job
將編輯完成的demo.py文件放至Airflow安裝目錄的dags目錄下。
執(zhí)行DAG。具體操作請參見Airflow社區(qū)文檔。