日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Java

更新時間: 2024-11-27 17:20:58

實時計算Flink版支持在Flink SQL作業中使用Java自定義函數,本文介紹Flink Java自定義函數的分類、參數傳遞及調用注意事項。

注意事項

  • 為了避免JAR包依賴沖突,在開發自定義函數時您需要注意以下幾點:

    • SQL開發頁面選擇的Flink版本,請和Pom依賴中的Flink版本保持一致。

    • Flink相關依賴,scope請使用provided,即在依賴中添加<scope>provided</scope>。

    • 其他第三方依賴請采用Shade方式打包,Shade打包詳情請參見Apache Maven Shade Plugin

    Flink依賴沖突問題,詳情請參見如何解決Flink依賴沖突問題?

  • 為避免UDF在SQL作業文本里被頻繁調用導致超時的情況,推薦您將UDF的JAR包作為依賴文件上傳,并且通過CRETATE TEMPORARY FUNCTION語法在作業中聲明函數,例如CREATE TEMPORARY FUNCTION 'GetJson' AS 'com.soybean.function.udf.MyGetJsonObjectFunction';

自定義函數分類

Flink支持以下3類自定義函數。

分類

描述

UDSF(User Defined Scalar Function)

用戶自定義標量值函數,將0個、1個或多個標量值映射到一個新的標量值。其輸入與輸出是一對一的關系,即讀入一行數據,寫出一條輸出值。詳情請參見自定義標量函數(UDSF)。

UDAF(User Defined Aggregation Function)

自定義聚合函數,將多條記錄聚合成1條記錄。其輸入與輸出是多對一的關系,即將多條輸入記錄聚合成一條輸出值。詳情請參見自定義聚合函數(UDAF)。

UDTF(User Defined Table-valued Function)

自定義表值函數,將0個、1個或多個標量值作為輸入參數(可以是變長參數)。與自定義的標量函數類似,但與標量函數不同。表值函數可以返回任意數量的行作為輸出,而不僅是1個值。返回的行可以由1個或多個列組成。調用一次函數輸出多行或多列數據。詳情請參見自定義表值函數(UDTF)

自定義函數注冊

自定義函數參數傳遞

您可以在Flink開發控制臺配置自定義函數中的參數并在UDF代碼中使用。這樣,后續可以直接在控制臺上修改參數值,實現快速修改UDF參數值的目的。

自定義函數中提供了可選的open(FunctionContext context)方法,FunctionContext具備參數傳遞功能,自定義配置項通過此對象來傳遞。具體步驟如下:

  1. 在Flink開發控制臺運維中心 > 作業運維頁面部署詳情頁簽運行參數配置其他配置中,添加pipeline.global-job-parameters配置項,代碼示例如下。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'

    FunctionContext#getJobParameter只能獲取pipeline.global-job-parameters這一配置項的值。因此需要將UDF用到的所有配置項全部寫入到pipeline.global-job-parameters中。pipeline.global-job-parameters配置項填寫的具體操作步驟如下。

    步驟

    動作

    具體操作

    示例

    步驟1

    定義key-value。

    將key和value之間通過冒號(:)分隔,并將每一對key-value用單引號(')包圍起來。

    說明
    • 如果key或value中含有半角冒號(:),則需要用雙引號(")將key或value包圍起來。

    • 如果key或value中含有半角冒號(:)和雙引號("),則需要通過連寫兩個雙引號("")進行轉義。

    • key = k1,value = {hi,hello},則定義為'k1:{hi,hello}'。

    • key = k2,value = str:ing,str:ing,則定義為'k2:"str:ing,str:ing"'

    • key = k3,value = str"ing,str:ing,則定義為'k3:"str""ing,str:ing"'

    步驟2

    按照YAML文件的格式,形成最終的pipeline.global-job-parameters。

    將不同的key-value放在不同的行里,并將所有key-value用逗號(,)連接。

    說明
    • YAML文件的多行字符串以豎線(| )開始。

    • YAML文件的多行字符串,每一行需要有相同的縮進。

    pipeline.global-job-parameters: | 
      'k1:{hi,hello}',
      'k2:"str:ing,str:ing"',
      'k3:"str""ing,str:ing"'
  2. 在自定義函數代碼中,通過FunctionContext#getJobParameter獲取map的各項內容。

    代碼示例如下。

    context.getJobParameter("k1", null); // 獲得字符串 {hi,hello}。
    context.getJobParameter("k2", null); // 獲得字符串 str:ing,str:ing。
    context.getJobParameter("k3", null); // 獲得字符串 str"ing,str:ing。
    context.getJobParameter("pipeline.global-job-parameters", null); // null,只能獲得pipeline.global-job-parameters里定義的內容,而不能獲得任意的作業配置項。

命名參數

說明

僅實時計算引擎VVR 8.0.7及以上版本支持使用命名參數來實現自定義函數。

在SQL中調用函數時必須按順序指定所有參數字段。當參數較多時,容易出現傳參個數、順序錯誤,而且不能省略非必填參數。通過使用命名參數,可以按需指定所需的參數,減少出錯概率,使用起來也更加方便。我們通過一個自定義標量函數(ScalarFunction)的例子來介紹下命名參數的使用。

// 實現一個自定義標量函數,后兩個入參為可選參數(isOptional = true)
public class MyFuncWithNamedArgs extends ScalarFunction  {
	private static final long serialVersionUID = 1L;

	public String eval(@ArgumentHint(name = "f1", isOptional = false, type = @DataTypeHint("STRING")) String f1,
			@ArgumentHint(name = "f2", isOptional = true, type = @DataTypeHint("INT")) Integer i2,
			@ArgumentHint(name = "f3", isOptional = true, type = @DataTypeHint("LONG")) Long l3) {

		if (i2 != null) {
			return "i2#" + i2;
		}
		if (l3 != null) {
			return "l3#" + l3;
		}
		return "default#" + f1;
	}
}

在SQL中使用該自定義函數時,您可以只指定第一個必選參數,或選擇性指定可選參數,代碼示例如下。

CREATE TEMPORARY FUNCTION MyNamedUdf AS 'com.aliyun.example.MyFuncWithNamedArgs';

CREATE temporary TABLE s1 (
    a INT,
    b BIGINT,
    c VARCHAR,
    d VARCHAR,
    PRIMARY KEY(a) NOT ENFORCED
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='1'
);

CREATE temporary TABLE sink (
    a INT,
    b VARCHAR,
    c VARCHAR,
    d VARCHAR
) WITH (
    'connector' = 'print'
);

INSERT INTO sink
SELECT a,
    -- 僅指定第一個必選參數
    MyNamedUdf(f1 => c) arg1_res,
    -- 指定第一個必選參數及第二個可選參數
    MyNamedUdf(f1 => c, f2 => a) arg2_res,
    -- 指定第一個必選參數及第三個可選參數
    MyNamedUdf(f1 => c, f3 => d) arg3_res
FROM s1;

相關文檔

上一篇: 自定義函數 下一篇: 自定義標量函數(UDSF)
阿里云首頁 實時計算 Flink版 相關技術圈