DataWorks提供PyODPS 2節點類型,您可以在DataWorks上通過PyODPS語法進行PyODPS任務開發,PyODPS集成了MaxCompute的Python SDK。支持您在DataWorks的PyODPS 2節點上直接編輯Python代碼,操作MaxCompute。
前提條件
已創建PyODPS 2節點,詳情請參見創建并管理MaxCompute節點。
背景信息
PyODPS是MaxCompute的Python版本的SDK,提供簡單方便的Python編程接口,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS概述。在DataWorks中,您可通過PyODPS節點實現Python任務的調度運行,以及與其他作業的集成操作。
注意事項
在DataWorks資源組本地運行PyODPS節點代碼時,若代碼中需要調用第三方包,Serverless資源組可通過自定義鏡像安裝第三方包。
說明如果代碼中存在UDF引用第三方包的情況,不支持使用上述方式,具體配置方法,請參見UDF示例:Python UDF使用第三方包。
如果您的PyODPS任務需要訪問特殊的網絡環境(如VPC網絡或IDC網絡中的數據源或服務等),請使用Serverless調度資源組,并參考網絡連通解決方案打通Serverless資源組與目標環境的網絡連通。
PyODPS語法及更多信息請參見PyODPS文檔。
PyODPS節點分為PyODPS 2和PyODPS 3兩種,二者的區別在于底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據實際使用的Python語言版本創建PyODPS節點。
若通過PyODPS節點執行SQL無法正常產生數據血緣關系,即數據血緣在數據地圖無法正常展示,您可在任務代碼處通過手動設置DataWorks調度運行的相關參數解決。查看數據血緣,詳情請參見查看血緣信息;參數設置,詳情請參見設置運行參數hints。任務運行時所需參數可參考如下代碼獲取。
import os ... # get DataWorks sheduler runtime parameters skynet_hints = {} for k, v in os.environ.items(): if k.startswith('SKYNET_'): skynet_hints[k] = v ... # setting hints while submiting a task o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints) ...
Pyodps節點的輸出日志最大支持4MB。建議您盡量避免在日志中直接輸出大量的數據結果。相反,建議您多輸出告警日志和正常進度的日志,以提供更有價值的信息。
使用限制
使用獨享調度資源組執行PyODPS節點時,建議在節點內獲取到獨享資源組本地處理的數據不超過50MB,該操作受限于獨享調度資源組的規格,處理的本地數據過多并超出操作系統閾值時可能發生OOM(Got Killed)錯誤。請避免在PyODPS節點中寫入過多的數據處理代碼。詳情請參見高效使用PyODPS最佳實踐。
使用Serverless資源組執行PYODPS節點時,您可根據節點內需要處理的數據量合理配置PyODPS節點的CU。
如果您發現有Got killed報錯,即表明內存使用超限,進程被中止。因此,請盡量避免本地的數據操作。通過PyODPS發起的SQL和DataFrame任務(除to_pandas外)不受此限制。
非自定義函數代碼可以使用平臺預裝的Numpy和Pandas。不支持其他帶有二進制代碼的三方包。
由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默認設置為False。如果需要全局開啟instance tunnel,需要手動將該值設置為True。
PyODPS 2節點底層的Python版本為2.7。
編輯代碼:簡單示例
創建PyODPS節點后,您可以進行代碼編輯及運行,更多關于PyODPS語法說明,請參見基本操作概述。
ODPS入口
DataWorks的PyODPS節點中,將會包含一個全局的變量odps或o,即ODPS入口,您無需手動定義ODPS入口。
print(odps.exist_table('PyODPS_iris'))
執行SQL
您可以在PyODPS節點中執行SQL,詳情請參見SQL。
DataWorks上默認未開啟instance tunnel,即instance.open_reader默認使用Result接口(最多一萬條記錄)。您可以通過reader.count獲取記錄數。如果您需要迭代獲取全部數據,則需要關閉
limit
限制。您可以通過下列語句在全局范圍內打開Instance Tunnel并關閉limit
限制。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 關閉limit限制,讀取全部數據。 with instance.open_reader() as reader: # 通過Instance Tunnel可讀取全部數據。
您也可以通過在
上添加tunnel=True
,實現僅對本次open_reader開啟instance tunnel。同時,您還可以添加limit=False
,實現僅對本次關閉limit
限制。# 本次open_reader使用Instance Tunnel接口,且能讀取全部數據。 with instance.open_reader(tunnel=True, limit=False) as reader:
說明若您未開啟Instance Tunnel,可能導致獲取數據格式錯誤,解決方法請參見Python SDK常見問題。
設置運行參數
您可以通過設置hints參數,來設置運行時的參數,參數類型是dict。 Hints參數的詳情請參見SET操作。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
對全局配置設置sql.settings后,每次運行時,都需要添加相關的運行時的參數。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # 根據全局配置添加hints。
讀取運行結果
運行SQL的實例能夠直接執行open_reader的操作,有以下兩種情況:
SQL返回了結構化的數據。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 處理每一個record。
可能執行的是desc等SQL語句,通過reader.raw屬性,獲取到原始的SQL執行結果。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)
說明如果使用了自定義調度參數,頁面上直接觸發運行PyODPS 3節點時,需要寫死時間,PyODPS節點無法直接替換。
DataFrame
您還可以通過DataFrame的方式處理數據。
執行
在DataWorks的環境里,DataFrame的執行需要顯式調用立即執行的方法。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 調用立即執行的方法,處理每條Record。
如果您需要在Print時調用立即執行,需要開啟
options.interactive
。from odps import options from odps.df import DataFrame options.interactive = True # 在開始處打開開關。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print時會立即執行。
打印詳細信息
通過設置
options.verbose
選項。在DataWorks上,默認已經處于打開狀態,運行過程會打印Logview等詳細過程。
示例
以下以一個簡單示例為您介紹PyODPS節點的使用:
準備數據集,創建pyodps_iris示例表,具體操作請參見Dataframe數據處理。
創建DataFrame,詳情請參見從MaxCompute表創建DataFrame。
在PyODPS節點中輸入以下代碼并運行。
from odps.df import DataFrame # 從ODPS表創建DataFrame。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
返回結果:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
編輯代碼:進階示例
若節點需要周期性調度,您需要定義節點調度時的相關屬性,調度配置詳情請參見任務調度屬性配置概述。
使用調度參數
單擊節點編輯區域右側的調度配置,在參數區域配置自定義參數,PyODPS節點與SQL節點定義變量的方式不同,詳情請參見調度參數配置。
與DataWorks中的SQL節點不同,為了避免影響代碼,PyODPS節點不會在代碼中替換類似 ${param_name}的字符串,而是在執行代碼前,在全局變量中增加一個名為args
的dict,調度參數可以在此獲取。例如,在參數中設置ds=${yyyymmdd}
,則可以通過以下方式在代碼中獲取該參數。
print('ds=' + args['ds'])
ds=20161116
如果您需要獲取名為ds
的分區,則可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])
更多場景的PyODPS任務開發,請參考:
后續步驟
如何判斷Shell自定義腳本任務的成功完成:Python自定義腳本任務的成功完成的判斷邏輯與Shell節點一致,您可通過該方法進行判斷。
發布任務:如果您使用的是標準模式的工作空間,需要通過任務發布流程,將任務發布至生產環境后,任務才會周期調度運行。
周期任務運維:任務提交發布至生產運維中心調度后,您可通過DataWorks運維中心進行相關運維操作。
PyODPS常見問題:您可了解PyODPS執行過程中的常見問題,便于出現異常時快速排查解決。