MaxCompute當前支持利用Python 3語言來開發自定義函數(UDF),以滿足特定的業務邏輯需求。本文為您介紹如何通過Python 3語言編寫UDF。
UDF代碼結構
您可以通過MaxCompute Studio工具使用Python 3語言編寫UDF代碼,代碼中需要包含如下信息:
導入模塊:必選。
至少要包含
from odps.udf import annotate
,導入函數簽名模塊,MaxCompute才可以識別后續代碼中定義的函數簽名。當UDF代碼中需要引用文件資源或表資源時,需要包含from odps.distcache import get_cache_file
(文件資源)或from odps.distcache import get_cache_table
(表資源)。函數簽名:必選。
格式為
@annotate(<signature>)
,signature
用于定義函數的輸入參數和返回值的數據類型。更多函數簽名信息,請參見函數簽名與數據類型。自定義Python類:必選。
UDF代碼的組織單位,定義了實現業務需求的變量及方法。您還可以在代碼中引用MaxCompute內置的第三方庫或引用文件、表資源。更多信息,請參見第三方庫或引用資源。
evaluate
方法:必選。位于自定義的Python類中。
evaluate
方法定義了輸入參數和返回值。一個Python類中只能包含一個evaluate
方法。
UDF代碼示例如下。
#導入函數簽名模塊。
from odps.udf import annotate
#函數簽名。
@annotate("bigint,bigint->bigint")
#自定義Python類。
class MyPlus(object):
#evaluate方法。
def evaluate(self, arg0, arg1):
if None in (arg0, arg1):
return None
return arg0 + arg1
使用限制
訪問外網
MaxCompute默認不支持通過自定義函數訪問外網。如果您需要通過自定義函數訪問外網,請根據業務情況填寫并提交網絡連接申請表單,MaxCompute技術支持團隊會及時聯系您完成網絡開通操作。表單填寫指導,請參見網絡開通流程。
訪問VPC網絡
MaxCompute默認不支持通過UDF訪問VPC網絡。如果您的UDF涉及訪問VPC網絡中的資源時,需要先創建MaxCompute與目標VPC網絡間的網絡連接,才可以直接通過UDF訪問VPC網絡中的資源,操作詳情請參見通過UDF訪問VPC網絡資源。
讀取表數據
目前版本不支持使用UDF/UDAF/UDTF讀取以下場景的表數據:
做過表結構修改(Schema Evolution)的表數據。
包含復雜數據類型的表數據。
包含JSON數據類型的表數據。
Transactional表的表數據。
注意事項
Python 3與Python 2不兼容。在您使用Python 3之前,需要考慮兼容性問題,在一個SQL中不允許同時使用Python 3和Python 2。
Python 2官方已于2020年初停止維護,建議您根據項目類型執行遷移操作:全新項目:新MaxCompute停止維護Python 2,
UDF開發:通用流程
開發UDF時通常需進行準備工作、編寫UDF代碼、上傳并注冊UDF、調用調試UDF這幾個步驟。同時MaxCompute支持多種工具,以下以常見的MaxCompute Studio、DataWorks、odpscmd三種工具為例,以一個具體的示例為您介紹UDF開發的通用流程。
使用MaxCompute Studio
準備工作。
使用MaxCompute Studio開發調試UDF時,您需要先安裝MaxCompute Studio并連接MaxCompute項目,做好UDF開發前準備工作。操作詳情請參見:
編寫UDF代碼。
在Project區域MaxCompute Studio目錄下,右鍵單擊scripts,選擇 。
在Create new MaxCompute python class對話框中輸入類名Name,選擇類型為Python UDF,單擊OK完成。
在編輯框中編寫UDF代碼。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
說明如果需要本地調試Java UDF,請參見測試UDF。
上傳并注冊UDF。
右鍵單擊目標Python程序,選擇Deploy to server…。配置函數名稱后單擊ok。操作詳情請參見上傳及注冊。
本示例配置函數名稱為UDF_GET_URL_CHAR。
調用UDF。
在左側導航欄單擊Project Explore,在目標MaxCompute項目上單擊右鍵,選擇Open Console并在Console區域輸入調用UDF的SQL語句,按Enter鍵運行即可。SQL命令示例如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
返回結果如下。
+-----+ | _c0 | +-----+ | a | +-----+
使用DataWorks
準備工作。
使用DataWorks開發調試UDF時,您需要先開通DataWorks并綁定MaxCompute項目,做好UDF開發前準備工作。操作詳情請參見使用DataWorks連接。
編寫UDF代碼。
您可以在任意Python開發工具中開發UDF代碼并打包為一個代碼包。您可以使用以下UDF代碼示例。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
上傳并注冊UDF。
您可以將已打包好的代碼包通過DataWorks上傳并完成UDF注冊,操作詳情請參見:
調用UDF。
注冊完成UDF后,您可以創建一個ODPS SQL節點,在節點中編寫并創建SQL命令來調用調試UDF。創建ODPS SQL節點的操作請參見開發ODPS SQL任務,命令示例如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
使用odpscmd
準備工作。
使用odpscmd開發調試UDF時,您需要先下載安裝odpscmd工具,并配置config文件連接MaxCompute項目,做好UDF開發前準備工作。操作詳情請參見使用本地客戶端(odpscmd)連接。
編寫UDF代碼。
您可以在任意Python開發工具中開發UDF代碼并打包為一個代碼包。您可以使用以下UDF代碼示例。
from odps.udf import annotate @annotate("string,bigint->string") class GetUrlChar(object): def evaluate(self, url, n): if n == 0: return "" try: index = url.find(".htm") if index < 0: return "" a = url[:index] index = a.rfind("/") b = a[index + 1:] c = b.split("-") if len(c) < n: return "" return c[-n] except Exception: return "Internal error"
上傳并注冊UDF。
您可以將已打包好的代碼包通過odpscmd上傳并完成UDF注冊,操作詳情請參見:
調用UDF。
注冊完成UDF后,您可以編寫并創建SQL命令來調用調試UDF。命令示例如下。
set odps.sql.python.version=cp37; -- python3 UDF需要使用該命令開啟python3 select UDF_GET_URL_CHAR("http://www.taobao.com/a.htm", 1);
UDF開發:安裝第三方庫Numpy
MaxCompute內置的Python 3運行環境中未安裝第三方庫Numpy。如果您需要使用Numpy的UDF,請手動上傳Numpy的WHEEL包。從PyPI或鏡像下載Numpy包時,包的文件名為numpy-<版本號>-cp37-cp37m-manylinux1_x86_64.whl。上傳包的操作請參見資源操作或Python UDF使用第三方包。
Python 3支持的標準庫列表請參見Python 3標準庫。
UDF開發:函數簽名與數據類型
函數簽名格式如下。
@annotate(<signature>)
signature
為字符串,用于標識輸入參數和返回值的數據類型。執行UDF時,UDF函數的輸入參數和返回值類型要與函數簽名指定的類型一致。查詢語義解析階段會檢查不符合函數簽名定義的用法,檢查到類型不匹配時會報錯。具體格式如下。
'arg_type_list -> type'
其中:
arg_type_list
:表示輸入參數的數據類型。輸入參數可以為多個,用英文逗號(,)分隔。支持的數據類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、復雜數據類型(ARRAY、MAP、STRUCT)或復雜數據類型嵌套。arg_type_list
還支持星號(*)或為空(''):當
arg_type_list
為星號(*)時,表示輸入參數為任意個數。當
arg_type_list
為空('')時,表示無輸入參數。
type
:表示返回值的數據類型。UDF只返回一列。支持的數據類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、復雜數據類型(ARRAY、MAP、STRUCT)或復雜數據類型嵌套。
在編寫UDF代碼過程中,您可以根據MaxCompute項目的數據類型版本選取合適的數據類型,更多數據類型版本及各版本支持的數據類型信息,請參見數據類型版本說明。
合法的函數簽名示例如下。
函數簽名示例 | 說明 |
| 輸入參數類型為BIGINT、DOUBLE,返回值類型為STRING。 |
| 輸入任意個參數,返回值類型為STRING。 |
| 無輸入參數,返回值類型為DOUBLE。 |
| 輸入參數類型為ARRAY<BIGINT>,返回值類型為STRUCT<x:STRING, y:INT>。 |
| 無輸入參數,返回值類型為MAP<BIGINT, STRING>。 |
為確保編寫Python UDF過程中使用的數據類型與MaxCompute支持的數據類型保持一致,您需要關注二者間的數據類型映射關系。具體映射關系如下。
MaxCompute SQL Type | Python 3 Type |
BIGINT | INT |
STRING | UNICODE |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | DATETIME.DATETIME |
FLOAT | FLOAT |
CHAR | UNICODE |
VARCHAR | UNICODE |
BINARY | BYTES |
DATE | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
UDF開發:引用資源
Python UDF可以通過odps.distcache
模塊引用資源,支持引用文件資源和表資源。
odps.distcache.get_cache_file(resource_name, mode)
:以指定模式mode
返回指定文件資源的內容。resource_name
支持STRING類型,對應當前MaxCompute項目中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。mode
支持STRING類型,默認值為't'
。當mode
為't'
時以文本格式打開文件,當mode
為'b'
時以二進制格式打開文件。返回值為File-like對象。在使用完此對象后,您需要調用
close
方法釋放打開的資源文件。
引用文件資源示例如下。
from odps.udf import annotate from odps.distcache import get_cache_file @annotate('bigint->string') class DistCacheExample(object): def __init__(self): cache_file = get_cache_file('test_distcache.txt') kv = {} for line in cache_file: line = line.strip() if not line: continue k, v = line.split() kv[int(k)] = v cache_file.close() self.kv = kv def evaluate(self, arg): return self.kv.get(arg)
odps.distcache.get_cache_table(resource_name)
:返回指定資源表的內容。resource_name
對應當前MaxCompute項目中已存在的表資源名。如果表資源名非法或者沒有相應的表資源,會返回異常。支持讀取表中BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、FLOAT、CHAR、VARCHAR、BINARY、DATE、DECIMAL、ARRAY、MAP和STRUCT類型數據。返回值為Generator類型,調用者通過遍歷獲取表的內容,每次遍歷得到的是以數組形式存在的表中的一條記錄。
引用表資源示例如下。
from odps.udf import annotate
from odps.distcache import get_cache_table
@annotate('->string')
class DistCacheTableExample(object):
def __init__(self):
self.records = list(get_cache_table('udf_test'))
self.counter = 0
self.ln = len(self.records)
def evaluate(self):
if self.counter > self.ln - 1:
return None
ret = self.records[self.counter]
self.counter += 1
return str(ret)
UDF開發完成后:UDF調用說明
按照開發流程,完成Python 3 UDF開發后,您即可通過MaxCompute SQL調用Python 3 UDF。調用方法如下。
開啟Python 3
MaxCompute默認使用Python 2,如果您要使用Python 3,可以在Session級別設置如下屬性開啟Python 3,并與SQL語句一起提交執行。
set odps.sql.python.version=cp37;
調用函數
在歸屬MaxCompute項目中使用自定義函數:使用方法與內建函數類似,您可以參照內建函數的使用方法使用自定義函數。
跨項目使用自定義函數:即在項目A中使用項目B的自定義函數,跨項目分享語句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨項目分享信息,請參見基于Package跨項目訪問資源。
Python 2 UDF遷移
Python 2官方已于2020年初停止維護,建議您根據項目類型執行遷移操作:
全新項目:新MaxCompute項目,或第一次使用Python語言編寫UDF的MaxCompute項目。建議所有的Python UDF都直接使用Python 3語言編寫。
存量項目:創建了大量Python 2 UDF的MaxCompute項目。請您謹慎開啟Python 3。如果您計劃逐步將所有Python 2 UDF遷移為Python 3 UDF,推薦方法如下:
新作業和新UDF:使用Python 3語言編寫,在Session級別開啟Python 3。開啟Python 3方法,請參見開啟Python 3。
Python 2 UDF:改寫Python 2 UDF,使其可以同時兼容Python 2和Python 3。改寫方法請參見將Python 2代碼移植到Python 3。
說明如果您需要編寫公共UDF,并為多個MaxCompute項目授權UDF的操作權限,建議UDF同時兼容Python 2和Python 3。