自定義標(biāo)量函數(shù)(UDSF)
本文為您介紹如何開發(fā)、注冊(cè)和使用Flink自定義標(biāo)量函數(shù)(UDSF)。
定義
自定義標(biāo)量函數(shù)(UDSF)將0個(gè)、1個(gè)或多個(gè)標(biāo)量值映射到一個(gè)新的標(biāo)量值。輸入與輸出是一對(duì)一的關(guān)系,即讀入一行數(shù)據(jù),寫出一條輸出值。詳情參見User-defined Functions。
UDSF開發(fā)
Flink為您提供了UDF示例,便于您快速開發(fā)業(yè)務(wù)。Flink UDF示例中包含UDSF、UDAF和UDTF的實(shí)現(xiàn),示例中已為您配置對(duì)應(yīng)版本的開發(fā)環(huán)境,您無(wú)需進(jìn)行環(huán)境搭建。
下載并解壓ASI_UDX_Demo示例到本地。
說(shuō)明ASI_UDX_Demo屬于第三方搭建的網(wǎng)站,訪問(wèn)時(shí)可能會(huì)存在無(wú)法打開或訪問(wèn)延遲的問(wèn)題。
解壓完成后,會(huì)生成ASI_UDX-main文件夾。其中:
pom.xml:項(xiàng)目級(jí)別的配置文件,主要描述了項(xiàng)目的Maven坐標(biāo)、依賴關(guān)系,開發(fā)者需要遵循的規(guī)則、缺陷管理系統(tǒng),組織和Licenses,以及其他所有的項(xiàng)目相關(guān)因素。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java:自定義標(biāo)量函數(shù)(UDSF)示例的Java代碼。
在IntelliJ IDEA中,單擊ASI_UDX-main。
,打開剛才解壓縮完成的雙擊打開\ASI_UDX-main\src\main\java\ASI_UDF后,根據(jù)您的業(yè)務(wù),配置ASI_UDF.java。
該示例中,ASI_UDF.java已配置了獲取每條數(shù)據(jù)中從begin~end位的字符的代碼。
package ASI_UDF; import org.apache.flink.table.functions.ScalarFunction; public class ASI_UDF extends ScalarFunction { public String eval(String s, Integer begin, Integer end) { return s.substring(begin, end); } }
雙擊打開\ASI_UDX-main\后,配置pom.xml。
該示例中,pom.xml文件已配置了Flink 1.11版依賴的主要JAR包信息。如果您的業(yè)務(wù):
不依賴其他JAR包:不用配置pom.xml文件,繼續(xù)下一步。
依賴其他JAR包:在pom.xml文件中添加您所需依賴的JAR包信息。
Flink 1.11版依賴的主要JAR包如下。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>
在下載文件中pom.xml所在的目錄執(zhí)行如下命令打包文件。
mvn package -Dcheckstyle.skip
\ASI_UDX-main\target\目錄下會(huì)出現(xiàn)ASI_UDX-1.0-SNAPSHOT.jar的JAR包,即代表完成了UDSF開發(fā)工作。
UDSF注冊(cè)
UDSF注冊(cè)過(guò)程,請(qǐng)參見管理自定義函數(shù)(UDF)。
UDSF使用
在注冊(cè)UDSF完成后,您就可以使用UDSF,詳細(xì)的操作步驟如下。
Flink SQL作業(yè)開發(fā)。詳情請(qǐng)參見SQL作業(yè)開發(fā)。
獲取ASI_UDSF_Source表中a字段中每行字符串中第2~4位的字符,代碼示例如下。
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;
在
頁(yè)面,單擊目標(biāo)作業(yè)名稱操作列的啟動(dòng)。啟動(dòng)成功后,ASI_UDSF_Sink表每行會(huì)被插入ASI_UDSF_Source表中a字段每行字符串的第2~4位字符。