Python
實時計算Flink版支持在Flink SQL作業中使用Python自定義函數,本文為您介紹Flink Python自定義函數的分類、Python依賴使用方法和調優方式。
自定義函數分類
分類 | 描述 |
UDSF(User Defined Scalar Function) | 用戶自定義標量值函數,將0個、1個或多個標量值映射到一個新的標量值。其輸入與輸出是一對一的關系,即讀入一行數據,寫出一條輸出值。詳情請參見自定義標量函數(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自定義聚合函數,將多條記錄聚合成1條記錄。其輸入與輸出是多對一的關系,即將多條輸入記錄聚合成一條輸出值。詳情請參見自定義聚合函數(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自定義表值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自定義的標量函數類似,但與標量函數不同。表值函數可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列數據。詳情請參見自定義表值函數(UDTF)。 |
使用Python依賴
實時計算Flink版集群已預裝了Pandas、NumPy和PyArrow等常用的Python包,您可以在Python作業開發頁面,了解實時計算Flink版中已安裝的第三方Python包列表。預裝的Python包使用時需要在Python函數內部導入。示例如下。
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)
此外,您也可以在Python自定義函數中使用其他類型的第三方Python包。需要注意的是,如果使用了非預裝的第三方Python包,在注冊Python UDF時,需要將其作為依賴文件上傳,詳情請參見管理自定義函數(UDF)和使用Python依賴。
代碼調試
您可以在Python自定義函數的代碼實現中,通過Logging的方式,輸出日志信息,方便問題定位,示例如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + j
日志輸出后,您可以在TaskManager的日志文件中查看日志,詳情請參見查看運行日志。
性能調優
預先加載資源
預先加載資源可以在UDF初始化時提前加載資源,無需在每一次執行計算(即eval)時重新加載資源。例如,您可能只想加載一次大型深度學習模型,然后對模型多次運行批量預測。代碼示例如下。
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
關于如何上傳Python數據文件,可以參考文檔使用Python依賴。
使用Pandas庫
除了普通Python自定義函數之外,實時計算Flink版也支持您使用Pandas自定義函數。對于Pandas自定義函數,輸入數據的類型是Pandas中定義的數據結構,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定義函數中使用Pandas和NumPy等高性能的Python庫,開發出高性能的Python自定義函數,詳情請參見Vectorized User-defined Functions。
配置參數
Python自定義函數的性能在很大程度取決于Python自定義函數自身的實現,如果遇到性能問題,您需要盡可能優化Python自定義函數的實現。除此之外,Python自定義函數的性能也受以下參數取值的影響。
參數 | 說明 |
python.fn-execution.bundle.size | Python UDF的計算是異步的,在執行過程中,Java算子將數據異步發送給Python進程進行處理。Java算子在將數據發送給Python進程之前,會先將數據緩存起來,到達一定閾值之后,再發送給Python進程。python.fn-execution.bundle.size參數可用來控制可緩存的數據最大條數。 默認值為100000,單位是條數。 |
python.fn-execution.bundle.time | 用來控制數據的最大緩存時間。當緩存的數據條數到達python.fn-execution.bundle.size定義的閾值或緩存時間到達python.fn-execution.bundle.time定義的閾值時,會觸發緩存數據的計算。 默認值為1000,單位是毫秒。 |
python.fn-execution.arrow.batch.size | 使用Pandas UDF時,一個arrow batch可容納的數據最大條數,默認值為10000。 說明 python.fn-execution.arrow.batch.size參數值不能大于python.fn-execution.bundle.size參數值。 |
以上3個參數并不是配置的越大越好,當這些參數取值配置過大時,可能會導致Checkpoint時,需要處理過多的數據,從而導致Checkpoint時間過長,甚至會導致Checkpoint失敗。以上參數的更多詳情,請參見Configuration。
相關文檔
自定義函數的注冊、更新及刪除方法,請參見管理自定義函數(UDF)。
Python自定義函數的開發和使用demo,請參見自定義聚合函數(UDAF)、自定義標量函數(UDSF)和自定義表值函數(UDTF)。
如何在Flink Python作業中使用自定義的Python虛擬環境、第三方Python包、JAR包和數據文件等,請參見使用Python依賴。
JAVA自定義函數的開發和使用demo,請參見自定義聚合函數(UDAF)、自定義標量函數(UDSF)和自定義表值函數(UDTF)。
JAVA自定義函數的調試和調優方法,請參見概述。