DataWorks提供PyODPS 2節點類型,您可以在DataWorks上通過PyODPS語法進行PyODPS任務開發,PyODPS集成了MaxCompute的Python SDK。支持您在DataWorks的PyODPS 2節點上直接編輯Python代碼,操作MaxCompute。
前提條件
已創建PyODPS 2節點,詳情請參見創建并管理MaxCompute節點。
背景信息
PyODPS是MaxCompute的Python版本的SDK,提供簡單方便的Python編程接口,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS概述。在DataWorks中,您可通過PyODPS節點實現Python任務的調度運行,以及與其他作業的集成操作。
注意事項
DataWorks支持可視化創建Python資源,如果您需要在PyODPS節點中調用第三方包,請使用獨享調度資源組并通過運維助手進行安裝。
DataWorks運維助手中安裝的Python第三方包,僅支持在DataWorks獨享調度資源組本地運行PyODPS任務代碼時引用,如果需要在MaxCompute的Python UDF中引用Python第三方包,詳情請參見UDF示例:Python UDF使用第三方包。
如果您的PyODPS任務需要訪問特殊的網絡環境(如VPC網絡或IDC網絡中的數據源或服務等),請使用獨享調度資源組,并參考網絡連通解決方案打通獨享資源組與目標環境的網絡連通。
PyODPS語法及更多信息請參見PyODPS文檔。
PyODPS節點分為PyODPS 2和PyODPS 3兩種,二者的區別在于底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據實際使用的Python語言版本創建PyODPS節點。
使用限制
DataWorks建議您在PyODPS節點內獲取到本地處理的數據不超過50 MB,該操作受限于DataWorks執行資源的不同規格(包括公共調度資源組和獨享調度資源組),處理的本地數據過多并超出操作系統閾值時可能發生OOM(Got killed)錯誤。請避免在PyODPS節點中寫入過多的數據處理代碼。詳情請參見高效使用PyODPS最佳實踐。
如果您發現有Got killed報錯,即表明內存使用超限,進程被中止。因此,請盡量避免本地的數據操作。通過PyODPS發起的SQL和DataFrame任務(除to_pandas外)不受此限制。
非自定義函數代碼可以使用平臺預裝的Numpy和Pandas。不支持其他帶有二進制代碼的三方包。
由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默認設置為False。如果需要全局開啟instance tunnel,需要手動將該值設置為True。
PyODPS 2節點底層的Python版本為2.7。
編輯代碼:簡單示例
創建PyODPS節點后,您可以進行代碼編輯及運行,更多關于PyODPS語法說明,請參見基本操作概述。
ODPS入口
DataWorks的PyODPS節點中,將會包含一個全局的變量odps或o,即ODPS入口,您無需手動定義ODPS入口。
print(odps.exist_table('PyODPS_iris'))
執行SQL
您可以在PyODPS節點中執行SQL,詳情請參見SQL。
DataWorks上默認未開啟instance tunnel,即instance.open_reader默認使用Result接口(最多一萬條記錄)。您可以通過reader.count獲取記錄數。如果您需要迭代獲取全部數據,則需要關閉
limit
限制。您可以通過下列語句在全局范圍內打開Instance Tunnel并關閉limit
限制。options.tunnel.use_instance_tunnel = True options.tunnel.limit_instance_tunnel = False # 關閉limit限制,讀取全部數據。 with instance.open_reader() as reader: # 通過Instance Tunnel可讀取全部數據。
您也可以通過在
上添加tunnel=True
,實現僅對本次open_reader開啟instance tunnel。同時,您還可以添加limit=False
,實現僅對本次關閉limit
限制。# 本次open_reader使用Instance Tunnel接口,且能讀取全部數據。 with instance.open_reader(tunnel=True, limit=False) as reader:
說明若您未開啟Instance Tunnel,可能導致獲取數據格式錯誤,解決方法請參見Python SDK常見問題。
設置運行參數
您可以通過設置hints參數,來設置運行時的參數,參數類型是dict。 Hints參數的詳情請參見SET操作。
o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
對全局配置設置sql.settings后,每次運行時,都需要添加相關的運行時的參數。
from odps import options options.sql.settings = {'odps.sql.mapper.split.size': 16} o.execute_sql('select * from PyODPS_iris') # 根據全局配置添加hints。
讀取運行結果
運行SQL的實例能夠直接執行open_reader的操作,有以下兩種情況:
SQL返回了結構化的數據。
with o.execute_sql('select * from dual').open_reader() as reader: for record in reader: # 處理每一個record。
可能執行的是desc等SQL語句,通過reader.raw屬性,獲取到原始的SQL執行結果。
with o.execute_sql('desc dual').open_reader() as reader: print(reader.raw)
說明如果使用了自定義調度參數,頁面上直接觸發運行PyODPS 3節點時,需要寫死時間,PyODPS節點無法直接替換。
DataFrame
您還可以通過DataFrame的方式處理數據。
執行
在DataWorks的環境里,DataFrame的執行需要顯式調用立即執行的方法。
from odps.df import DataFrame iris = DataFrame(o.get_table('pyodps_iris')) for record in iris[iris.sepal_width < 3].execute(): # 調用立即執行的方法,處理每條Record。
如果您需要在Print時調用立即執行,需要開啟
options.interactive
。from odps import options from odps.df import DataFrame options.interactive = True # 在開始處打開開關。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepal_width.sum()) # Print時會立即執行。
打印詳細信息
通過設置
options.verbose
選項。在DataWorks上,默認已經處于打開狀態,運行過程會打印Logview等詳細過程。
示例
以下以一個簡單示例為您介紹PyODPS節點的使用:
準備數據集,創建pyodps_iris示例表,具體操作請參見Dataframe數據處理。
創建DataFrame,詳情請參見從MaxCompute表創建DataFrame。
在PyODPS節點中輸入以下代碼并運行。
from odps.df import DataFrame # 從ODPS表創建DataFrame。 iris = DataFrame(o.get_table('pyodps_iris')) print(iris.sepallength.head(5))
返回結果:
sepallength 0 4.5 1 5.5 2 4.9 3 5.0 4 6.0
編輯代碼:進階示例
若節點需要周期性調度,您需要定義節點調度時的相關屬性,調度配置詳情請參見任務調度屬性配置概述。
使用調度參數
單擊節點編輯區域右側的調度配置,在參數區域配置自定義參數,PyODPS節點與SQL節點定義變量的方式不同,詳情請參見調度參數配置。
與DataWorks中的SQL節點不同,為了避免影響代碼,PyODPS節點不會在代碼中替換類似 ${param_name}的字符串,而是在執行代碼前,在全局變量中增加一個名為args
的dict,調度參數可以在此獲取。例如,在參數中設置ds=${yyyymmdd}
,則可以通過以下方式在代碼中獲取該參數。
print('ds=' + args['ds'])
ds=20161116
如果您需要獲取名為ds
的分區,則可以使用如下方法。
o.get_table('table_name').get_partition('ds=' + args['ds'])
更多場景的PyODPS任務開發,請參考:
后續步驟
如何判斷Shell自定義腳本任務的成功完成:Python自定義腳本任務的成功完成的判斷邏輯與Shell節點一致,您可通過該方法進行判斷。
發布任務:如果您使用的是標準模式的工作空間,需要通過任務發布流程,將任務發布至生產環境后,任務才會周期調度運行。
周期任務運維:任務提交發布至生產運維中心調度后,您可通過DataWorks運維中心進行相關運維操作。
PyODPS常見問題:您可了解PyODPS執行過程中的常見問題,便于出現異常時快速排查解決。