Lindorm計算引擎通過HTTP RESTful API的方式提供Spark Python作業提交入口,您可以按照這種方式運行流批任務、機器學習和圖計算任務。本文介紹Lindorm計算引擎Python作業開發的詳細步驟。
前提條件
已開通Lindorm計算引擎,具體操作請參見開通與變配。
Spark Python作業開發流程
步驟一:準備Spark Python作業
下載Spark Python作業示例壓縮包,下載鏈接為Spark Python作業示例。
解壓Spark Python作業示例壓縮包,解壓后的目錄名稱為
lindorm-spark-examples
。打開lindorm-spark-examples/python
目錄,參考python目錄結構。項目開發的根目錄以
your_project
為例,介紹項目的目錄結構。在
your_project
目錄下新建__init__.py
文件,內容為空。改造入口文件。
在入口文件中編寫代碼,將
your_project
添加到sys.path
中,代碼詳情請參見Spark Python作業示例中的lindorm-spark-examples/python/your_project/main.py
文件的Notice1部分。# Notice1: You need to do the following step to complete the code modification: # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py # Step2: Please add current dir to sys.path, you should add the following code to your main file current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))
在入口文件中將入口邏輯封裝到
main(argv)
方法中。代碼詳情請參見Spark Python作業示例中的lindorm-spark-examples/python/your_project/main.py
文件的Notice2部分。# Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function, # so that launcher.py in parent directory just call main(sys.argv) def main(argv): print("Receive arguments: %s \n" % str(argv)) print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__))) # Write your code here if __name__ == "__main__": main(sys.argv)
創建Spark Python作業啟動的入口文件,用來調用
main(argv)
方法。在根目錄your_project
創建同級目錄launcher.py
,可以復制Spark Python作業示例中的lindorm-spark-examples/python/launcher.py
文件。
步驟二:打包Spark Python作業
打包項目依賴的Python環境和第三方類庫。推薦使用Conda或者Virtualenv將依賴類庫打包為tar包,具體操作請參見Python Package Management。
重要使用Conda或者Virtualenv打的tar包通過spark.archives傳遞,可以是spark.archives支持的所有格式。詳細說明,請參見spark.archives。
請在Linux環境下完成該操作,以保證Lindorm計算引擎能正常識Python二進制文件。
打包項目文件。將
your_project
文件打包為.zip
或者.egg
格式文件。執行以下命令將項目文件打包為
.zip
格式文件。zip -r project.zip your_project
將項目文件打包為
.egg
格式文件,具體操作請參見Building Eggs。
步驟三:上傳Spark Python作業
步驟四:提交Spark Python作業
請求參數包括以下兩個部分:
Python作業環境參數說明,示例如下:
{"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
提交Python作業項目文件(也就是
.zip
、.egg
或者.py
格式的文件)時,請配置configs參數中的spark.submit.pyFiles。提交Python環境和第三方類庫(也就是tar包)時,請配置configs參數中的spark.archives和spark.kubernetes.driverEnv.PYSPARK_PYTHON。
配置spark.archives參數時使用井號(#)指定targetDir。
配置spark.kubernetes.driverEnv.PYSPARK_PYTHON參數指定Python文件路徑。
如果將文件上傳至OSS,需要在configs參數中配置以下信息。
表 1. 配置configs相關參數
參數
示例值
說明
spark.hadoop.fs.oss.endpoint
oss-cn-beijing-internal.aliyuncs.com
存儲Python文件的OSS地址。
spark.hadoop.fs.oss.accessKeyId
testAccessKey ID
通過阿里云控制臺創建的Access Key ID和Access Key Secret,獲取方法請參見創建AccessKey。
spark.hadoop.fs.oss.accessKeySecret
testAccessKey Secret
spark.hadoop.fs.oss.impl
固定值:org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
訪問OSS的類。
說明更多參數請參見參數說明。
Python作業開發示例
下載并解壓Spark Python作業示例。
改造入口文件,打開Python目錄下的your_project/main.py文件并配置相關代碼。
將your_project目錄添加到sys.path中。
current_dir = os.path.abspath(os.path.dirname(__file__)) sys.path.append(current_dir) print("current dir in your_project: %s" % current_dir) print("sys.path: %s \n" % str(sys.path))
在main.py文件中添加入口邏輯,如下示例初始化SparkSession。
from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .appName("PythonImportTest") \ .getOrCreate() print(spark.conf) spark.stop()
在Python目錄下打包your_project文件。
zip -r your_project.zip your_project
在Linux環境下,使用Conda打包Python運行環境。
conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz
將打包好的your_project.zip和pyspark_conda_env.tar.gz上傳至OSS,并將Python目錄下的launcher.py文件上傳至OSS。
通過以下兩種方式提交作業。
作業診斷
Python作業提交成功后,可以在作業列表頁面查看作業運行狀況和SparkUI訪問地址,具體操作請參見查看作業。作業提交過程中如果有其他問題,請提交工單并將JobID和WebUI地址提供給工單處理人員。