PyODPS使用第三方包
本文為您介紹如何在PyODPS中使用第三方包。PyODPS制作第三方包的操作請參見PyODPS制作第三方包。
前提條件
已開通MaxCompute產(chǎn)品。如何開通請參見開通MaxCompute。
已開通DataWorks產(chǎn)品。如何開通請參見開通DataWorks。
上傳三方包
使用三方包前,請確保您生成的包已被上傳至MaxCompute Archive資源。上傳方式如下:
使用代碼上傳資源。您需要將
packages.tar.gz
替換成目標(biāo)包所在的路徑和文件名:import os from odps import ODPS # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環(huán)境變量設(shè)置為用戶 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環(huán)境變量設(shè)置為用戶 Access Key Secret, # 不建議直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) o.create_resource("test_packed.tar.gz", "archive", fileobj=open("packages.tar.gz", "rb"))
使用DataWorks上傳資源。具體操作請參見步驟一:創(chuàng)建或上傳資源。
在Python UDF中使用三方包
您需要對UDF進行修改以使用上傳的三方包。具體如下:
在UDF類的
_init_
方法中添加對三方包的引用。在UDF代碼(例如evaluate或process方法)中調(diào)用三方包。
示例
下面以實現(xiàn)scipy中的psi函數(shù)為例,為您介紹如何在Python UDF中使用三方包。
使用以下命令打包scipy。
pyodps-pack -o scipy-bundle.tar.gz scipy
編寫以下代碼,并將其保存為
test_psi_udf.py
。import sys from odps.udf import annotate @annotate("double->double") class MyPsi(object): def __init__(self): # 將路徑增加到引用路徑 sys.path.insert(0, "work/scipy-bundle.tar.gz/packages") def evaluate(self, arg0): # 將 import 語句保持在 evaluate 函數(shù)內(nèi)部 from scipy.special import psi return float(psi(arg0))
代碼解釋:
__init__
函數(shù)中將work/scipy-bundle.tar.gz/packages
添加至sys.path
,因為MaxCompute會將所有UDF引用的Archive資源以資源名稱為目錄解壓至work
目錄下,而packages
則是pyodps-pack
生成包的子目錄。而將對scipy的import放在evaluate函數(shù)體內(nèi)部的原因是三方包僅在執(zhí)行時可用,當(dāng)UDF在MaxCompute服務(wù)端被解析時,解析環(huán)境不包含三方包,函數(shù)體外的三方包import會導(dǎo)致報錯。將
test_psi_udf.py
上傳為MaxCompute Python資源,并將scipy-bundle.tar.gz
上傳為Archive資源。創(chuàng)建UDF名為
test_psi_udf
,引用上述兩個資源文件,并指定類名為test_psi_udf.MyPsi
。步驟3~4中,可以使用PyODPS或者MaxCompute客戶端的方式執(zhí)行。
使用PyODPS執(zhí)行方法:
import os from odps import ODPS # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環(huán)境變量設(shè)置為用戶 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環(huán)境變量設(shè)置為用戶 Access Key Secret, # 不建議直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) bundle_res = o.create_resource( "scipy-bundle.tar.gz", "archive", fileobj=open("scipy-bundle.tar.gz", "rb") ) udf_res = o.create_resource( "test_psi_udf.py", "py", fileobj=open("test_psi_udf.py", "rb") ) o.create_function( "test_psi_udf", class_type="test_psi_udf.MyPsi", resources=[bundle_res, udf_res] )
使用MaxCompute客戶端執(zhí)行方法:
add archive scipy-bundle.tar.gz; add py test_psi_udf.py; create function test_psi_udf as test_psi_udf.MyPsi using test_psi_udf.py,scipy-bundle.tar.gz;
完成以上操作后,即可使用UDF執(zhí)行SQL。
set odps.pypy.enabled=false; set odps.isolation.session.enable=true; select test_psi_udf(sepal_length) from iris;
在PyODPS DataFrame中使用三方包
PyODPS DataFrame支持在execute或persist時使用libraries
參數(shù)使用上面的第三方庫。 下面以map方法為例,apply或map_reduce方法的過程類似。
使用以下命令打包scipy。
pyodps-pack -o scipy-bundle.tar.gz scipy
假定表名為
test_float_col
,內(nèi)容只包含一列FLOAT值:col1 0 3.75 1 2.51
計算
psi(col1)
的值,代碼如下:import os from odps import ODPS, options def my_psi(v): from scipy.special import psi return float(psi(v)) # 如果 Project 開啟了 Isolation,下面的選項不是必需的 options.sql.settings = {"odps.isolation.session.enable": True} # 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環(huán)境變量設(shè)置為用戶 Access Key ID, # ALIBABA_CLOUD_ACCESS_KEY_SECRET 環(huán)境變量設(shè)置為用戶 Access Key Secret, # 不建議直接使用 Access Key ID / Access Key Secret 字符串 o = ODPS( os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'), os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'), project='<your-default-project>', endpoint='<your-end-point>', ) df = o.get_table("test_float_col").to_df() # 直接執(zhí)行并取得結(jié)果 df.col1.map(my_psi).execute(libraries=["scipy-bundle.tar.gz"]) # 保存到另一張表 df.col1.map(my_psi).persist("result_table", libraries=["scipy-bundle.tar.gz"])
(可選)如果希望在整個執(zhí)行過程中使用相同的三方包,可以設(shè)置全局選項:
from odps import options options.df.libraries = ["scipy-bundle.tar.gz"]
完成以上操作后,即可在DataFrame執(zhí)行時使用相關(guān)的三方包。
在DataWorks中使用三方包
DataWorks PyODPS節(jié)點預(yù)置了若干三方包,同時提供了load_resource_package
方法用以引用其他的包,具體使用方式請參見使用三方包。
手動上傳和使用三方包
以下內(nèi)容僅作為維護舊項目或者舊環(huán)境的參考,新項目建議直接使用pyodps-pack
打包。
部分舊項目可能采用了之前的方式使用三方包,即手動上傳所有依賴的Wheel包并在代碼中引用,或者使用了不支持二進制包的舊版MaxCompute環(huán)境,本節(jié)內(nèi)容為該場景準(zhǔn)備。下面以在map中使用python_dateutil為例為您介紹使用三方包的步驟。
在Linux Bash中使用
pip download
命令,下載包及其依賴到某個路徑。下載后會出現(xiàn)兩個包,即six-1.10.0-py2.py3-none-any.whl
和python_dateutil-2.5.3-py2.py3-none-any.whl
。pip download python-dateutil -d /to/path/
說明您需要下載支持Linux環(huán)境的包,建議直接在Linux下調(diào)用該命令。
將上述已下載的兩個包分別上傳至ODPS資源。
方式一:通過代碼上傳。
# 這里要確保資源名的后綴是正確的文件類型 odps.create_resource('six.whl', 'file', file_obj=open('six-1.10.0-py2.py3-none-any.whl', 'rb')) odps.create_resource('python_dateutil.whl', 'file', file_obj=open('python_dateutil-2.5.3-py2.py3-none-any.whl', 'rb'))
方式二:通過DataWorks界面上傳。
您可以參考步驟一:創(chuàng)建或上傳資源完成目標(biāo)資源的上傳與提交。
使用三方包。
假定DataFrame只有一個STRING類型的字段,內(nèi)容如下。
datestr 0 2016-08-26 14:03:29 1 2015-08-26 14:03:29
全局配置使用到的三方庫如下:
from odps import options def get_year(t): from dateutil.parser import parse return parse(t).strftime('%Y') options.df.libraries = ['six.whl', 'python_dateutil.whl'] df.datestr.map(get_year).execute()
datestr 0 2016 1 2015
通過立即運行方法的
libraries
參數(shù)指定:def get_year(t): from dateutil.parser import parse return parse(t).strftime('%Y') df.datestr.map(get_year).execute(libraries=['six.whl', 'python_dateutil.whl'])
datestr 0 2016 1 2015
PyODPS默認(rèn)支持執(zhí)行純Python且不含文件操作的第三方庫。在較新版本的MaxCompute服務(wù)下,PyODPS也支持執(zhí)行帶有二進制代碼或帶有文件操作的Python庫。這些庫名必須擁有一定的后綴,可根據(jù)下表判斷:
平臺 | Python版本 | 可用的后綴 |
RHEL 5 x86_64 | Python 2.7 | cp27-cp27m-manylinux1_x86_64 |
RHEL 5 x86_64 | Python 3.7 | cp37-cp37m-manylinux1_x86_64 |
RHEL 7 x86_64 | Python 2.7 | cp27-cp27m-manylinux1_x86_64, cp27-cp27m-manylinux2010_x86_64, cp27-cp27m-manylinux2014_x86_64 |
RHEL 7 x86_64 | Python 3.7 | cp37-cp37m-manylinux1_x86_64, cp37-cp37m-manylinux2010_x86_64, cp37-cp37m-manylinux2014_x86_64 |
RHEL 7 Arm64 | Python 3.7 | cp37-cp37m-manylinux2014_aarch64 |
所有的Wheel包都需要以Archive格式上傳,whl后綴的包需要重命名為zip后綴。同時,作業(yè)需要開啟odps.isolation.session.enable
選項,或者在Project級別開啟Isolation。以下示例展示了如何上傳并使用scipy中的特殊函數(shù):
# 對于含有二進制代碼的包,必須使用 Archive 方式上傳資源,whl 后綴需要改為 zip
odps.create_resource('scipy.zip', 'archive', file_obj=open('scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.whl', 'rb'))
# 如果 Project 開啟了 Isolation,下面的選項不是必需的
options.sql.settings = { 'odps.isolation.session.enable': True }
def my_psi(value):
# 建議在函數(shù)內(nèi)部 import 第三方庫,以防止不同操作系統(tǒng)下二進制包結(jié)構(gòu)差異造成執(zhí)行錯誤
from scipy.special import psi
return float(psi(value))
df.float_col.map(my_psi).execute(libraries=['scipy.zip'])
對于只提供源碼的二進制包,可以在Linux Shell中打包成Wheel再上傳,Mac和Windows中生成的Wheel包無法在MaxCompute中使用,Linux Shell中打包命令如下:
python setup.py bdist_wheel