日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

開發PyODPS 2任務

DataWorks提供PyODPS 2節點類型,您可以在DataWorks上通過PyODPS語法進行PyODPS任務開發,PyODPS集成了MaxCompute的Python SDK。支持您在DataWorks的PyODPS 2節點上直接編輯Python代碼,操作MaxCompute。

前提條件

已創建PyODPS 2節點,詳情請參見創建并管理MaxCompute節點

背景信息

PyODPS是MaxCompute的Python版本的SDK,提供簡單方便的Python編程接口,以便您使用Python編寫MaxCompute作業、查詢MaxCompute表和視圖,以及管理MaxCompute資源,詳情請參見PyODPS概述。在DataWorks中,您可通過PyODPS節點實現Python任務的調度運行,以及與其他作業的集成操作。

注意事項

  • 如果您的PyODPS任務需要訪問特殊的網絡環境(如VPC網絡或IDC網絡中的數據源或服務等),請使用Serverless調度資源組,并參考網絡連通解決方案打通Serverless資源組與目標環境的網絡連通。

  • PyODPS語法及更多信息請參見PyODPS文檔

  • PyODPS節點分為PyODPS 2和PyODPS 3兩種,二者的區別在于底層Python語言版本不同。PyODPS 2底層Python語言版本為Python 2,PyODPS 3底層Python語言版本為Python 3,請您根據實際使用的Python語言版本創建PyODPS節點。

  • 若通過PyODPS節點執行SQL無法正常產生數據血緣關系,即數據血緣在數據地圖無法正常展示,您可在任務代碼處通過手動設置DataWorks調度運行的相關參數解決。查看數據血緣,詳情請參見查看血緣信息;參數設置,詳情請參見設置運行參數hints。任務運行時所需參數可參考如下代碼獲取。

    import os
    ...
    # get DataWorks sheduler runtime parameters
    skynet_hints = {}
    for k, v in os.environ.items():
        if k.startswith('SKYNET_'):
            skynet_hints[k] = v
    ...
    # setting hints while submiting a task
    o.execute_sql('INSERT OVERWRITE TABLE XXXX SELECT * FROM YYYY WHERE ***', hints=skynet_hints)
    ...
  • Pyodps節點的輸出日志最大支持4MB。建議您盡量避免在日志中直接輸出大量的數據結果。相反,建議您多輸出告警日志和正常進度的日志,以提供更有價值的信息。

使用限制

  • 使用獨享調度資源組執行PyODPS節點時,建議在節點內獲取到獨享資源組本地處理的數據不超過50MB,該操作受限于獨享調度資源組的規格,處理的本地數據過多并超出操作系統閾值時可能發生OOM(Got Killed)錯誤。請避免在PyODPS節點中寫入過多的數據處理代碼。詳情請參見高效使用PyODPS最佳實踐

  • 使用Serverless資源組執行PYODPS節點時,您可根據節點內需要處理的數據量合理配置PyODPS節點的CU。

  • 如果您發現有Got killed報錯,即表明內存使用超限,進程被中止。因此,請盡量避免本地的數據操作。通過PyODPS發起的SQL和DataFrame任務(除to_pandas外)不受此限制。

  • 非自定義函數代碼可以使用平臺預裝的Numpy和Pandas。不支持其他帶有二進制代碼的三方包。

  • 由于兼容性原因,在DataWorks中,options.tunnel.use_instance_tunnel默認設置為False。如果需要全局開啟instance tunnel,需要手動將該值設置為True。

  • PyODPS 2節點底層的Python版本為2.7。

編輯代碼:簡單示例

創建PyODPS節點后,您可以進行代碼編輯及運行,更多關于PyODPS語法說明,請參見基本操作概述

  • ODPS入口

    DataWorks的PyODPS節點中,將會包含一個全局的變量odpso,即ODPS入口,您無需手動定義ODPS入口。

    print(odps.exist_table('PyODPS_iris'))
  • 執行SQL

    您可以在PyODPS節點中執行SQL,詳情請參見SQL

    • DataWorks上默認未開啟instance tunnel,即instance.open_reader默認使用Result接口(最多一萬條記錄)。您可以通過reader.count獲取記錄數。如果您需要迭代獲取全部數據,則需要關閉limit限制。您可以通過下列語句在全局范圍內打開Instance Tunnel并關閉limit限制。

      options.tunnel.use_instance_tunnel = True
      options.tunnel.limit_instance_tunnel = False  # 關閉limit限制,讀取全部數據。
      
      with instance.open_reader() as reader:
        # 通過Instance Tunnel可讀取全部數據。
    • 您也可以通過在open_reader上添加tunnel=True,實現僅對本次open_reader開啟instance tunnel。同時,您還可以添加 limit=False,實現僅對本次關閉limit限制。

      # 本次open_reader使用Instance Tunnel接口,且能讀取全部數據。
      with instance.open_reader(tunnel=True, limit=False) as reader:
    說明

    若您未開啟Instance Tunnel,可能導致獲取數據格式錯誤,解決方法請參見Python SDK常見問題

  • 設置運行參數

    • 您可以通過設置hints參數,來設置運行時的參數,參數類型是dict。 Hints參數的詳情請參見SET操作

      o.execute_sql('select * from PyODPS_iris', hints={'odps.sql.mapper.split.size': 16})
    • 對全局配置設置sql.settings后,每次運行時,都需要添加相關的運行時的參數。

      from odps import options
      options.sql.settings = {'odps.sql.mapper.split.size': 16}
      o.execute_sql('select * from PyODPS_iris')  # 根據全局配置添加hints。
  • 讀取運行結果

    運行SQL的實例能夠直接執行open_reader的操作,有以下兩種情況:

    • SQL返回了結構化的數據。

      with o.execute_sql('select * from dual').open_reader() as reader:
      	for record in reader:  # 處理每一個record。
    • 可能執行的是desc等SQL語句,通過reader.raw屬性,獲取到原始的SQL執行結果。

      with o.execute_sql('desc dual').open_reader() as reader:
      	print(reader.raw)
      說明

      如果使用了自定義調度參數,頁面上直接觸發運行PyODPS 3節點時,需要寫死時間,PyODPS節點無法直接替換。

  • DataFrame

    您還可以通過DataFrame的方式處理數據。

    • 執行

      在DataWorks的環境里,DataFrame的執行需要顯式調用立即執行的方法

      from odps.df import DataFrame
      iris = DataFrame(o.get_table('pyodps_iris'))
      for record in iris[iris.sepal_width < 3].execute():  # 調用立即執行的方法,處理每條Record。

      如果您需要在Print時調用立即執行,需要開啟options.interactive

      from odps import options
      from odps.df import DataFrame
      options.interactive = True  # 在開始處打開開關。
      iris = DataFrame(o.get_table('pyodps_iris'))
      print(iris.sepal_width.sum())  # Print時會立即執行。
    • 打印詳細信息

      通過設置options.verbose選項。在DataWorks上,默認已經處于打開狀態,運行過程會打印Logview等詳細過程。

示例

以下以一個簡單示例為您介紹PyODPS節點的使用:

  1. 準備數據集,創建pyodps_iris示例表,具體操作請參見Dataframe數據處理

  2. 創建DataFrame,詳情請參見從MaxCompute表創建DataFrame

  3. 在PyODPS節點中輸入以下代碼并運行。

    from odps.df import DataFrame
    
    # 從ODPS表創建DataFrame。
    iris = DataFrame(o.get_table('pyodps_iris'))
    print(iris.sepallength.head(5))

    返回結果:

       sepallength
    0          4.5
    1          5.5
    2          4.9
    3          5.0
    4          6.0

編輯代碼:進階示例

若節點需要周期性調度,您需要定義節點調度時的相關屬性,調度配置詳情請參見任務調度屬性配置概述

使用調度參數

單擊節點編輯區域右側的調度配置,在參數區域配置自定義參數,PyODPS節點與SQL節點定義變量的方式不同,詳情請參見調度參數配置

與DataWorks中的SQL節點不同,為了避免影響代碼,PyODPS節點不會在代碼中替換類似 ${param_name}的字符串,而是在執行代碼前,在全局變量中增加一個名為args的dict,調度參數可以在此獲取。例如,在參數中設置ds=${yyyymmdd},則可以通過以下方式在代碼中獲取該參數。

print('ds=' + args['ds'])
ds=20161116
說明

如果您需要獲取名為ds的分區,則可以使用如下方法。

o.get_table('table_name').get_partition('ds=' + args['ds'])

更多場景的PyODPS任務開發,請參考:

后續步驟

  • 如何判斷Shell自定義腳本任務的成功完成:Python自定義腳本任務的成功完成的判斷邏輯與Shell節點一致,您可通過該方法進行判斷。

  • 發布任務:如果您使用的是標準模式的工作空間,需要通過任務發布流程,將任務發布至生產環境后,任務才會周期調度運行。

  • 周期任務運維:任務提交發布至生產運維中心調度后,您可通過DataWorks運維中心進行相關運維操作。

  • PyODPS常見問題:您可了解PyODPS執行過程中的常見問題,便于出現異常時快速排查解決。