開發(fā)PyODPS 3任務(wù)
DataWorks為您提供PyODPS 3節(jié)點(diǎn),您可以在該節(jié)點(diǎn)中直接使用Python代碼編寫MaxCompute作業(yè),并進(jìn)行作業(yè)的周期性調(diào)度。本文為您介紹如何通過DataWorks實(shí)現(xiàn)Python任務(wù)的配置與調(diào)度。
前提條件
已創(chuàng)建PyODPS 3節(jié)點(diǎn),詳情請參見創(chuàng)建并管理MaxCompute節(jié)點(diǎn)。
背景信息
PyODPS是MaxCompute的Python版本的SDK,提供簡單方便的Python編程接口,以便您使用Python編寫MaxCompute作業(yè)、查詢MaxCompute表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS概述。在DataWorks中,您可通過PyODPS節(jié)點(diǎn)實(shí)現(xiàn)Python任務(wù)的調(diào)度運(yùn)行,以及與其他作業(yè)的集成操作。
注意事項(xiàng)
DataWorks支持可視化創(chuàng)建Python資源,如果您需要在PyODPS節(jié)點(diǎn)中調(diào)用第三方包,請使用獨(dú)享調(diào)度資源組并通過運(yùn)維助手進(jìn)行安裝。
DataWorks運(yùn)維助手中安裝的Python第三方包,僅支持在DataWorks獨(dú)享調(diào)度資源組本地運(yùn)行PyODPS任務(wù)代碼時(shí)引用,如果需要在MaxCompute的Python UDF中引用Python第三方包,詳情請參見UDF示例:Python UDF使用第三方包。
如果您的PyODPS任務(wù)需要訪問特殊的網(wǎng)絡(luò)環(huán)境(如VPC網(wǎng)絡(luò)或IDC網(wǎng)絡(luò)中的數(shù)據(jù)源或服務(wù)等),請使用獨(dú)享調(diào)度資源組,并參考網(wǎng)絡(luò)連通解決方案打通獨(dú)享資源組與目標(biāo)環(huán)境的網(wǎng)絡(luò)連通。
PyODPS語法及更多信息請參見PyODPS文檔。
PyODPS節(jié)點(diǎn)分為PyODPS 2和PyODPS 3兩種,二者的區(qū)別在于底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據(jù)實(shí)際使用的Python語言版本創(chuàng)建PyODPS節(jié)點(diǎn)。
若通過PyODPS節(jié)點(diǎn)執(zhí)行SQL無法正常產(chǎn)生數(shù)據(jù)血緣關(guān)系,即數(shù)據(jù)血緣在數(shù)據(jù)地圖無法正常展示,您可在任務(wù)代碼處通過手動設(shè)置DataWorks調(diào)度運(yùn)行的相關(guān)參數(shù)解決。查看數(shù)據(jù)血緣,詳情請參見查看血緣信息;參數(shù)設(shè)置,詳情請參見設(shè)置運(yùn)行參數(shù)hints。任務(wù)運(yùn)行時(shí)所需參數(shù)可參考如下代碼獲取。
import os ... # get DataWorks sheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submiting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
使用限制
DataWorks建議您在PyODPS節(jié)點(diǎn)內(nèi)獲取到本地處理的數(shù)據(jù)不超過50 MB,該操作受限于DataWorks執(zhí)行資源的不同規(guī)格(包括公共調(diào)度資源組和獨(dú)享調(diào)度資源組),處理的本地?cái)?shù)據(jù)過多并超出操作系統(tǒng)閾值時(shí)可能發(fā)生OOM(Got killed)錯(cuò)誤。請避免在PyODPS節(jié)點(diǎn)中寫入過多的數(shù)據(jù)處理代碼。詳情請參見高效使用PyODPS最佳實(shí)踐。
如果您發(fā)現(xiàn)有Got killed報(bào)錯(cuò),即表明內(nèi)存使用超限,進(jìn)程被中止。因此,請盡量避免本地的數(shù)據(jù)操作。通過PyODPS發(fā)起的SQL和DataFrame任務(wù)(除to_pandas外)不受此限制。
非自定義函數(shù)代碼可以使用平臺預(yù)裝的Numpy和Pandas。不支持其他帶有二進(jìn)制代碼的三方包。
由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默認(rèn)設(shè)置為False。如果需要全局開啟instance tunnel,需要手動將該值設(shè)置為True。
當(dāng)Python 3的子版本號不同(例如Python 3.8和Python 3.7)時(shí),字節(jié)碼的定義有所不同。
目前MaxCompute使用的Python 3版本為3.7,當(dāng)使用其它版本Python 3中的部分語法(例如Python 3.8中的finally block)時(shí),執(zhí)行會報(bào)錯(cuò),建議您選擇Python 3.7。
PyODPS 3支持運(yùn)行在公共資源組和2020年4月之后購買的獨(dú)享調(diào)度資源組上。如果您的獨(dú)享調(diào)度資源組的創(chuàng)建時(shí)間較早,請通過DataWorks交流群聯(lián)系值班同學(xué)升級資源組。
編輯代碼:簡單示例
創(chuàng)建PyODPS節(jié)點(diǎn)后,您可以進(jìn)行代碼編輯及運(yùn)行,更多關(guān)于PyODPS語法說明,請參見基本操作概述。
ODPS入口
DataWorks的PyODPS節(jié)點(diǎn)中,將會包含一個(gè)全局的變量odps或o,即ODPS入口,您無需手動定義ODPS入口。
print(odps.exist_table('PyODPS_iris'))
執(zhí)行SQL
您可以在PyODPS節(jié)點(diǎn)中執(zhí)行SQL,詳情請參見SQL。
DataWorks上默認(rèn)未開啟instance tunnel,即instance.open_reader默認(rèn)使用Result接口(最多一萬條記錄)。您可以通過reader.count獲取記錄數(shù)。如果您需要迭代獲取全部數(shù)據(jù),則需要關(guān)閉
limit
限制。您可以通過下列語句在全局范圍內(nèi)打開Instance Tunnel并關(guān)閉limit
限制。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 關(guān)閉limit限制,讀取全部數(shù)據(jù)。 with instance.open_reader() as reader: # 通過Instance Tunnel可讀取全部數(shù)據(jù)。
您也可以通過在
上添加tunnel=True
,實(shí)現(xiàn)僅對本次open_reader開啟instance tunnel。同時(shí),您還可以添加limit=False
,實(shí)現(xiàn)僅對本次關(guān)閉limit
限制。# 本次open_reader使用Instance Tunnel接口,且能讀取全部數(shù)據(jù)。 with instance.open_reader(tunnel=True, limit=False) as reader:
說明若您未開啟Instance Tunnel,可能導(dǎo)致獲取數(shù)據(jù)格式錯(cuò)誤,解決方法請參見Python SDK常見問題。
設(shè)置運(yùn)行參數(shù)
您可以通過設(shè)置hints參數(shù),來設(shè)置運(yùn)行時(shí)的參數(shù),參數(shù)類型是dict。 Hints參數(shù)的詳情請參見SET操作。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
對全局配置設(shè)置sql.settings后,每次運(yùn)行時(shí),都需要添加相關(guān)的運(yùn)行時(shí)的參數(shù)。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # 根據(jù)全局配置添加hints。
讀取運(yùn)行結(jié)果
運(yùn)行SQL的實(shí)例能夠直接執(zhí)行open_reader的操作,有以下兩種情況:
SQL返回了結(jié)構(gòu)化的數(shù)據(jù)。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 處理每一個(gè)record。
可能執(zhí)行的是desc等SQL語句,通過reader.raw屬性,獲取到原始的SQL執(zhí)行結(jié)果。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)
說明如果使用了自定義調(diào)度參數(shù),頁面上直接觸發(fā)運(yùn)行PyODPS 3節(jié)點(diǎn)時(shí),需要寫死時(shí)間,PyODPS節(jié)點(diǎn)無法直接替換。
DataFrame
您還可以通過DataFrame的方式處理數(shù)據(jù)。
執(zhí)行
在DataWorks的環(huán)境里,DataFrame的執(zhí)行需要顯式調(diào)用立即執(zhí)行的方法。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 調(diào)用立即執(zhí)行的方法,處理每條Record。
如果您需要在Print時(shí)調(diào)用立即執(zhí)行,需要開啟
options.interactive
。from odps import options from odps.df import DataFrame options.interactive = True # 在開始處打開開關(guān)。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print時(shí)會立即執(zhí)行。
打印詳細(xì)信息
通過設(shè)置
options.verbose
選項(xiàng)。在DataWorks上,默認(rèn)已經(jīng)處于打開狀態(tài),運(yùn)行過程會打印Logview等詳細(xì)過程。
示例
以下以一個(gè)簡單示例為您介紹PyODPS節(jié)點(diǎn)的使用:
準(zhǔn)備數(shù)據(jù)集,創(chuàng)建pyodps_iris示例表,具體操作請參見Dataframe數(shù)據(jù)處理。
創(chuàng)建DataFrame,詳情請參見從MaxCompute表創(chuàng)建DataFrame。
在PyODPS節(jié)點(diǎn)中輸入以下代碼并運(yùn)行。
from odps.df import DataFrame # 從ODPS表創(chuàng)建DataFrame。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
返回結(jié)果:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
編輯代碼:進(jìn)階示例
若節(jié)點(diǎn)需要周期性調(diào)度,您需要定義節(jié)點(diǎn)調(diào)度時(shí)的相關(guān)屬性,調(diào)度配置詳情請參見任務(wù)調(diào)度屬性配置概述。
使用調(diào)度參數(shù)
單擊節(jié)點(diǎn)編輯區(qū)域右側(cè)的調(diào)度配置,在參數(shù)區(qū)域配置自定義參數(shù),PyODPS節(jié)點(diǎn)與SQL節(jié)點(diǎn)定義變量的方式不同,詳情請參見調(diào)度參數(shù)配置。
與DataWorks中的SQL節(jié)點(diǎn)不同,為了避免影響代碼,PyODPS節(jié)點(diǎn)不會在代碼中替換類似 ${param_name}的字符串,而是在執(zhí)行代碼前,在全局變量中增加一個(gè)名為args
的dict,調(diào)度參數(shù)可以在此獲取。例如,在參數(shù)中設(shè)置ds=${yyyymmdd}
,則可以通過以下方式在代碼中獲取該參數(shù)。
print('ds=' + args['ds'])
ds=20161116
如果您需要獲取名為ds
的分區(qū),則可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])
更多場景的PyODPS任務(wù)開發(fā),請參考:
后續(xù)步驟
如何判斷Shell自定義腳本任務(wù)的成功完成:Python自定義腳本任務(wù)的成功完成的判斷邏輯與Shell節(jié)點(diǎn)一致,您可通過該方法進(jìn)行判斷。
發(fā)布任務(wù):如果您使用的是標(biāo)準(zhǔn)模式的工作空間,需要通過任務(wù)發(fā)布流程,將任務(wù)發(fā)布至生產(chǎn)環(huán)境后,任務(wù)才會周期調(diào)度運(yùn)行。
周期任務(wù)運(yùn)維:任務(wù)提交發(fā)布至生產(chǎn)運(yùn)維中心調(diào)度后,您可通過DataWorks運(yùn)維中心進(jìn)行相關(guān)運(yùn)維操作。
PyODPS常見問題:您可了解PyODPS執(zhí)行過程中的常見問題,便于出現(xiàn)異常時(shí)快速排查解決。