日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

自定義聚合函數(UDAF)

本文為您介紹Python自定義聚合函數(UDAF)開發、注冊和使用流程。

定義

自定義聚合函數(UDAF),將多條記錄聚合成1條記錄。其輸入與輸出是多對一的關系,即將多條輸入記錄聚合成一條輸出值。

使用限制

由于實時計算Flink版受部署環境和網絡環境等因素的影響,開發Python自定義函數時,需要注意以下限制:

  • 僅支持開源Flink V1.12及以上版本。

  • Flink工作空間已預裝了Python 3.7.9,因此需要您在Python 3.7.9版本開發代碼。

  • Flink運行環境僅支持JDK 8和JDK 11,如果Python作業中依賴第三方JAR包,請確保JAR包兼容。

  • 僅支持開源Scala V2.11版本,如果Python作業中依賴第三方JAR包,請確保使用Scala V2.11對應的JAR包依賴。

UDAF開發

說明

Flink為您提供了Python UDX示例,便于您快速開發UDX。Flink Python UDX示例中包含了Python UDF、Python UDAF和Python UDTF的實現。本文以Windows操作系統為例,為您介紹如何進行UDAF開發。

  1. 下載并解壓python_demo-master示例到本地。
  2. 在PyCharm中,單擊file > open,打開剛才解壓縮完成的python_demo-master。
  3. 雙擊打開\python_demo-master\udx\udafs.py后,根據您的業務,配置udafs.py。

    該示例中,weighted_avg定義了當前數據和歷史數據求含權重的均值的代碼。

    from pyflink.common import Row
    from pyflink.table import AggregateFunction, DataTypes
    from pyflink.table.udf import udaf
    
    
    class WeightedAvg(AggregateFunction):
    
        def create_accumulator(self):
            # Row(sum, count)
            return Row(0, 0)
    
        def get_value(self, accumulator: Row) -> float:
            if accumulator[1] == 0:
                return 0
            else:
                return accumulator[0] / accumulator[1]
    
        def accumulate(self, accumulator: Row, value, weight):
            accumulator[0] += value * weight
            accumulator[1] += weight
    
        def retract(self, accumulator: Row, value, weight):
            accumulator[0] -= value * weight
            accumulator[1] -= weight
    
    
    weighted_avg = udaf(f=WeightedAvg(),
                        result_type=DataTypes.DOUBLE(),
                        accumulator_type=DataTypes.ROW([
                            DataTypes.FIELD("f0", DataTypes.BIGINT()),
                            DataTypes.FIELD("f1", DataTypes.BIGINT())]))
  4. 在下載文件中udx所在的目錄(即\python_demo-master目錄)下執行如下命令打包文件。

    zip -r python_demo.zip udx

    \python_demo-master\目錄下會出現python_demo.zip的ZIP包,即代表完成了UDAF開發工作。

UDAF注冊

UDAF注冊過程,請參見管理自定義函數(UDF)

UDAF使用

在完成注冊UDAF后,您就可以使用UDAF,詳細的操作步驟如下。

  1. Flink SQL作業開發。詳情請參見SQL作業開發。

    獲取ASI_UDAF_Source表中a字段以b字段為權重的值,代碼示例如下。

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT,
      b   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      avg_value  DOUBLE
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT weighted_avg(a, b)
    FROM ASI_UDAF_Source;
  2. 運維中心 > 作業運維頁面,單擊目標作業名稱操作列的啟動

    啟動成功后,ASI_UDAF_Sink表每行會被插入ASI_UDAF_Source表中以b字段為權重的a字段當前數據和歷史數據的均值。