日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

自定義表值函數(UDTF)

本文為您介紹如何為Flink自定義表值函數(UDTF)開發、注冊和使用流程。

定義

自定義表值函數(UDTF),自定義表值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自定義的標量函數類似,但與標量函數不同。表值函數可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列數據。詳情參見User-defined Functions。

UDTF開發

說明

Flink為您提供了UDF示例,便于您快速開發業務。Flink UDF示例中包含UDSF、UDAF和UDTF的實現,示例中已為您配置對應版本的開發環境,您無需進行環境搭建。

  1. 下載并解壓ASI_UDX_Demo示例到本地。

    說明

    ASI_UDX_Demo屬于第三方搭建的網站,訪問時可能會存在無法打開或訪問延遲的問題。

    解壓完成后,會生成ASI_UDX-main文件夾。其中:

    • pom.xml:項目級別的配置文件,主要描述了項目的Maven坐標、依賴關系,開發者需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的項目相關因素。

    • \ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java:自定義表值函數(UDTF)示例的Java代碼。

  2. 在IntelliJ IDEA中,單擊file > open,打開剛才解壓縮完成的ASI_UDX-main。

  3. 雙擊打開\ASI_UDX-main\src\main\java\ASI_UDTF后,根據您的業務,修改ASI_UDTF.java文件內容。

    該示例中,ASI_UDTF.java已配置了將一行字符串按照豎線(|)分割成多列字符串的代碼。

    package ASI_UDTF;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.functions.TableFunction;
    
    public class ASI_UDTF extends TableFunction<Tuple2<String,String>> {
        public void eval(String str){
            String[] split = str.split("\\|");
            String name = split[0];
            String place = split[1];
            Tuple2<String,String> tuple2 = Tuple2.of(name,place);
            collect(tuple2);
        }
    }

    TableFunction支持的數據類型及類型推導機制請參見Flink支持的數據類型類型推導。

    說明

    以上兩個文檔鏈接為Flink 1.15版本對應的文檔,不同Flink大版本中TableFunction支持的數據類型及推導機制可能會存在差異,請您通過VVR和Flink版本的映射關系去參考對應版本的Flink文檔。Flink版本查看方法請參見控制臺操作

    常用的復合類型Tuple和Row示例如下:

    • Tuple類型

      TableFunction<Tuple2<String,Integer>
    • Row類型

      @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
      public static class SplitFunction extends TableFunction<Row> {
      
        public void eval(String str) {
          for (String s : str.split(" ")) {
            // use collect(...) to emit a row
            collect(Row.of(s, s.length()));
          }
        }
      }
  4. 雙擊打開\ASI_UDX-main\后,配置pom.xml

    該示例中,pom.xml文件已配置了Flink 1.11版依賴的主要JAR包信息。如果您的業務:

    • 不依賴其他JAR包:不用配置pom.xml文件,繼續下一步。

    • 依賴其他JAR包:在pom.xml文件中添加您所需依賴的JAR包信息。

    Flink 1.11版依賴的主要JAR包如下。

    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>1.11.0</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table</artifactId>
                <version>1.11.0</version>
                <type>pom</type>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>1.11.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>1.11.0</version>
            </dependency>
        </dependencies>
  5. 在下載文件中pom.xml所在的目錄執行如下命令打包文件。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDTF開發工作。

UDTF注冊

UDTF注冊過程,請參見管理自定義函數(UDF)。

UDTF使用

在注冊UDTF完成后,您就可以使用UDTF,詳細的操作步驟如下。

  1. Flink SQL作業開發。詳情請參見SQL作業開發。

    ASI_UDTF_Source表中message字段每行字符串按照豎線(|)分割成多列,代碼示例如下。

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
  2. 運維中心 > 作業運維頁面,單擊目標作業名稱操作列的啟動。

    啟動成功后,ASI_UDTF_Sink表會被插入ASI_UDTF_Source表中message字段按照豎線(|)分割成多列的字符。