Python 3 UDAF
Python官方即將停止維護(hù)Python 2,MaxCompute已支持Python 3,對(duì)應(yīng)版本為CPython-3.7.3。本文為您介紹如何通過Python 3語(yǔ)言編寫UDAF。
UDAF代碼結(jié)構(gòu)
您可以通過MaxCompute Studio工具使用Python 3語(yǔ)言編寫UDAF代碼,代碼中需要包含如下信息:
- 導(dǎo)入模塊:必選。
至少要包含
from odps.udf import annotate
和from odps.udf import BaseUDAF
。from odps.udf import annotate
用于導(dǎo)入函數(shù)簽名模塊,MaxCompute才可以識(shí)別后續(xù)代碼中定義的函數(shù)簽名。from odps.udf import BaseUDAF
為Python UDAF的基類,您需要通過此類在派生類中實(shí)現(xiàn)iterate
、merge
、terminate
等方法。當(dāng)UDAF代碼中需要引用文件資源或表資源時(shí),需要包含
from odps.distcache import get_cache_file
(文件資源)或from odps.distcache import get_cache_table
(表資源)。 函數(shù)簽名:必選。
格式為
@annotate(<signature>)
,signature
用于定義函數(shù)的輸入?yún)?shù)和返回值的數(shù)據(jù)類型。更多函數(shù)簽名信息,請(qǐng)參見函數(shù)簽名及數(shù)據(jù)類型。自定義Python類(派生類):必選。
UDAF代碼的組織單位,定義了實(shí)現(xiàn)業(yè)務(wù)需求的變量及方法。您還可以在代碼中引用MaxCompute內(nèi)置的第三方庫(kù)或引用文件、表資源。更多信息,請(qǐng)參見第三方庫(kù)或引用資源。
- 實(shí)現(xiàn)Python類的方法:必選。
Python類實(shí)現(xiàn)包含如下4個(gè)方法,您可以根據(jù)實(shí)際需要進(jìn)行選擇。
方法定義 描述 BaseUDAF.new_buffer()
返回聚合函數(shù)的中間值的buffer。 buffer
必須是Marshal對(duì)象(例如LIST、DICT),并且buffer
的大小不應(yīng)該隨數(shù)據(jù)量遞增。在極限情況下,buffer
在執(zhí)行對(duì)象序列化后的大小不應(yīng)該超過2 MB。BaseUDAF.iterate(buffer[, args, ...])
將 args
聚合到中間值buffer
中。BaseUDAF.merge(buffer, pbuffer)
將中間值 buffer
和pbuffer
合并的結(jié)果存放在buffer
中。BaseUDAF.terminate(buffer)
將 buffer
轉(zhuǎn)換為MaxCompute SQL的基本類型。
UDAF代碼示例如下。
#導(dǎo)入函數(shù)簽名模塊及基類。
from odps.udf import annotate
from odps.udf import BaseUDAF
#函數(shù)簽名。
@annotate('double->double')
#自定義Python類。
class Average(BaseUDAF):
#實(shí)現(xiàn)Python類的方法。
def new_buffer(self):
return [0, 0]
def iterate(self, buffer, number):
if number is not None:
buffer[0] += number
buffer[1] += 1
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
buffer[1] += pbuffer[1]
def terminate(self, buffer):
if buffer[1] == 0:
return 0.0
return buffer[0] / buffer[1]
avg
的MaxCompute UDAF的實(shí)現(xiàn)邏輯及計(jì)算流程如下。pbuffer
相當(dāng)于上圖中的pr
,buffer
相當(dāng)于上圖中的r
。Python 2 UDAF與Python 3 UDAF區(qū)別在于底層Python語(yǔ)言版本不一致,請(qǐng)您根據(jù)對(duì)應(yīng)版本語(yǔ)言支持的能力編寫UDAF。
注意事項(xiàng)
Python 3與Python 2不兼容。在您使用Python 3之前,需要考慮兼容性問題,在一個(gè)SQL中不允許同時(shí)使用Python 3和Python 2。
Python 2 UDAF遷移
Python 2官方即將停止維護(hù),建議您根據(jù)項(xiàng)目類型執(zhí)行遷移操作:
全新項(xiàng)目:新MaxCompute項(xiàng)目,或第一次使用Python語(yǔ)言編寫UDAF的MaxCompute項(xiàng)目。建議所有的Python UDAF都直接使用Python 3語(yǔ)言編寫。
存量項(xiàng)目:創(chuàng)建了大量Python 2 UDAF的MaxCompute項(xiàng)目。請(qǐng)您謹(jǐn)慎開啟Python 3。如果您計(jì)劃逐步將所有Python 2 UDAF遷移為Python 3 UDAF,推薦方法如下:
新作業(yè)和新UDAF:使用Python 3語(yǔ)言編寫,在Session級(jí)別開啟Python 3。開啟Python 3方法,請(qǐng)參見開啟Python 3。
Python 2 UDAF:改寫Python 2 UDAF,使其可以同時(shí)兼容Python 2和Python 3。改寫方法請(qǐng)參見將Python 2代碼移植到Python 3。
說明如果您需要編寫公共UDAF,并為多個(gè)MaxCompute項(xiàng)目授權(quán)UDAF的操作權(quán)限,建議UDAF同時(shí)兼容Python 2和Python 3。
開啟Python 3
MaxCompute默認(rèn)使用Python 2,如果您要使用Python 3,可以在Session級(jí)別設(shè)置如下屬性開啟Python 3,并與SQL語(yǔ)句一起提交執(zhí)行。
set odps.sql.python.version=cp37;
第三方庫(kù)
MaxCompute內(nèi)置的Python 3運(yùn)行環(huán)境中未安裝第三方庫(kù)Numpy。如果您需要使用Numpy的UDAF,請(qǐng)手動(dòng)上傳Numpy的WHEEL包。從PyPI或鏡像下載Numpy包時(shí),包的文件名為numpy-<版本號(hào)>-cp37-cp37m-manylinux1_x86_64.whl。上傳包的操作請(qǐng)參見資源操作或UDF示例:Python UDF使用第三方包。
函數(shù)簽名及數(shù)據(jù)類型
@annotate(<signature>)
signature
為字符串,用于標(biāo)識(shí)輸入?yún)?shù)和返回值的數(shù)據(jù)類型。執(zhí)行UDAF時(shí),UDAF函數(shù)的輸入?yún)?shù)和返回值類型要與函數(shù)簽名指定的類型一致。查詢語(yǔ)義解析階段會(huì)檢查不符合函數(shù)簽名定義的用法,檢查到類型不匹配時(shí)會(huì)報(bào)錯(cuò)。具體格式如下。'arg_type_list -> type'
其中:arg_type_list
:表示輸入?yún)?shù)的數(shù)據(jù)類型。輸入?yún)?shù)可以為多個(gè),用英文逗號(hào)(,)分隔。支持的數(shù)據(jù)類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、CHAR、VARCHAR、復(fù)雜數(shù)據(jù)類型(ARRAY、MAP、STRUCT)或復(fù)雜數(shù)據(jù)類型嵌套。arg_type_list
還支持星號(hào)(*)或?yàn)榭眨?'):當(dāng)
arg_type_list
為星號(hào)(*)時(shí),表示輸入?yún)?shù)為任意個(gè)數(shù)。當(dāng)
arg_type_list
為空('')時(shí),表示無(wú)輸入?yún)?shù)。
type
:表示返回值的數(shù)據(jù)類型。UDAF只返回一列。支持的數(shù)據(jù)類型為:BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME、DECIMAL、FLOAT、BINARY、DATE、DECIMAL(precision,scale)、復(fù)雜數(shù)據(jù)類型(ARRAY、MAP、STRUCT)或復(fù)雜數(shù)據(jù)類型嵌套。
合法函數(shù)簽名示例如下。
函數(shù)簽名示例 | 說明 |
@annotate('bigint,double->string') | 輸入?yún)?shù)類型為BIGINT、DOUBLE,返回值類型為STRING。 |
@annotate('*->string') | 輸入任意個(gè)參數(shù),返回值類型為STRING。 |
@annotate('->double') | 無(wú)輸入?yún)?shù),返回值類型為DOUBLE。 |
@annotate('array<bigint>->struct<x:string, y:int>') | 輸入?yún)?shù)類型為ARRAY<BIGINT>,返回值類型為STRUCT<x:STRING, y:INT>。 |
為確保編寫Python UDAF過程中使用的數(shù)據(jù)類型與MaxCompute支持的數(shù)據(jù)類型保持一致,您需要關(guān)注二者間的數(shù)據(jù)類型映射關(guān)系。具體映射關(guān)系如下。
MaxCompute SQL Type | Python 3 Type |
BIGINT | INT |
STRING | UNICODE |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
DATETIME | DATETIME.DATETIME |
FLOAT | FLOAT |
CHAR | UNICODE |
VARCHAR | UNICODE |
BINARY | BYTES |
DATE | DATETIME.DATE |
DECIMAL | DECIMAL.DECIMAL |
ARRAY | LIST |
MAP | DICT |
STRUCT | COLLECTIONS.NAMEDTUPLE |
引用資源
Python UDAF可以通過odps.distcache
模塊引用資源,支持引用文件資源和表資源。
odps.distcache.get_cache_file(resource_name)
:返回指定文件資源的內(nèi)容。resource_name
為STRING類型,對(duì)應(yīng)當(dāng)前MaxCompute項(xiàng)目中已存在的文件資源名。如果文件資源名非法或者沒有相應(yīng)的文件資源,會(huì)返回異常。說明 使用UDAF訪問資源,在創(chuàng)建UDAF時(shí)需要聲明引用的資源,否則會(huì)報(bào)錯(cuò)。- 返回值為File-like對(duì)象。在使用完此對(duì)象后,您需要調(diào)用
close
方法釋放打開的資源文件。
odps.distcache.get_cache_table(resource_name)
:返回指定表資源的內(nèi)容。resource_name
支持STRING類型,對(duì)應(yīng)當(dāng)前MaxCompute項(xiàng)目中已存在的表資源名。如果表資源名非法或者沒有相應(yīng)的表資源,會(huì)返回異常。- 返回值為GENERATOR類型,調(diào)用者以遍歷方式獲取表的內(nèi)容,每次遍歷可得到以數(shù)組形式存在的表中的一條記錄。
具體使用方法請(qǐng)參見引用資源(Python UDF 3)和引用資源(Python UDTF 3)。
使用說明
按照開發(fā)流程,完成Python 3 UDAF開發(fā)后,您即可通過MaxCompute SQL調(diào)用Python 3 UDAF。調(diào)用方法如下:
在歸屬M(fèi)axCompute項(xiàng)目中使用自定義函數(shù):使用方法與內(nèi)建函數(shù)類似,您可以參照內(nèi)建函數(shù)的使用方法使用自定義函數(shù)。
跨項(xiàng)目使用自定義函數(shù):即在項(xiàng)目A中使用項(xiàng)目B的自定義函數(shù),跨項(xiàng)目分享語(yǔ)句示例:
select B:udf_in_other_project(arg0, arg1) as res from table_t;
。更多跨項(xiàng)目分享信息,請(qǐng)參見基于Package跨項(xiàng)目訪問資源。
使用MaxCompute Studio完整開發(fā)及調(diào)用Python 3 UDAF的操作,請(qǐng)參見開發(fā)Python UDF。
UDAF的動(dòng)態(tài)參數(shù)說明
函數(shù)簽名
Python UDAF函數(shù)簽名格式請(qǐng)參見函數(shù)簽名及數(shù)據(jù)類型。
您可以在參數(shù)列表中使用
*
,表示接受任意長(zhǎng)度、任意類型的輸入?yún)?shù)。例如@annotate('double,*->string')
表示接受第一個(gè)參數(shù)是DOUBLE類型,后接任意長(zhǎng)度、任意類型的參數(shù)列表。此時(shí),您需要自己編寫代碼判斷輸入的個(gè)數(shù)和參數(shù)類型,然后對(duì)它們進(jìn)行相應(yīng)的操作(您可以對(duì)比C語(yǔ)言里面的printf
函數(shù)來理解此操作)。說明*
用在返回值列表中時(shí),表示的是不同的含義。UDAF的返回值可以使用
*
,表示返回任意個(gè)STRING類型。返回值的個(gè)數(shù)與調(diào)用函數(shù)時(shí)設(shè)置的別名個(gè)數(shù)有關(guān)。例如@annotate("bigint,string->double,*")
,調(diào)用方式是UDTF(x, y) as (a, b, c)
,此處as
后面設(shè)置了三個(gè)別名,即a
、b
、c
。編輯器會(huì)認(rèn)定a
為DOUBLE類型(Annotation中返回值第一列的類型是給定的),b
和c
為STRING類型。因?yàn)檫@里給出了三個(gè)返回值,所以UDTF在調(diào)用forward
時(shí),forward
必須是長(zhǎng)度為3的數(shù)組,否則會(huì)出現(xiàn)運(yùn)行時(shí)報(bào)錯(cuò)。說明這種錯(cuò)誤無(wú)法在編譯時(shí)報(bào)出,因此UDTF的調(diào)用者在SQL中設(shè)置alias個(gè)數(shù)時(shí),必須遵循該UDAF定義的規(guī)則。由于聚合函數(shù)的返回值個(gè)數(shù)固定是1,所以這個(gè)功能對(duì)UDAF來說并無(wú)意義。
UDAF示例
from odps.udf import annotate
from odps.udf import BaseUDAF
@annotate('bigint,*->string')
class MultiColSum(BaseUDAF):
def new_buffer(self):
return [0]
def iterate(self, buffer, *args):
for arg in args:
buffer[0] += int(arg)
def merge(self, buffer, pbuffer):
buffer[0] += pbuffer[0]
def terminate(self, buffer):
return str(buffer[0])
UDAF的返回值只能固定為1個(gè),以上UDAF示例中,返回值是多個(gè)輸入?yún)?shù)求和,然后多行聚合求和的結(jié)果,使用示例如下。
-- 根據(jù)輸入多個(gè)參數(shù)求和
SELECT my_multi_col_sum(a,b,c,d,e) from values (1,"2","3","4","5"), (6,"7","8","9","10") t(a,b,c,d,e);
-- 返回值為 55