本文為您介紹如何使用Hudi連接器。
背景信息
Apache Hudi是一種開源的數據湖表格式框架。Hudi基于對象存儲或者HDFS組織文件布局,保證ACID,支持行級別的高效更新和刪除,從而降低數據ETL開發門檻。同時該框架還支持自動管理及合并小文件,保持指定的文件大小,從而在處理數據插入和更新時,不會創建過多的小文件,引發查詢端性能降低,避免手動監控和合并小文件的運維負擔。詳情請參見Apache Hudi。
類別 | 詳情 |
支持類型 | 源表和結果表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不支持 |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支持更新或刪除結果表數據 | 是 |
特色功能
類別 | 詳情 |
Hudi的核心特性 |
|
Hudi的典型場景 |
|
全托管Hudi優勢 | 相比開源社區Hudi,全托管Flink平臺集成Hudi具有的功能優勢詳情如下所示:
|
使用限制
僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支持Hudi Connector。
文件系統僅支持HDFS或阿里云OSS和OSS-HDFS服務。
不支持以Session模式提交作業。
不支持修改字段,如需修改,請在DLF控制臺通過Spark SQL語句進行操作。
語法結構
CREATE TEMPORARY TABLE hudi_tbl (
uuid BIGINT,
data STRING,
ts TIMESTAMP(3),
PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 'oss://<yourOSSBucket>/<自定義存儲位置>',
...
);
WITH參數
基礎參數
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為hudi。
path
表存儲路徑。
String
是
無
支持阿里云OSS、HDFS和OSS-HDFS和三種路徑。
OSS:路徑格式為
oss://<bucket>/<user-defined-dir>
。HDFS:路徑格式為
hdfs://<user-defined-dir>
。OSS-HDFS:路徑格式為
oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>
。說明僅Flink計算引擎VVR 8.0.3及以上版本支持該參數配置為OSS-HDFS路徑。
其中:
bucket:表示您創建的OSS Bucket名稱。
user-defined-dir:表示數據存放路徑。
oss-hdfs-endpoint:表示OSS-HDFS服務Endpoint。
您可以在OSS實例概覽頁面的訪問端口中查看HDFS的Endpoint信息。
hoodie.datasource.write.recordkey.field
主鍵字段。
String
否
uuid
支持通過PRIMARY KEY語法設置主鍵字段。
支持使用英文逗號(,)分隔多個字段。
precombine.field
版本字段。
String
否
ts
基于此字段的大小來判斷消息是否進行更新。
如果您沒有設置該參數,則系統默認會按照消息在引擎內部處理的先后順序進行更新。
oss.endpoint
阿里云對象存儲服務OSS或者OSS-HDFS的Endpoint。
String
否
無
如果使用OSS或者OSS-HDFS作為存儲,則必需填寫。
使用OSS時,參數取值詳情請參見OSS地域和訪問域名。
使用OSS-HDFS時,您可以在OSS實例概覽頁面的訪問端口中查看HDFS服務的Endpoint信息。
accessKeyId
阿里云賬號的AccessKey ID。
String
否
無
如果使用OSS或者OSS-HDFS作為存儲,則必需填寫。
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量和密鑰管理。
accessKeySecret
阿里云賬號的AccessKey Secret。
String
否
無
如果使用OSS或者OSS-HDFS作為存儲,則必需填寫。
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量和密鑰管理。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
read.streaming.enabled
是否開啟流讀。
boolean
否
false
參數取值如下:
true:開啟流讀。
false:不開啟流讀。
read.start-commit
讀取起始位點。
string
否
不填
參數取值如下:
yyyyMMddHHmmss:從指定時間點開始消費。
earliest:從最早位點開始消費。
不填:從最新時間開始消費。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
write.operation
寫入操作模式。
String
否
UPSERT
參數取值如下:
insert模式:數據追加寫。
upsert模式:數據更新。
bulk_insert模式:數據批量追加寫。
hive_sync.enable
是否開啟同步元數據到Hive功能。
boolean
否
false
參數取值如下:
true:開啟同步元數據到Hive功能。
false:關閉同步元數據到Hive功能。
hive_sync.mode
Hive數據同步模式。
String
否
hms
參數取值如下:
hms:元數據同步到Hive Metastore或者DLF時,需要設置為hms。
jdbc:元數據同步到jdbc時,需要設置為jdbc。
hive_sync.db
同步到Hive的數據庫名稱。
String
否
default
無。
hive_sync.table
同步到Hive的表名稱。
String
否
當前table名
hudi同步到Hive的表名不能使用中劃線( -)。
dlf.catalog.region
DLF服務的地域名。
String
否
無
詳情請參見已開通的地域和訪問域名。
說明僅當hive_sync.mode設置為hms時,dlf.catalog.region參數設置才生效。
請和dlf.catalog.endpoint選擇的地域保持一致。
dlf.catalog.endpoint
DLF服務的Endpoint。
String
否
無
詳情請參見已開通的地域和訪問域名。
說明僅當hive_sync.mode設置為hms時,dlf.catalog.endpoint參數設置才生效。
推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地域為cn-hangzhou地域,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com
如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?
高階參數
Hudi支持豐富的寫入和讀取場景,不同場景的參數如下表所示。
并發參數
名稱 | 說明 | 默認值 | 備注 |
write.tasks | writer的并發,每個writer順序寫1~N個buckets。 | 4 | 增加寫任務的并發對小文件個數沒影響。 |
write.bucket_assign.tasks | bucket assigner的并發。 | Flink并發度 | 增加寫任務的并發同時增加了寫任務的bucket數,也就是增加了小文件(小bucket)數。 |
write.index_bootstrap.tasks | Index bootstrap算子的并發。 | Flink并發度 |
|
read.tasks | 流和批讀算子的并發。 | 4 | 無。 |
compaction.tasks | online compaction算子的并發。 | 4 | online compaction比較耗費資源,建議走offline compaction。 |
在線壓縮參數
名稱 | 說明 | 默認值 | 備注 |
compaction.schedule.enabled | 是否階段性生成壓縮plan。 | true | 參數取值如下:
說明 建議階段性生成壓縮plan,即使compaction.async.enabled關閉的情況下。 |
compaction.async.enabled | 是否開啟異步壓縮。 | true | 參數取值如下:
說明 通過關閉compaction.async.enabled參數可關閉在線壓縮執行,但是調度compaction.schedule.enabled仍然建議開啟,之后可通過離線異步壓縮,執行階段性生成的壓縮plan。 |
compaction.tasks | 壓縮任務的并發數。 | 4 | 無。 |
compaction.trigger.strategy | 壓縮策略。 | num_commits | 支持以下壓縮策略:
|
compaction.delta_commits | 經過多少個commit觸發壓縮。 | 5 | 無。 |
compaction.delta_seconds | 經過多少秒后觸發壓縮。 | 3600 | 單位為秒。 |
compaction.max_memory | 用于壓縮去重的hashmap的可用內存大小。 | 100 MB | 資源夠用時,建議調整到1 GB。 |
compaction.target_io | 每個壓縮plan的IO上限。 | 500 GB | 無。 |
文件大小
文件參數控制了文件的大小,目前支持的參數詳情如下表所示。
名稱 | 說明 | 默認值 | 備注 |
hoodie.parquet.max.file.size | 最大可寫入的parquet文件大小。 超過可寫入的parquet文件大小時,將寫入到新的文件組。 | 120 * 1024 * 1024 byte (120 MB) | 單位是byte。 |
hoodie.parquet.small.file.limit | 小文件的大小閾值,小于該參數的文件被認為是小文件。 | 104857600 byte(100 MB) |
|
hoodie.copyonwrite.record.size.estimate | 預估的record大小。 | 1024 byte(1 KB) |
|
Hadoop參數
名稱 | 說明 | 默認值 | 備注 |
hadoop.${you option key} | 通過hadoop.前綴指定hadoop配置項。 | 無 | 支持同時指定多個hadoop配置項。 說明 從Hudi 0.12.0開始支持,針對跨集群提交執行的需求,可以通過DDL指定per-job級別的hadoop配置。 |
數據寫入
Hudi支持豐富的寫入方式,包括離線批量寫入、流式寫入等場景。支持豐富的數據類型,包括changelog以及log數據。同時支持不同的索引方案。
離線批量寫入
針對存量數據導入Hudi的需求,如果存量數據來源于其他數據源,可以使用批量導入功能,快速將存量數據導成Hoodie表格式。
名稱
說明
默認值
備注
write.operation
寫操作類型。
upsert
參數取值如下:
upsert:插入更新
insert:插入
bulk_insert:批量寫入
說明bulk_insert導入省去了avro的序列化以及數據的merge過程,沒有去重操作,數據的唯一性需要自己來保證。
bulk_insert需要在Batch Execution Mode下執行,Batch模式默認會按照分區名稱排序輸入消息再寫入Hoodie,避免file handle頻繁切換導致性能下降。
write.tasks
bulk_insert寫任務的并發。
Flink的并發度
bulk_insert寫任務的并發通過參數write.tasks指定,并發的數量會影響到小文件的數量。
理論上,bulk_insert寫任務的并發數就是劃分的bucket數,當每個bucket在寫到文件大小上限(parquet 120 MB)時,會滾動到新句柄,所以最終的寫文件數量大于等于bulk_insert寫任務的并發。
write.bulk_insert.shuffle_input
是否將數據按照partition字段shuffle再通過write task寫入。
true
從Hudi 0.11.0版本開始,開啟該參數將減少小文件的數量,但是可能有數據傾斜風險。
write.bulk_insert.sort_input
是否將數據先按照partition字段排序再寫入。
true
從Hudi 0.11.0版本開始支持,當一個write task寫多個partition,開啟可以減少小文件數量。
write.sort.memory
sort算子的可用managed memory。
128
單位是MB。
Changelog模式
該模式只有MOR表支持,在該模式下Hoodie會保留消息的所有變更(I/-U/U/D),之后再配合Flink引擎的有狀態計算實現全鏈路近實時數倉生產增量計算。Hoodie的MOR表通過行存原生支持保留消息的所有變更(format層面的集成),通過Flink全托管流讀單個MOR表可以消費到所有的變更記錄。
說明非changelog模式,流讀單次的batch數據集會merge中間變更;批讀(快照讀)會合并所有的中間結果,不管中間狀態是否已被寫入,都將被忽略。
名稱
說明
默認值
備注
changelog.enabled
是否消費所有變更。
false
參數取值如下:
true:支持消費所有變更。
false:不消費所有變更,即UPSERT語義,所有的消息僅保證最后一條合并消息,中間的變更可能會被merge掉。
說明開啟changelog.enabled參數后,異步的壓縮任務仍然會將中間變更合并成1條數據,所以如果流讀消費不夠及時,被壓縮后只能讀到最后一條記錄。但是,可以通過調整壓縮的頻率,預留一定的時間buffer給 reader,比如調整compaction.delta_commits:5和compaction.delta_seconds: 3600壓縮參數。
Append模式(從Hudi 0.10.0版本開始支持)
在該模式下:
MOR表會應用小文件策略:會追加寫avro log文件。
COW表沒有小文件策略:每次寫入COW表直接寫新的parquet文件。
Clustering策略
Hudi支持豐富的Clustering策略,從而優化INSERT模式下的小文件問題。
Inline Clustering(只有Copy On Write表支持該模式)
名稱
說明
默認值
備注
write.insert.cluster
是否在寫入時合并小文件。
false
參數取值如下:
true:在寫入時,合并小文件。
false:在寫入時,不合并小文件。
說明COW表默認insert寫不合并小文件,開啟該參數后,每次寫入會優先合并之前的小文件,但不會去重,吞吐會受影響。
Async Clustering(從Huid 0.12.0版本開始支持)
名稱
說明
默認值
備注
clustering.schedule.enabled
是否在寫入時定時調度Clustering plan。
false
開啟后周期性調度clustering plan。
clustering.delta_commits
經過多少個commits生成Clustering plan。
4
clustering.schedule.enabled為true時,生效。
clustering.async.enabled
是否異步執行Clustering plan。
false
開啟后周期性異步執行,合并小文件。
clustering.tasks
Clustering task執行并發。
4
無。
clustering.plan.strategy.target.file.max.bytes
Clustering單文件目標大小。
1024 * 1024 * 1024
單位是byte。
clustering.plan.strategy.small.file.limit
Clustering小文件閾值。
600
小于該大小的文件才會參與clustering。
clustering.plan.strategy.sort.columns
Clustering排序字段。
無
支持指定特殊的排序字段。
Clustering Plan Strategy
名稱
說明
默認值
備注
clustering.plan.partition.filter.mode
Clustering分區過濾模式。
NONE
支持的模式如下:
NONE:不過濾分區,所有分區都用于聚合,即不做限制。
RECENT_DAYS:數據按分區時,合并最近N天的數據。
SELECTED_PARTITIONS:指定固定的分區。
clustering.plan.strategy.daybased.lookback.partitions
采用RECENT_DAYS模式下的目標分區天數。
2
僅當clustering.plan.partition.filter.mode取值為RECENT_DAYS時生效。
clustering.plan.strategy.cluster.begin.partition
指定開始分區,用于過濾分區。
無
僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。
clustering.plan.strategy.cluster.end.partition
指定結束分區,用于過濾分區。
無
僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。
clustering.plan.strategy.partition.regex.pattern
通過正則表達式指定目標分區。
無
無。
clustering.plan.strategy.partition.selected
指定目標partitions。
無
支持通過英文逗號(,)分割多個partition。
Bucket索引
說明從Hudi 0.11.0版本開始支持以下表格中的參數。
名稱
說明
默認值
備注
index.type
索引類型。
FLINK_STATE
參數取值如下:
FLINK_STATE:使用flink state索引。
BUCKET:使用bucket索引。
當數據量比較大時(表的數據條目超過5 億),flink state的存儲開銷可能成為瓶頸。bucket索引通過固定的hash策略,將相同key的數據分配到同一個fileGroup中,可以避免索引的存儲和查詢開銷。bucket index和flink state索引對比有以下區別:
bucket index沒有flink state的存儲計算開銷,性能較好。
bucket index無法擴buckets,state index則可以依據文件的大小動態增加文件個數。
bucket index不支持跨partition的變更(如果輸入是cdc流則沒有這個限制),state index沒有限制。
hoodie.bucket.index.hash.field
bucket索引hash key字段。
主鍵
可以設置成主鍵的子集。
hoodie.bucket.index.num.buckets
bucket索引的bucket個數。
4
默認每個partition的bucket數,當前設置后則不可再變更。
數據讀取
Hudi支持豐富的讀取方案,包括批讀、流讀、增量拉取,同時支持消費、傳播changelog,實現端到端增量ETL。
流讀
當前表默認是快照讀取,即讀取最新的全量快照數據并一次性返回。通過read.streaming.enabled參數開啟流讀模式,通過read.start-commit參數指定起始消費位置,支持指定earliest從最早消費。
名稱
說明
默認值
備注
read.streaming.enabled
是否開啟流讀模式。
false
參數取值如下:
true:開啟流讀模式。
false:關閉流讀模式。
read.start-commit
流讀起始位點
不填
參數取值如下:
yyyyMMddHHmmss:從指定時間點開始消費。
earliest:從最早位點開始消費。
不填:從最新時間開始消費。
clean.retain_commits
cleaner最多保留的歷史commits數。
30
大于此數量的歷史commits會被清理掉,changelog模式下,該參數可以控制changelog的保留時間,例如checkpoint周期為5分鐘一次,默認最少保留150分鐘的時間。
重要僅從0.10.0開始支持流讀changelog。開啟changelog模式后,hudi會保留一段時間的changelog供下游consumer消費。
changelog有可能會被compaction合并掉,中間記錄會消除,可能會影響計算結果。
增量讀取(從Hudi 0.10.0版本開始支持)
支持通過Flink全托管DataStream方式增量消費、Batch增量消費和TimeTravel(Batch消費某個時間點的數據)。
名稱
說明
默認值
備注
read.start-commit
指定起始消費位點。
從最新位置commit
請按yyyyMMddHHmmss格式指定流讀的起始位點。
區間為閉區間,即包含起始和結束。
read.end-commit
指定結束消費位點。
從最新位置commit
代碼示例
源表
CREATE TEMPORARY TABLE blackhole (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'blackhole'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<自定義存儲位置>',
'table.type' = 'MERGE_ON_READ',
'read.streaming.enabled' = 'true'
);
-- 從最新的commit流讀寫入blackhole。
INSERT INTO blackhole SELECT * from hudi_tbl;
結果表
CREATE TEMPORARY TABLE datagen(
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'datagen' ,
'rows-per-second'='100'
);
CREATE TEMPORARY TABLE hudi_tbl (
id INT NOT NULL PRIMARY KEY NOT ENFORCED,
data STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'hudi',
'oss.endpoint' = '<yourOSSEndpoint>',
'accessKeyId' = '${secret_values.ak_id}',
'accessKeySecret' = '${secret_values.ak_secret}',
'path' = 'oss://<yourOSSBucket>/<自定義存儲位置>',
'table.type' = 'MERGE_ON_READ'
);
INSERT INTO hudi_tbl SELECT * from datagen;
Datastream API
通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器設置方法。
maven pom
根據使用的VVR版本,指定Flink和Hudi版本。
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <flink.version>1.15.4</flink.version> <hudi.version>0.13.1</hudi.version> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- hudi --> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-flink1.15-bundle</artifactId> <version>${hudi.version}</version> <scope>provided</scope> </dependency> <!-- oss --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-aliyun</artifactId> <version>3.3.2</version> <scope>provided</scope> </dependency> <!-- dlf --> <dependency> <groupId>com.aliyun.datalake</groupId> <artifactId>metastore-client-hive2</artifactId> <version>0.2.14</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.5.1</version> <scope>provided</scope> </dependency> </dependencies>
重要DLF使用的部分依賴與社區版本存在沖突,例如
hive-common
、hive-exec
。如果您有本地測試DLF的需求,可以下載hive-common和hive-execJAR包,然后在IDEA手動導入。寫入到Hudi
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.HoodiePipeline; import java.util.HashMap; import java.util.Map; public class FlinkHudiQuickStart { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String dbName = "test_db"; String tableName = "test_tbl"; String basePath = "oss://xxx"; Map<String, String> options = new HashMap<>(); // hudi conf options.put(FlinkOptions.PATH.key(), basePath); options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name()); options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts"); options.put(FlinkOptions.DATABASE_NAME.key(), dbName); options.put(FlinkOptions.TABLE_NAME.key(), tableName); // oss conf options.put("hadoop.fs.oss.accessKeyId", "xxx"); options.put("hadoop.fs.oss.accessKeySecret", "xxx"); // 本地調試使用公網網端,例如oss-cn-hangzhou.aliyuncs.com;提交集群使用內網網端,例如oss-cn-hangzhou-internal.aliyuncs.com options.put("hadoop.fs.oss.endpoint", "xxx"); options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS"); options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"); // dlf conf options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // 可選擇是否同步DLF options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms"); options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName); options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName); options.put("hadoop.dlf.catalog.id", "xxx"); options.put("hadoop.dlf.catalog.accessKeyId", "xxx"); options.put("hadoop.dlf.catalog.accessKeySecret", "xxx"); options.put("hadoop.dlf.catalog.region", "xxx"); // 本地調試使用公網網端,例如dlf.cn-hangzhou.aliyuncs.com,提交集群使用內網網端,例如dlf-vpc.cn-hangzhou.aliyuncs.com options.put("hadoop.dlf.catalog.endpoint", "xxx"); options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory"); DataStream<RowData> dataStream = env.fromElements( GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22, StringData.fromString("1001"), StringData.fromString("p1")), GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32, StringData.fromString("1002"), StringData.fromString("p2")) ); HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName) .column("uuid string") .column("name string") .column("age int") .column("ts string") .column("`partition` string") .pk("uuid") .partition("partition") .options(options); builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded env.execute("Flink_Hudi_Quick_Start"); } }