日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數(shù)據(jù)攝入開發(fā)參考

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務(wù)造成影響,請務(wù)必仔細(xì)閱讀。

本文為您介紹數(shù)據(jù)攝入有關(guān)的source、sink、transform、route和pipeline模塊的開發(fā)參考。

支持的連接器

連接器

支持類型

Source

Sink

消息隊列Kafka

說明

僅實時計算引擎VVR 8.0.10及以上版本支持。

實時數(shù)倉Hologres

×

MySQL

說明

支持連接RDS MySQL版、PolarDB MySQL版及自建MySQL。

×

Upsert Kafka

×

Print

×

StarRocks

×

流式數(shù)據(jù)湖倉Paimon

×

source模塊

source模塊定義數(shù)據(jù)攝入的數(shù)據(jù)源端,目前支持的系統(tǒng)包括消息隊列KafkaMySQL

語法結(jié)構(gòu)

source:
  type: mysql
  name: mysql source
  xxx: ...

具體配置請查看對應(yīng)連接器的數(shù)據(jù)攝入部分。

sink模塊

sink模塊定義數(shù)據(jù)攝入的目標(biāo)端,目前支持的系統(tǒng)包括消息隊列KafkaUpsert Kafka實時數(shù)倉Hologres流式數(shù)據(jù)湖倉PaimonStarRocksPrint

語法結(jié)構(gòu)

sink:
  type: hologres
  name: hologres sink
  xxx: ...

具體配置請查看對應(yīng)連接器的數(shù)據(jù)攝入部分。

transform模塊

您可以在YAML作業(yè)的transform語句塊中填寫若干規(guī)則信息,從而實現(xiàn)源表中數(shù)據(jù)的投影、計算和過濾等功能。

語法結(jié)構(gòu)

transform:
  - source-table: db.tbl1
    projection: ...
    filter: ...
  - source-table: db.tbl2
    projection: ...
    filter: ...

配置項

參數(shù)

含義

是否必填

備注

source-table

指定生效上游表。

支持使用正則表達(dá)式。

projection

指定用于保留部分上游列的投影規(guī)則。

使用的句法與SQL SELECT語句類似。

不填則不追加或刪除任何列。

說明

VVR 8.0.9版本,如需將上游表結(jié)構(gòu)變更同步到下游,則仍需手動定義 projection: \*規(guī)則。詳情請參見注意事項

filter

行過濾規(guī)則。

使用的句法與SQL WHERE語句類似。

不填則不過濾任何行。

primary-keys

設(shè)定transform后Schema的主鍵列表。

不填則保留原Schema的主鍵定義。主鍵列表使用英文逗號(,)分隔。

partition-keys

設(shè)定transform的分區(qū)鍵列表。

不填則保留原Schema的分區(qū)鍵定義,分區(qū)鍵列表使用英文逗號(,)分隔。

table-options

需要傳遞給Sink的額外配置信息。

Options選項,例如Paimon Sink的分桶數(shù)、注釋等信息。

不同配置項通過,分割,配置項的鍵與值通過=分割。

配置示例:

key1=value1,key2=value2

description

該transform塊的描述信息。

無。

計算列

您可以在projection規(guī)則中使用<Expression> AS <ColName>句法來定義計算列,表達(dá)式將對上游的每條數(shù)據(jù)分別求值后填入相應(yīng)列。

警告

計算列的表達(dá)式不可以引用其他計算列的值,即使被引用的列出現(xiàn)在該計算列之前。例如a, b AS c, c AS d不是合法的projection表達(dá)式。

例如,在接收到來自上游db.tbl表的[+I, id = 1]數(shù)據(jù)記錄時,將其轉(zhuǎn)化為[+I, id = 1, inc_id = 2]數(shù)據(jù)行并發(fā)送給下游。

transform:
  - source-table: db.tbl
    projection: id, id + 1 AS inc_id

通配符

如果您希望將源表中的所有列以及后續(xù)追加的新列按原樣發(fā)送給下游,則可以在projection規(guī)則中使用星號(*)通配符。

說明

如果一個projection規(guī)則中沒有使用通配符(*),則其產(chǎn)生的Schema就是固定的,并且始終與projection規(guī)則中寫出的版本保持一致。

例如,*, 'extras' AS extras表示會在上游Schema的列尾追加額外的列,并持續(xù)將上游的表結(jié)構(gòu)變更發(fā)送給下游。

transform:
  - source-table: db.tbl
    projection: \*, 'extras' AS extras

元數(shù)據(jù)列

在編寫projection規(guī)則時,可以將以下預(yù)先定義的元數(shù)據(jù)列作為普通數(shù)據(jù)列使用:

重要

請勿定義與元數(shù)據(jù)列同名的普通數(shù)據(jù)列。

元數(shù)據(jù)列名稱

數(shù)據(jù)類型

說明

__namespace_name__

String

這條數(shù)據(jù)變更記錄對應(yīng)源表的Namespace名稱。

__schema_name__

String

這條數(shù)據(jù)變更記錄對應(yīng)源表的Schema名稱。

__table_name__

String

這條數(shù)據(jù)變更記錄對應(yīng)源表的Table名稱。

__data_event_type__

String

這條數(shù)據(jù)變更記錄對應(yīng)的操作類型(+I-U+U-D)。

重要

由于CDC Event總是將一次更新對應(yīng)的Update Before和Update After打包為一條事件,因此__data_event_type__的內(nèi)容在同一條Update事件里分別為-U+U。請勿將其作為主鍵使用。

例如,將上游表的全限定名稱寫入計算列中,并發(fā)送給下游。

transform:
  - source-table: \.*.\.*
    projection: \*, __namespace_name__ || __schema_name__ || __table_name__ AS identifier

各個數(shù)據(jù)庫連接器對Namespace、Schema和Table名稱的映射關(guān)系如下表所示。

數(shù)據(jù)庫類型

Namespace名稱

Schema名稱

Table名稱

JDBC

Catalog

Schema

Table

Debezium

Catalog

Schema

Table

MySQL

Database

-

Table

Postgres

Database

Schema

Table

Oracle

-

Schema

Table

Microsoft SQL Server

Database

Schema

Table

StarRocks

Database

-

Table

Doris

Database

-

Table

注意事項

  • 修改transform模塊的語句后,不能從已有的狀態(tài)恢復(fù),需要進(jìn)行無狀態(tài)啟動。

  • 通常情況下,projection和filter語句無需使用引號包裹。

    transform:
      - projection: a, b, c
        # 等價于
      - projection: "a, b, c"

    然而,如果Projection表達(dá)式的第一個字符為*'等特殊字符,則整行表達(dá)式可能無法被作為合法的YAML字符串字面量解析。此時需要手動使用引號包裹整個表達(dá)式,或是使用\轉(zhuǎn)義:

    transform:
      - projection: *, 42      # 不是合法的YAML
      - projection: '*, 42'    # OK
      - projection: \*, 42     # OK  
  • 使用VVR 8.0.9版本編寫Transform規(guī)則時,不提供Projection表達(dá)式無法將表結(jié)構(gòu)變更同步到下游。可以使用以下方法編寫等價的規(guī)則:

    transform:
      - source-table: db.\.*
        projection: \*        # 將上游全部列及后續(xù)結(jié)構(gòu)變更同步到下游

route模塊

您可以在YAML作業(yè)的route模塊中定義包含若干條route規(guī)則的語句塊,描述上游表到下游表的復(fù)雜拓?fù)浣Y(jié)構(gòu)。

語法結(jié)構(gòu)

route:
  - source-table: db.tbl1
    sink-table: sinkdb.tbl1
  - source-table: db.tbl2
    sink-table: sinkdb.tbl2

配置項

參數(shù)

含義

是否必填

備注

source-table

指定生效上游表。

支持使用正則表達(dá)式。

sink-table

指定數(shù)據(jù)路由的目標(biāo)位置。

無。

replace-symbol

在使用模式匹配功能時,用于指代上游表名的字符串。

例如,當(dāng)replace-symbol設(shè)置為<>時,可以將sink-table配置為sinkdb.<>。這樣,來自上游的表table1會被寫入到sinkdb.table1表中。

description

該route塊的描述信息。

無。

使用方法

一對一路由

將上游表mydb.web_order中的數(shù)據(jù)路由到下游表mydb.ods_web_order

route:
  - source-table: mydb.web_order
    sink-table: mydb.ods_web_order
    description: sync table to one destination table with given prefix ods_

合并分庫分表

將上游mydb數(shù)據(jù)庫中的所有表合并到下游mydb.merged表中。

route:
  - source-table: mydb.\.*
    sink-table: mydb.merged
    description: sync sharding tables to one destination table

多路由規(guī)則

可以在一個route塊中使用YAML列表符號(-)定義多條規(guī)則,它們會同時生效。

route:
  - source-table: mydb.orders
    sink-table: ods_db.ods_orders
    description: sync orders table to orders
  - source-table: mydb.shipments
    sink-table: ods_db.ods_shipments
    description: sync shipments table to ods_shipments
  - source-table: mydb.products
    sink-table: ods_db.ods_products
    description: sync products table to ods_products

模式匹配

source_db數(shù)據(jù)庫中的全部表一一對應(yīng)地同步到sink_db中,并保持表名不變。

route:
  - source-table: source_db.\.*
    sink-table: sink_db.<>
    replace-symbol: <>
    description: route all tables in source_db to sink_db

使用replace-symbol參數(shù)定義的<>特殊字符串會被表名替代,從而實現(xiàn)源表到匯表的一一對應(yīng)。

數(shù)據(jù)分發(fā)

將同一張表的數(shù)據(jù)分發(fā)給多個下游表,只需定義多條路由規(guī)則即可。例如將mydb.orders的數(shù)據(jù)會被同時分發(fā)到sink_dbbackup_sink_db兩個數(shù)據(jù)庫中。

route:
  - source-table: mydb.orders
    sink-table: sink_db.orders
  - source-table: mydb.orders
    sink-table: backup_sink_db.orders

注意事項

修改route模塊的語句后,不能從已有的狀態(tài)恢復(fù),需要進(jìn)行無狀態(tài)啟動。

pipeline模塊

您可以在pipeline模塊配置數(shù)據(jù)攝入YAML作業(yè)的整體配置。

語法結(jié)構(gòu)

pipeline:
  name: CDC YAML job
  schema.change.behavior: LENIENT

配置項

參數(shù)

說明

是否必填

數(shù)據(jù)類型

默認(rèn)值

備注

name

數(shù)據(jù)攝入YAML名稱。

STRING

Flink CDC Pipeline Job

無。

schema.change.behavior

Schema變更行為配置。

STRING

LENIENT

可配置的值如下,詳見Schema變更行為配置

  • LENIENT(默認(rèn)值)

  • EXCEPTION

  • EVOLVE

  • TRY_EVOLVE

  • IGNORE

Schema變更行為配置

數(shù)據(jù)攝入YAML作業(yè)支持將數(shù)據(jù)源的Schema變更同步到下游目標(biāo)端,例如創(chuàng)建表、添加列、重命名列、更改列類型、刪除列和刪除表等。下游目標(biāo)端可能不支持全部的Schema變更,您可以通過schema.change.behavior配置來修改Schema變更發(fā)生時目標(biāo)端的處理方式。

Schema變更模式

模式

說明

LENIENT(默認(rèn))

數(shù)據(jù)攝入YAML作業(yè)會對Schema變更進(jìn)行轉(zhuǎn)換成目標(biāo)端可處理的變更并發(fā)送,遵循以下規(guī)則:

  • 不發(fā)送Drop table和Truncate table變更。

  • 列重命名時,改為發(fā)送更改列類型和新增列兩個事件。原有的列不刪除,更改列類型為nullable,同時新增一個列名為新名稱,數(shù)據(jù)類型改為nullable的列。

  • 刪除列時,改為發(fā)送更改列類型事件,將對應(yīng)字段類型變?yōu)閚ullable。

  • 新增列時仍發(fā)送新增列事件,但字段類型會變?yōu)閚ullable。

EXCEPTION

不允許任何Schema變更行為。

當(dāng)目標(biāo)端不支持處理Schema變更時,可以使用此模式。收到Schema變更事件時,數(shù)據(jù)攝入YAML作業(yè)會拋出異常。

EVOLVE

數(shù)據(jù)攝入YAML作業(yè)會將所有Schema更改應(yīng)用于目標(biāo)端。

如果Schema變更在目標(biāo)端應(yīng)用失敗,數(shù)據(jù)攝入YAML作業(yè)會拋出異常并觸發(fā)故障重啟。

TRY_EVOLVE

數(shù)據(jù)攝入YAML作業(yè)會嘗試將Schema變更應(yīng)用到目標(biāo)端,如果目標(biāo)端不支持處理發(fā)送的Schema變更,數(shù)據(jù)攝入YAML作業(yè)不會失敗重啟,嘗試通過轉(zhuǎn)換后續(xù)數(shù)據(jù)方式進(jìn)行處理。

警告

TRY_EVOLVE模式下,如果發(fā)生Schema變更應(yīng)用失敗,可能導(dǎo)致上游后續(xù)到來的數(shù)據(jù)出現(xiàn)部分列丟失、被截斷等情況。

IGNORE

所有Schema變更都不會應(yīng)用于目標(biāo)端。

當(dāng)您的目標(biāo)端尚未準(zhǔn)備好進(jìn)行任何Schema變更,想要繼續(xù)從未更改的列中接收數(shù)據(jù)時,可以使用此模式。

控制目標(biāo)端接收的Schema變更

在某些場景下,不需要所有Schema變更同步到目標(biāo)端。例如,允許新增列但禁止刪除列來避免刪除已有的數(shù)據(jù)。

您可以通過在sink模塊中設(shè)置include.schema.changesexclude.schema.changes選項來控制。

參數(shù)

說明

是否必填

數(shù)據(jù)類型

默認(rèn)值

備注

include.schema.changes

支持應(yīng)用的Schema變更。

List<String>

默認(rèn)支持所有變更。

exclude.schema.changes

不支持應(yīng)用的Schema變更。

List<String>

優(yōu)先級高于include.schema.changes

以下是可配置架構(gòu)變更事件類型的完整列表:

事件類型

說明

add.column

新增列。

alter.column.type

變更列類型。

create.table

創(chuàng)建表。

drop.column

刪除列。

drop.table

刪除表。

rename.column

修改列名。

truncate.table

清空數(shù)據(jù)。

說明

Schema變更支持部分匹配。例如,傳入drop相當(dāng)于同時傳入drop.columndrop.table

代碼示例

  • 示例1:Schema變更行為配置為EVOLVE

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  
pipeline:
  name: mysql to print job
  schema.change.pipeline: EVOLVE
  • 示例2:支持創(chuàng)建表和列相關(guān)事件,不支持刪除列

source:
  type: mysql
  name: MySQL Source
  hostname: ${mysql.hostname}
  port: ${mysql.port}
  username: ${mysql.username}
  password: ${mysql.password}
  tables: ${mysql.source.table}
  server-id: 7601-7604

sink:
  type: values
  name: Values Sink
  print.enabled: true
  sink.print.logger: true
  include.schema.changes: [create.table, column] # 匹配了 CreateTable、AddColumn、AlterColumnType、RenameColumn、和 DropColumn 事件
  exclude.schema.changes: [drop.column] # 排除了 DropColumn 事件
  
pipeline:
  name: mysql to print job
  schema.change.pipeline: EVOLVE

函數(shù)

內(nèi)置函數(shù)

CDC YAML提供了豐富的內(nèi)置函數(shù),可以直接在transform模塊中的projection和filter表達(dá)式中使用。

比較函數(shù)

說明

除非特別說明,否則以下內(nèi)置函數(shù)在輸入?yún)?shù)包含NULL時均返回NULL。

函數(shù)

說明

value1 = value2

如果value1等于value2,則返回TRUE;否則返回FALSE。

value1 <> value2

如果value1不等于value2,則返回TRUE;否則返回FALSE。

value1 > value2

如果value1大于value2,則返回TRUE;否則返回FALSE。

value1 >= value2

如果value1大于或等于value2,則返回TRUE;否則返回FALSE。

value1 < value2

如果value1小于value2,則返回TRUE;否則返回FALSE。

value1 <= value2

如果value1小于或等于value2,則返回TRUE;否則返回FALSE。

value IS NULL

如果value是NULL,則返回TRUE;否則返回FALSE。

value IS NOT NULL

如果value不是NULL,則返回TRUE;否則返回FALSE。

value1 BETWEEN value2 AND value3

如果value1的值介于value2和value3之間,則返回TRUE;否則返回FALSE。

value1 NOT BETWEEN value2 AND value3

如果value1的值并非介于value2和value3之間,則返回TRUE;否則返回FALSE。

string1 LIKE string2

如果string1的值與string2定義的模式匹配,則返回TRUE;否則返回FALSE。

string1 NOT LIKE string2

如果string1的值與string2定義的模式不匹配,則返回TRUE;否則返回FALSE。

value1 IN (value2 [, value3]* )

如果value1的值存在于[value2, value3, ...]列表中,則返回TRUE;否則返回FALSE。

value1 NOT IN (value2 [, value3]* )

如果value1的值不存在于[value2, value3, ...]列表中,則返回TRUE;否則返回FALSE。

邏輯函數(shù)

函數(shù)

說明

boolean1 OR boolean2

如果boolean1和boolean2至少有一個為TRUE,則返回TRUE。

boolean1 AND boolean2

如果boolean1和boolean2均為TRUE,則返回TRUE。

NOT boolean

如果boolean為TRUE,則返回FALSE;如果boolean是FALSE,則返回TRUE。

boolean IS FALSE

如果boolean為TRUE,則返回FALSE;如果boolean是FALSE,則返回TRUE。

boolean IS NOT FALSE

如果boolean為TRUE,則返回TRUE;如果boolean是FALSE,則返回FALSE。

boolean IS TRUE

如果boolean為TRUE,則返回TRUE;如果boolean是FALSE,則返回FALSE。

boolean IS NOT TRUE

如果boolean為TRUE,則返回FALSE;如果boolean是FALSE,則返回TRUE。

算數(shù)函數(shù)

函數(shù)

說明

numeric1 + numeric2

返回numeric1加上numeric2的值。

numeric1 - numeric2

返回numeric1減去numeric2的值。

numeric1 * numeric2

返回numeric1乘以numeric2的值。

numeric1 / numeric2

返回numeric1除以numeric2的值。

numeric1 % numeric2

返回numeric1對numeric2取模的值。

ABS(numeric)

返回numeric的絕對值。

CEIL(numeric)

返回numeric向上取整的值。

FLOOR(numeric)

返回numeric向下取整的值。

ROUND(numeric, int)

返回numeric四舍五入到小數(shù)點后n位的值。

UUID()

生成一個全局唯一ID(UUID)字符串(例如“3d3c68f7-f608-473f-b60c-b0c44ad4cc4e”)。

使用RFC 4122 type 4方法偽隨機(jī)生成。

字符串函數(shù)

函數(shù)

說明

string1 || string2

返回string1和string2拼接而成的字符串。

重要

請勿將其與邏輯或運(yùn)算符混淆。

CHAR_LENGTH(string)

返回string字符串中的字符數(shù)。

UPPER(string)

返回string的大寫形式字符串。

LOWER(string)

返回string的小寫形式字符串。

TRIM(string1)

刪除string兩側(cè)的空白字符。

REGEXP_REPLACE(string1, string2, string3)

將string1中所有滿足string2模式的子串替換為string3。

例如,REGEXP_REPLACE('foobar', 'oo|ar', '__')求值的結(jié)果為 f__b__

SUBSTRING(string FROM integer1 [ FOR integer2 ])

返回string從第integer1到第integer2個字符的子串。

說明

在不提供FOR integer2語句時,默認(rèn)提取到字符串尾部。

CONCAT(string1, string2,…)

返回將string1、string2、…拼接在一起形成的新字符串。

例如,CONCAT('AA', 'BB', 'CC')求值的結(jié)果為AABBCC

時間函數(shù)

函數(shù)

說明

LOCALTIME

返回當(dāng)前時區(qū)下的本地時間,返回類型為TIME(0)

LOCALTIMESTAMP

返回當(dāng)前時區(qū)下的本地時間戳,返回類型為TIMESTAMP(3)

CURRENT_TIME

返回當(dāng)前時區(qū)下的本地時間,與LOCAL_TIME相同。

CURRENT_DATE

返回當(dāng)前時區(qū)下的本地日期。

CURRENT_TIMESTAMP

返回當(dāng)前時區(qū)下的本地時間戳。返回類型為TIMESTAMP_LTZ(3)

NOW()

返回當(dāng)前時區(qū)下的本地時間戳,與CURRENT_TIMESTAMP相同。

DATE_FORMAT(timestamp, string)

將傳入的時間戳按指定的格式化字符串string進(jìn)行格式化。

說明

格式化字符串與Java中的SimpleDateFormat格式兼容。

TIMESTAMP_DIFF(timepointunit, timepoint1, timepoint2)

計算timepoint1和timepoint2之間差距多少timepointunit單位。

timepointunit可被指定為SECOND、MINUTE、HOUR、DAY、MONTH或YEAR。

TO_DATE(string1[, string2])

將傳入的日期字符串string1按string2指定的格式轉(zhuǎn)化為DATE類型。

說明

在不指定格式化字符串string2時,默認(rèn)采用yyyy-MM-dd格式。

TO_TIMESTAMP(string1[, string2])

將傳入的時間戳字符串string1按string2指定的格式轉(zhuǎn)化為不帶時區(qū)信息的TIMESTAMP類型。

說明

在不指定格式化字符串string2時,默認(rèn)采用yyyy-MM-ddHH:mm:ss格式。

說明

在進(jìn)行projection和filter表達(dá)式求值時,可以保證其中每個子表達(dá)式所得到的時間點都一致。例如,NOW() AS t1, NOW() AS t2, NOW() AS t3得到的t1t2t3一定對應(yīng)同一個時間戳,無論其求值時間和順序如何。

條件函數(shù)

函數(shù)

說明

CASE value WHEN value1_1 [, value1_2]* THEN RESULT1 (WHEN value2_1 [, value2_2 ]* THEN result_2)* (ELSE result_z) END

依次檢查value值是否等于WHEN子句給出的值,并返回第一個相等子句的RESULT值。

如果沒有任何子句滿足條件,則返回ELSE子句指定的值。如果沒有指定ELSE子句,則返回NULL。

CASE WHEN condition1 THEN result1 (WHEN condition2 THEN result2)* (ELSE result_z) END

依次檢查value值是否滿足每個WHEN子句給出的條件,并返回第一個滿足條件子句的RESULT值。

如果沒有任何子句滿足條件,則返回ELSE子句指定的值。如果沒有指定ELSE子句,則返回NULL。

COALESCE(value1 [, value2]*)

返回[value1、value2、……]列表中第一個不為NULL的元素。如果列表中所有元素均為NULL,則返回NULL。

IF(condition, true_value, false_value)

如果condition子句對應(yīng)的條件為真,則返回true_value;否則返回false_value。

UDF函數(shù)

CDC YAML也支持使用Java語言編寫自定義UDF函數(shù),并像內(nèi)置函數(shù)一樣調(diào)用。

UDF函數(shù)定義

滿足以下要求的Java類可以作為CDC YAML UDF函數(shù)使用:

  • 實現(xiàn)了org.apache.flink.cdc.common.udf.UserDefinedFunction接口。

  • 擁有一個公共無參構(gòu)造器。

  • 至少含有一個名為eval的公共方法。

UDF函數(shù)類可以通過@Override以下接口來實現(xiàn)更精細(xì)的語義控制:

  • 重寫getReturnType方法來手動指定方法的返回類型。

  • 重寫openclose方法來插入生命周期函數(shù)。

例如,將傳入的整型參數(shù)增加1后返回的UDF函數(shù)定義如下。

public class AddOneFunctionClass implements UserDefinedFunction {
    
    public Object eval(Integer num) {
        return num + 1;
    }
    
    @Override
    public DataType getReturnType() {
        // 由于eval函數(shù)的返回類型不明確,需要
        // 使用getReturnType寫明確指定類型
        return DataTypes.INT();
    }
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

UDF函數(shù)注冊

通過在CDC YAMLpipeline塊中加入如下所示的定義即可注冊UDF函數(shù):

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
說明
  • 此處類路徑對應(yīng)的JAR包需要作為外部依賴上傳。

  • UDF函數(shù)名稱可以在此處任意調(diào)整,無需與UDF類名一致。

UDF函數(shù)使用

在完成UDF函數(shù)注冊后,即可在projection和filter語句塊中,像內(nèi)置函數(shù)一樣直接調(diào)用UDF函數(shù)。代碼示例如下。

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Flink ScalarFucntion兼容性

繼承自ScalarFunction的Flink UDF函數(shù)也可以直接作為CDC YAML UDF函數(shù)注冊并使用,但存在以下限制:

  • 不支持帶參數(shù)的ScalarFunction

  • Flink風(fēng)格的TypeInformation類型標(biāo)注會被忽略。

  • openclose生命周期鉤子函數(shù)不會被調(diào)用。

相關(guān)文檔

數(shù)據(jù)攝入YAML作業(yè)開發(fā)的操作步驟,詳情請參見數(shù)據(jù)攝入YAML作業(yè)開發(fā)(公測中)