MaxCompute使用的Python 2版本為2.7。本文為您介紹如何通過Python 2語言編寫UDF。
UDF代碼結構
您可以通過MaxCompute Studio工具使用Python 2語言編寫UDF代碼,代碼中需要包含如下信息:
編碼聲明:可選。
固定聲明格式為
#coding:utf-8
或# -*- coding: utf-8 -*-
,二者等效。當Python 2代碼中出現中文字符時,運行程序會報錯,必須在代碼頭部增加編碼聲明。導入模塊:必選。
至少要包含
from odps.udf import annotate
,導入函數簽名模塊,MaxCompute才可以識別后續代碼中定義的函數簽名。當UDF代碼中需要引用文件資源或表資源時,需要包含from odps.distcache import get_cache_file
(文件資源)或from odps.distcache import get_cache_table
(表資源)。函數簽名。必選。
格式為
@annotate(<signature>)
,signature
用于定義函數的輸入參數和返回值的數據類型。更多函數簽名信息,請參見函數簽名及數據類型。自定義Python類:必選。
UDF代碼的組織單位,定義了實現業務需求的變量及方法。您還可以在代碼中引用MaxCompute內置的第三方庫或引用文件、表資源。更多信息,請參見第三方庫或引用資源。
evaluate
方法:必選。位于自定義的Python類中。
evaluate
方法定義了輸入參數和返回值。一個Python類中只能包含一個evaluate
方法。
UDF代碼示例如下。
#coding:utf-8
#導入函數簽名模塊。
from odps.udf import annotate
#函數簽名。
@annotate("bigint,bigint->bigint")
#自定義Python類。
class MyPlus(object):
#evaluate方法。
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
使用限制
MaxCompute Python 2 UDF使用的Python版本為2.7,并以沙箱模式執行用戶代碼,即代碼是在一個受限的運行環境中執行的。在該環境中,以下行為會被禁止:
讀寫本地文件。
啟動子進程。
啟動線程。
使用Socket通信。
其他系統調用。
基于上述原因,您上傳的代碼都必須通過標準Python實現。Python標準庫中涉及到上述功能的模塊或C擴展模塊都會被禁止使用。具體標準庫的可用模塊說明如下:
所有基于標準Python實現(不依賴擴展模塊)的模塊都可用。
C擴展模塊中下列模塊可用:
array、audioop
binascii、bisect
cmath、_codecs_cn、_codecs_hk、_codecs_iso2022、_codecs_jp、_codecs_kr、_codecs_tw、_collections、cStringIO
datetime
_functools、future_builtins、
_heapq、_hashlib
itertools
_json
_locale、_lsprof
math、_md5、_multibytecodec
operator
_random
_sha256、_sha512、_sha、_struct、strop
time
unicodedata
_weakref
cPickle
沙箱限制了您的代碼最多可向標準輸出和標準錯誤輸出寫入數據的大小為20 KB,即
sys.stdout/sys.stderr
最多能寫入20 KB數據,多余的字符會被忽略。
第三方庫
MaxCompute的Python 2運行環境中安裝了除Python標準庫外比較常用的第三方庫,作為標準庫的補充,例如Numpy。
使用第三方庫存在限制,例如禁止本地訪問、網絡I/O受限,因此第三方庫中涉及到相關功能的API也被禁止使用。
函數簽名及數據類型
函數簽名格式如下。
@annotate(<signature>)
signature
為字符串,用于標識輸入參數和返回值的數據類型。執行UDF時,UDF函數的輸入參數和返回值類型要與函數簽名指定的類型一致。查詢語義解析階段會檢查不符合函數簽名定義的用法,檢查到類型不匹配時會報錯。具體格式如下。
'arg_type_list -> type'
其中:
arg_type_list
:表示輸入參數的數據類型。輸入參數可以為多個,用英文逗號(,)分隔。支持的數據類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、復雜數據類型(ARRAY、MAP、STRUCT)或復雜數據類型嵌套。arg_type_list
還支持星號(*)或為空(''):當
arg_type_list
為星號(*)時,表示輸入參數為任意個數。當
arg_type_list
為空('')時,表示無輸入參數。
type
:表示返回值的數據類型。UDF只返回一列。支持的數據類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、復雜數據類型(ARRAY、MAP、STRUCT)或復雜數據類型嵌套。
在編寫UDF代碼過程中,您可以根據MaxCompute項目的數據類型版本選取合適的數據類型,更多數據類型版本及各版本支持的數據類型信息,請參見數據類型版本說明。
合法的函數簽名示例如下。
函數簽名示例 | 說明 |
| 輸入參數類型為BIGINT、DOUBLE,返回值類型為STRING。 |
| 輸入任意個參數,返回值類型為STRING。 |
| 無輸入參數,返回值類型為DOUBLE。 |
| 輸入參數類型為ARRAY<BIGINT>,返回值類型為STRUCT<x:STRING, y:INT>。 |
| 無輸入參數,返回值類型為MAP<BIGINT, STRING>。 |
為確保編寫Python UDF過程中使用的數據類型與MaxCompute支持的數據類型保持一致,您需要關注二者間的數據類型映射關系。具體映射關系如下。
MaxCompute SQL Type | Python 2 Type |
BIGINT | INT |
STRING | STR |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | INT |
FLOAT | FLOAT |
CHAR | STR |
VARCHAR | STR |
BINARY | BYTEARRAY |
DATE | INT |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
DATETIME類型對應的Python類型是INT,值為Epoch UTC Time起至今的毫秒數。您可以通過Python標準庫中的DATETIME模塊處理日期時間類型。
odps.udf.int(value,[silent=True])
增加了參數silent
。當silent
為True時,如果value
無法轉為INT,則會返回None(不會返回異常)。NULL值對應Python的None。
引用資源
Python UDF可以通過odps.distcache
模塊引用資源,支持引用文件資源和表資源。
odps.distcache.get_cache_file(resource_name)
:返回指定文件資源的內容。resource_name
為STRING類型,對應當前MaxCompute項目中已存在的文件資源名。如果文件資源名非法或者沒有相應的文件資源,會返回異常。說明使用UDF訪問資源,在創建UDF時需要聲明引用的資源,否則會報錯。
返回值為File-like對象。在使用完此對象后,您需要調用
close
方法釋放打開的資源文件。
引用文件資源示例如下。
from odps.udf import annotate from odps.distcache import get_cache_file @annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定表資源的內容。resource_name
支持STRING類型,對應當前MaxCompute項目中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。返回值為GENERATOR類型,調用者以遍歷方式獲取表的內容,每次遍歷可得到以數組形式存在的表中的一條記錄。
引用表資源示例如下。
from odps.udf import annotate from odps.distcache import get_cache_table @annotate('->string') class DistCacheTableExample(object): def __init__(self): self.records = list(get_cache_table('udf_test')) self.counter = 0 self.ln = len(self.records) def evaluate(self): if self.counter > self.ln - 1: return None ret = self.records[self.counter] self.counter += 1 return str(ret)
UDF開發:通用流程
開發UDF時通常需進行準備工作、編寫UDF代碼、上傳并注冊UDF、調用調試UDF這幾個步驟。同時MaxCompute支持多種工具,例如常見的MaxCompute Studio、DataWorks、odpscmd。
使用各類工具開發Python2 UDF的流程與Python3的流程一致,通用流程示例請參見Python3的相關文檔UDF開發:通用流程。
使用MaxCompute Studio完整開發及調用Python 2 UDF的示例也可參見開發Python UDF。
UDF開發完成后:UDF調用說明
完成Python 2 UDF開發后,您即可通過MaxCompute SQL調用Python 2 UDF。調用方法如下:
在歸屬MaxCompute項目中使用自定義函數:使用方法與內建函數類似,您可以參照內建函數的使用方法使用自定義函數。
跨項目使用自定義函數:即在項目A中使用項目B的自定義函數,跨項目分享語句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨項目分享信息,請參見基于Package跨項目訪問資源。