本文為您介紹如何為Flink自定義表值函數(UDTF)開發、注冊和使用流程。
定義
自定義表值函數(UDTF),自定義表值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自定義的標量函數類似,但與標量函數不同。表值函數可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列數據。詳情參見User-defined Functions。
UDTF開發
Flink為您提供了UDF示例,便于您快速開發業務。Flink UDF示例中包含UDSF、UDAF和UDTF的實現,示例中已為您配置對應版本的開發環境,您無需進行環境搭建。
下載并解壓ASI_UDX_Demo示例到本地。
說明ASI_UDX_Demo屬于第三方搭建的網站,訪問時可能會存在無法打開或訪問延遲的問題。
解壓完成后,會生成ASI_UDX-main文件夾。其中:
pom.xml:項目級別的配置文件,主要描述了項目的Maven坐標、依賴關系,開發者需要遵循的規則、缺陷管理系統,組織和Licenses,以及其他所有的項目相關因素。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java:自定義表值函數(UDTF)示例的Java代碼。
在IntelliJ IDEA中,單擊ASI_UDX-main。
,打開剛才解壓縮完成的雙擊打開\ASI_UDX-main\src\main\java\ASI_UDTF后,根據您的業務,修改ASI_UDTF.java文件內容。
該示例中,ASI_UDTF.java已配置了將一行字符串按照豎線(|)分割成多列字符串的代碼。
package ASI_UDTF; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class ASI_UDTF extends TableFunction<Tuple2<String,String>> { public void eval(String str){ String[] split = str.split("\\|"); String name = split[0]; String place = split[1]; Tuple2<String,String> tuple2 = Tuple2.of(name,place); collect(tuple2); } }
TableFunction支持的數據類型及類型推導機制請參見Flink支持的數據類型和類型推導。
說明以上兩個文檔鏈接為Flink 1.15版本對應的文檔,不同Flink大版本中TableFunction支持的數據類型及推導機制可能會存在差異,請您通過VVR和Flink版本的映射關系去參考對應版本的Flink文檔。Flink版本查看方法請參見控制臺操作。
常用的復合類型Tuple和Row示例如下:
Tuple類型
TableFunction<Tuple2<String,Integer>
Row類型
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // use collect(...) to emit a row collect(Row.of(s, s.length())); } } }
雙擊打開\ASI_UDX-main\后,配置pom.xml。
該示例中,pom.xml文件已配置了Flink 1.11版依賴的主要JAR包信息。如果您的業務:
不依賴其他JAR包:不用配置pom.xml文件,繼續下一步。
依賴其他JAR包:在pom.xml文件中添加您所需依賴的JAR包信息。
Flink 1.11版依賴的主要JAR包如下。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>
在下載文件中pom.xml所在的目錄執行如下命令打包文件。
mvn package -Dcheckstyle.skip
\ASI_UDX-main\target\目錄下會出現ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDTF開發工作。
UDTF注冊
UDTF注冊過程,請參見管理自定義函數(UDF)。
UDTF使用
在注冊UDTF完成后,您就可以使用UDTF,詳細的操作步驟如下。
Flink SQL作業開發。詳情請參見SQL作業開發。
ASI_UDTF_Source表中message字段每行字符串按照豎線(|)分割成多列,代碼示例如下。
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);
在
頁面,單擊目標作業名稱操作列的啟動。啟動成功后,ASI_UDTF_Sink表會被插入ASI_UDTF_Source表中message字段按照豎線(|)分割成多列的字符。