日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Hudi(退役中)

本文為您介紹如何使用Hudi連接器。

背景信息

Apache Hudi是一種開源的數據湖表格式框架。Hudi基于對象存儲或者HDFS組織文件布局,保證ACID,支持行級別的高效更新和刪除,從而降低數據ETL開發門檻。同時該框架還支持自動管理及合并小文件,保持指定的文件大小,從而在處理數據插入和更新時,不會創建過多的小文件,引發查詢端性能降低,避免手動監控和合并小文件的運維負擔。詳情請參見Apache Hudi

類別

詳情

支持類型

源表和結果表

運行模式

流模式和批模式

數據格式

暫不支持

特有監控指標

  • 源表

    • numRecordsIn

    • numRecordsInPerSecond

  • 結果表

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支持更新或刪除結果表數據

特色功能

類別

詳情

Hudi的核心特性

  • 支持ACID:支持ACID語義,默認提供SNAPSHOT ISOLATION隔離級別。

  • 支持UPSERT語義:UPSERT語義是INSERT和UPDATE兩種語義的合并。在UPSERT語義時,如果記錄不存在則插入;如果記錄存在則更新。通過INSERT INTO語法可以大幅簡化開發代碼的復雜度,提升效率。

  • 支持Data Version:通過時間旅行(Time Travel)特性,提供任意時間點的數據版本歷史,便于數據運維,提升數據質量。

Hudi的典型場景

  • DB入湖加速

    相比昂貴且低效的傳統批量加載和Merge,Hudi提供超大數據集的實時流式更新寫入。通過實時的ETL,您可以直接將CDC(Change Data Capture)數據寫入數據湖,供下游業務使用。典型案例為采用Flink MySQL CDC Connector將RDBMS(MySQL)的Binlog寫入Hudi表。

  • 增量ETL

    通過增量拉取的方式獲取Hudi中的變更數據流,相對離線ETL調度,實時性更好且更輕量。典型場景是增量拉取在線服務數據到離線存儲中,通過Flink引擎寫入Hudi表,借助Presto或Spark引擎實現高效的OLAP分析。

  • 消息隊列

    在小體量的數據場景下,Hudi也可以作為消息隊列替代Kafka,簡化應用開發架構。

  • 數倉回填(backfill)

    針對歷史全量數據進行部分行、列的更新場景,通過數據湖極大減少計算資源消耗,提升了端到端的性能。典型案例是Hive場景下全量和增量的打寬。

全托管Hudi優勢

相比開源社區Hudi,全托管Flink平臺集成Hudi具有的功能優勢詳情如下所示:

  • 平臺側與Flink全托管集成,免運維

    Flink全托管內置Hudi連接器,降低運維復雜度,提供SLA保障。

  • 完善的數據連通性

    對接多個阿里云大數據計算分析引擎,數據與計算引擎解耦,可以在Flink、Spark、Presto或Hive間無縫流轉。

  • 深度打磨DB入湖場景

    與Flink CDC連接器聯動,降低開發門檻。

  • 提供企業級特性

    包括集成DLF統一元數據視圖、自動且輕量化的表結構變更。

  • 內置阿里云OSS存儲,低成本存儲,彈性擴展

    數據以開放的Parquet、Avro格式存儲在阿里云OSS,存儲計算分離,資源靈活彈性擴展。

使用限制

  • 僅Flink計算引擎vvr-4.0.11-flink-1.13及以上版本支持Hudi Connector。

  • 文件系統僅支持HDFS或阿里云OSS和OSS-HDFS服務。

  • 不支持以Session模式提交作業。

  • 不支持修改字段,如需修改,請在DLF控制臺通過Spark SQL語句進行操作。

語法結構

CREATE TEMPORARY TABLE hudi_tbl (
  uuid BIGINT,
  data STRING,
  ts   TIMESTAMP(3),
  PRIMARY KEY(uuid) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'oss://<yourOSSBucket>/<自定義存儲位置>',
  ...
);

WITH參數

基礎參數

  • 通用

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    表類型。

    String

    固定值為hudi。

    path

    表存儲路徑。

    String

    支持阿里云OSS、HDFS和OSS-HDFS和三種路徑。

    • OSS:路徑格式為oss://<bucket>/<user-defined-dir>

    • HDFS:路徑格式為hdfs://<user-defined-dir>

    • OSS-HDFS:路徑格式為oss://<bucket>.<oss-hdfs-endpoint>/<user-defined-dir>

      說明

      僅Flink計算引擎VVR 8.0.3及以上版本支持該參數配置為OSS-HDFS路徑。

    其中:

    • bucket:表示您創建的OSS Bucket名稱。

    • user-defined-dir:表示數據存放路徑。

    • oss-hdfs-endpoint:表示OSS-HDFS服務Endpoint。

      您可以在OSS實例概覽頁面的訪問端口中查看HDFSEndpoint信息。

    hoodie.datasource.write.recordkey.field

    主鍵字段。

    String

    uuid

    • 支持通過PRIMARY KEY語法設置主鍵字段。

    • 支持使用英文逗號(,)分隔多個字段。

    precombine.field

    版本字段。

    String

    ts

    基于此字段的大小來判斷消息是否進行更新。

    如果您沒有設置該參數,則系統默認會按照消息在引擎內部處理的先后順序進行更新。

    oss.endpoint

    阿里云對象存儲服務OSS或者OSS-HDFS的Endpoint。

    String

    如果使用OSS或者OSS-HDFS作為存儲,則必需填寫。

    • 使用OSS時,參數取值詳情請參見OSS地域和訪問域名

    • 使用OSS-HDFS時,您可以在OSS實例概覽頁面的訪問端口中查看HDFS服務Endpoint信息。

    accessKeyId

    阿里云賬號的AccessKey ID。

    String

    如果使用OSS或者OSS-HDFS作為存儲,則必需填寫。

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量和密鑰管理

    accessKeySecret

    阿里云賬號的AccessKey Secret。

    String

    如果使用OSS或者OSS-HDFS作為存儲,則必需填寫。

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量和密鑰管理

  • 源表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    read.streaming.enabled

    是否開啟流讀。

    boolean

    false

    參數取值如下:

    • true:開啟流讀。

    • false:不開啟流讀。

    read.start-commit

    讀取起始位點。

    string

    不填

    參數取值如下:

    • yyyyMMddHHmmss:從指定時間點開始消費。

    • earliest:從最早位點開始消費。

    • 不填:從最新時間開始消費。

  • 結果表獨有

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    write.operation

    寫入操作模式。

    String

    UPSERT

    參數取值如下:

    • insert模式:數據追加寫。

    • upsert模式:數據更新。

    • bulk_insert模式:數據批量追加寫。

    hive_sync.enable

    是否開啟同步元數據到Hive功能。

    boolean

    false

    參數取值如下:

    • true:開啟同步元數據到Hive功能。

    • false:關閉同步元數據到Hive功能。

    hive_sync.mode

    Hive數據同步模式。

    String

    hms

    參數取值如下:

    • hms:元數據同步到Hive Metastore或者DLF時,需要設置為hms。

    • jdbc:元數據同步到jdbc時,需要設置為jdbc。

    hive_sync.db

    同步到Hive的數據庫名稱。

    String

    default

    無。

    hive_sync.table

    同步到Hive的表名稱。

    String

    當前table名

    hudi同步到Hive的表名不能使用中劃線( -)。

    dlf.catalog.region

    DLF服務的地域名。

    String

    詳情請參見已開通的地域和訪問域名

    說明
    • 僅當hive_sync.mode設置為hms時,dlf.catalog.region參數設置才生效。

    • 請和dlf.catalog.endpoint選擇的地域保持一致。

    dlf.catalog.endpoint

    DLF服務的Endpoint。

    String

    詳情請參見已開通的地域和訪問域名

    說明
    • 僅當hive_sync.mode設置為hms時,dlf.catalog.endpoint參數設置才生效。

    • 推薦您為dlf.catalog.endpoint參數配置DLF的VPC Endpoint。例如,如果您選擇的地域為cn-hangzhou地域,則dlf.catalog.endpoint參數需要配置為dlf-vpc.cn-hangzhou.aliyuncs.com

    • 如果您需要跨VPC訪問DLF,則請參見如何訪問跨VPC的其他服務?

高階參數

Hudi支持豐富的寫入和讀取場景,不同場景的參數如下表所示。

并發參數

名稱

說明

默認值

備注

write.tasks

writer的并發,每個writer順序寫1~N個buckets。

4

增加寫任務的并發對小文件個數沒影響

write.bucket_assign.tasks

bucket assigner的并發。

Flink并發度

增加寫任務的并發同時增加了寫任務的bucket數,也就是增加了小文件(小bucket)數。

write.index_bootstrap.tasks

Index bootstrap算子的并發。

Flink并發度

  • 只在index.bootstrap.enabled為true時生效。

  • 增加并發可以加快bootstrap階段的效率,bootstrap階段會阻塞checkpoint,因此需要設置多一些的checkpoint失敗容忍次數。

read.tasks

流和批讀算子的并發。

4

無。

compaction.tasks

online compaction算子的并發。

4

online compaction比較耗費資源,建議走offline compaction。

在線壓縮參數

名稱

說明

默認值

備注

compaction.schedule.enabled

是否階段性生成壓縮plan。

true

參數取值如下:

  • true:階段性生成壓縮plan。

  • false:不階段性生成壓縮plan。

說明

建議階段性生成壓縮plan,即使compaction.async.enabled關閉的情況下。

compaction.async.enabled

是否開啟異步壓縮。

true

參數取值如下:

  • true:開啟

  • false:關閉

說明

通過關閉compaction.async.enabled參數可關閉在線壓縮執行,但是調度compaction.schedule.enabled仍然建議開啟,之后可通過離線異步壓縮,執行階段性生成的壓縮plan。

compaction.tasks

壓縮任務的并發數。

4

無。

compaction.trigger.strategy

壓縮策略。

num_commits

支持以下壓縮策略:

  • num_commits:根據commit個數周期性觸發。

  • time_elapsed:根據時間間隔周期性觸發。

  • num_and_time:同時滿足commit個數和時間間隔。

  • num_or_time:滿足commit個數或者時間間隔。

compaction.delta_commits

經過多少個commit觸發壓縮。

5

無。

compaction.delta_seconds

經過多少秒后觸發壓縮。

3600

單位為秒。

compaction.max_memory

用于壓縮去重的hashmap的可用內存大小。

100 MB

資源夠用時,建議調整到1 GB。

compaction.target_io

每個壓縮plan的IO上限。

500 GB

無。

文件大小

文件參數控制了文件的大小,目前支持的參數詳情如下表所示。

名稱

說明

默認值

備注

hoodie.parquet.max.file.size

最大可寫入的parquet文件大小。

超過可寫入的parquet文件大小時,將寫入到新的文件組。

120 * 1024 * 1024 byte

(120 MB)

單位是byte。

hoodie.parquet.small.file.limit

小文件的大小閾值,小于該參數的文件被認為是小文件。

104857600 byte(100 MB)

  • 單位是byte。

  • 在寫入時,hudi會嘗試先追加寫已存小文件。

hoodie.copyonwrite.record.size.estimate

預估的record大小。

1024 byte(1 KB)

  • 單位為byte。

  • 如果沒有顯示指定,hudi會根據提交元數據動態估計record大小.

Hadoop參數

名稱

說明

默認值

備注

hadoop.${you option key}

通過hadoop.前綴指定hadoop配置項。

支持同時指定多個hadoop配置項。

說明

從Hudi 0.12.0開始支持,針對跨集群提交執行的需求,可以通過DDL指定per-job級別的hadoop配置。

數據寫入

Hudi支持豐富的寫入方式,包括離線批量寫入、流式寫入等場景。支持豐富的數據類型,包括changelog以及log數據。同時支持不同的索引方案。

  • 離線批量寫入

    針對存量數據導入Hudi的需求,如果存量數據來源于其他數據源,可以使用批量導入功能,快速將存量數據導成Hoodie表格式。

    名稱

    說明

    默認值

    備注

    write.operation

    寫操作類型。

    upsert

    參數取值如下:

    • upsert:插入更新

    • insert:插入

    • bulk_insert:批量寫入

      說明
      • bulk_insert導入省去了avro的序列化以及數據的merge過程,沒有去重操作,數據的唯一性需要自己來保證。

      • bulk_insert需要在Batch Execution Mode下執行,Batch模式默認會按照分區名稱排序輸入消息再寫入Hoodie,避免file handle頻繁切換導致性能下降。

    write.tasks

    bulk_insert寫任務的并發。

    Flink的并發度

    bulk_insert寫任務的并發通過參數write.tasks指定,并發的數量會影響到小文件的數量。

    理論上,bulk_insert寫任務的并發數就是劃分的bucket數,當每個bucket在寫到文件大小上限(parquet 120 MB)時,會滾動到新句柄,所以最終的寫文件數量大于等于bulk_insert寫任務的并發。

    write.bulk_insert.shuffle_input

    是否將數據按照partition字段shuffle再通過write task寫入。

    true

    從Hudi 0.11.0版本開始,開啟該參數將減少小文件的數量,但是可能有數據傾斜風險。

    write.bulk_insert.sort_input

    是否將數據先按照partition字段排序再寫入。

    true

    從Hudi 0.11.0版本開始支持,當一個write task寫多個partition,開啟可以減少小文件數量

    write.sort.memory

    sort算子的可用managed memory。

    128

    單位是MB。

  • Changelog模式

    該模式只有MOR表支持,在該模式下Hoodie會保留消息的所有變更(I/-U/U/D),之后再配合Flink引擎的有狀態計算實現全鏈路近實時數倉生產增量計算。Hoodie的MOR表通過行存原生支持保留消息的所有變更(format層面的集成),通過Flink全托管流讀單個MOR表可以消費到所有的變更記錄。

    說明

    非changelog模式,流讀單次的batch數據集會merge中間變更;批讀(快照讀)會合并所有的中間結果,不管中間狀態是否已被寫入,都將被忽略。

    名稱

    說明

    默認值

    備注

    changelog.enabled

    是否消費所有變更。

    false

    參數取值如下:

    • true:支持消費所有變更。

    • false:不消費所有變更,即UPSERT語義,所有的消息僅保證最后一條合并消息,中間的變更可能會被merge掉。

    說明

    開啟changelog.enabled參數后,異步的壓縮任務仍然會將中間變更合并成1條數據,所以如果流讀消費不夠及時,被壓縮后只能讀到最后一條記錄。但是,可以通過調整壓縮的頻率,預留一定的時間buffer給 reader,比如調整compaction.delta_commits:5和compaction.delta_seconds: 3600壓縮參數。

  • Append模式(從Hudi 0.10.0版本開始支持)

    在該模式下:

    • MOR表會應用小文件策略:會追加寫avro log文件。

    • COW表沒有小文件策略:每次寫入COW表直接寫新的parquet文件。

Clustering策略

Hudi支持豐富的Clustering策略,從而優化INSERT模式下的小文件問題。

  • Inline Clustering(只有Copy On Write表支持該模式)

    名稱

    說明

    默認值

    備注

    write.insert.cluster

    是否在寫入時合并小文件。

    false

    參數取值如下:

    • true:在寫入時,合并小文件。

    • false:在寫入時,不合并小文件。

    說明

    COW表默認insert寫不合并小文件,開啟該參數后,每次寫入會優先合并之前的小文件,但不會去重,吞吐會受影響。

  • Async Clustering(從Huid 0.12.0版本開始支持)

    名稱

    說明

    默認值

    備注

    clustering.schedule.enabled

    是否在寫入時定時調度Clustering plan。

    false

    開啟后周期性調度clustering plan。

    clustering.delta_commits

    經過多少個commits生成Clustering plan。

    4

    clustering.schedule.enabled為true時,生效。

    clustering.async.enabled

    是否異步執行Clustering plan。

    false

    開啟后周期性異步執行,合并小文件。

    clustering.tasks

    Clustering task執行并發。

    4

    無。

    clustering.plan.strategy.target.file.max.bytes

    Clustering單文件目標大小。

    1024 * 1024 * 1024

    單位是byte。

    clustering.plan.strategy.small.file.limit

    Clustering小文件閾值。

    600

    小于該大小的文件才會參與clustering。

    clustering.plan.strategy.sort.columns

    Clustering排序字段。

    支持指定特殊的排序字段。

  • Clustering Plan Strategy

    名稱

    說明

    默認值

    備注

    clustering.plan.partition.filter.mode

    Clustering分區過濾模式。

    NONE

    支持的模式如下:

    • NONE:不過濾分區,所有分區都用于聚合,即不做限制。

    • RECENT_DAYS:數據按分區時,合并最近N天的數據。

    • SELECTED_PARTITIONS:指定固定的分區。

    clustering.plan.strategy.daybased.lookback.partitions

    采用RECENT_DAYS模式下的目標分區天數。

    2

    僅當clustering.plan.partition.filter.mode取值為RECENT_DAYS時生效。

    clustering.plan.strategy.cluster.begin.partition

    指定開始分區,用于過濾分區。

    僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。

    clustering.plan.strategy.cluster.end.partition

    指定結束分區,用于過濾分區。

    僅當clustering.plan.partition.filter.mode取值為SELECTED_PARTITIONS時有效。

    clustering.plan.strategy.partition.regex.pattern

    通過正則表達式指定目標分區。

    無。

    clustering.plan.strategy.partition.selected

    指定目標partitions。

    支持通過英文逗號(,)分割多個partition。

  • Bucket索引

    說明

    從Hudi 0.11.0版本開始支持以下表格中的參數。

    名稱

    說明

    默認值

    備注

    index.type

    索引類型。

    FLINK_STATE

    參數取值如下:

    • FLINK_STATE:使用flink state索引。

    • BUCKET:使用bucket索引。

    當數據量比較大時(表的數據條目超過5 億),flink state的存儲開銷可能成為瓶頸。bucket索引通過固定的hash策略,將相同key的數據分配到同一個fileGroup中,可以避免索引的存儲和查詢開銷。bucket index和flink state索引對比有以下區別:

    • bucket index沒有flink state的存儲計算開銷,性能較好。

    • bucket index無法擴buckets,state index則可以依據文件的大小動態增加文件個數。

    • bucket index不支持跨partition的變更(如果輸入是cdc流則沒有這個限制),state index沒有限制。

    hoodie.bucket.index.hash.field

    bucket索引hash key字段。

    主鍵

    可以設置成主鍵的子集。

    hoodie.bucket.index.num.buckets

    bucket索引的bucket個數。

    4

    默認每個partition的bucket數,當前設置后則不可再變更。

數據讀取

  • Hudi支持豐富的讀取方案,包括批讀、流讀、增量拉取,同時支持消費、傳播changelog,實現端到端增量ETL。

    • 流讀

      當前表默認是快照讀取,即讀取最新的全量快照數據并一次性返回。通過read.streaming.enabled參數開啟流讀模式,通過read.start-commit參數指定起始消費位置,支持指定earliest從最早消費。

      名稱

      說明

      默認值

      備注

      read.streaming.enabled

      是否開啟流讀模式。

      false

      參數取值如下:

      • true:開啟流讀模式。

      • false:關閉流讀模式。

      read.start-commit

      流讀起始位點

      不填

      參數取值如下:

      • yyyyMMddHHmmss:從指定時間點開始消費。

      • earliest:從最早位點開始消費。

      • 不填:從最新時間開始消費。

      clean.retain_commits

      cleaner最多保留的歷史commits數。

      30

      大于此數量的歷史commits會被清理掉,changelog模式下,該參數可以控制changelog的保留時間,例如checkpoint周期為5分鐘一次,默認最少保留150分鐘的時間。

      重要
      • 僅從0.10.0開始支持流讀changelog。開啟changelog模式后,hudi會保留一段時間的changelog供下游consumer消費。

      • changelog有可能會被compaction合并掉,中間記錄會消除,可能會影響計算結果。

    • 增量讀取(從Hudi 0.10.0版本開始支持)

      支持通過Flink全托管DataStream方式增量消費、Batch增量消費和TimeTravel(Batch消費某個時間點的數據)。

      名稱

      說明

      默認值

      備注

      read.start-commit

      指定起始消費位點。

      從最新位置commit

      請按yyyyMMddHHmmss格式指定流讀的起始位點。

      區間為閉區間,即包含起始和結束。

      read.end-commit

      指定結束消費位點。

      從最新位置commit

代碼示例

  • 源表

CREATE TEMPORARY TABLE blackhole (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'blackhole'      
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<自定義存儲位置>',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true'
);

-- 從最新的commit流讀寫入blackhole。
INSERT INTO blackhole SELECT * from hudi_tbl;
  • 結果表

CREATE TEMPORARY TABLE datagen(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data  STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'datagen' ,
  'rows-per-second'='100' 
);

CREATE TEMPORARY TABLE hudi_tbl (
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  data STRING,
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'hudi', 
  'oss.endpoint' = '<yourOSSEndpoint>', 
  'accessKeyId' = '${secret_values.ak_id}', 
  'accessKeySecret' = '${secret_values.ak_secret}', 
  'path' = 'oss://<yourOSSBucket>/<自定義存儲位置>',
  'table.type' = 'MERGE_ON_READ'
);

INSERT INTO hudi_tbl SELECT * from datagen;

Datastream API

重要

通過DataStream的方式讀寫數據時,則需要使用對應的DataStream連接器連接Flink全托管,DataStream連接器設置方法請參見DataStream連接器設置方法

  • maven pom

    根據使用的VVR版本,指定Flink和Hudi版本。

    <properties>
      <maven.compiler.source>8</maven.compiler.source>
      <maven.compiler.target>8</maven.compiler.target>
      <flink.version>1.15.4</flink.version>
      <hudi.version>0.13.1</hudi.version>
    </properties>
    
    <dependencies>
      <!-- flink -->
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- hudi -->
      <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink1.15-bundle</artifactId>
        <version>${hudi.version}</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- oss -->
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aliyun</artifactId>
        <version>3.3.2</version>
        <scope>provided</scope>
      </dependency>
    
      <!-- dlf -->
      <dependency>
        <groupId>com.aliyun.datalake</groupId>
        <artifactId>metastore-client-hive2</artifactId>
        <version>0.2.14</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.5.1</version>
        <scope>provided</scope>
      </dependency>
    </dependencies>
    重要

    DLF使用的部分依賴與社區版本存在沖突,例如hive-commonhive-exec。如果您有本地測試DLF的需求,可以下載hive-commonhive-execJAR包,然后在IDEA手動導入。

  • 寫入到Hudi

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.util.HoodiePipeline;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class FlinkHudiQuickStart {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        String dbName = "test_db";
        String tableName = "test_tbl";
        String basePath = "oss://xxx";
    
        Map<String, String> options = new HashMap<>();
        // hudi conf
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.DATABASE_NAME.key(), dbName);
        options.put(FlinkOptions.TABLE_NAME.key(), tableName);
        // oss conf
        options.put("hadoop.fs.oss.accessKeyId", "xxx");
        options.put("hadoop.fs.oss.accessKeySecret", "xxx");
        // 本地調試使用公網網端,例如oss-cn-hangzhou.aliyuncs.com;提交集群使用內網網端,例如oss-cn-hangzhou-internal.aliyuncs.com
        options.put("hadoop.fs.oss.endpoint", "xxx");
        options.put("hadoop.fs.AbstractFileSystem.oss.impl", "org.apache.hadoop.fs.aliyun.oss.OSS");
        options.put("hadoop.fs.oss.impl", "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        // dlf conf
        options.put(FlinkOptions.HIVE_SYNC_ENABLED.key(), "true"); // 可選擇是否同步DLF
        options.put(FlinkOptions.HIVE_SYNC_MODE.key(), "hms");
        options.put(FlinkOptions.HIVE_SYNC_DB.key(), dbName);
        options.put(FlinkOptions.HIVE_SYNC_TABLE.key(), tableName);
        options.put("hadoop.dlf.catalog.id", "xxx");
        options.put("hadoop.dlf.catalog.accessKeyId", "xxx");
        options.put("hadoop.dlf.catalog.accessKeySecret", "xxx");
        options.put("hadoop.dlf.catalog.region", "xxx");
        //  本地調試使用公網網端,例如dlf.cn-hangzhou.aliyuncs.com,提交集群使用內網網端,例如dlf-vpc.cn-hangzhou.aliyuncs.com
        options.put("hadoop.dlf.catalog.endpoint", "xxx");
        options.put("hadoop.hive.imetastoreclient.factory.class", "com.aliyun.datalake.metastore.hive2.DlfMetaStoreClientFactory");
    
        DataStream<RowData> dataStream = env.fromElements(
            GenericRowData.of(StringData.fromString("id1"), StringData.fromString("name1"), 22,
                StringData.fromString("1001"), StringData.fromString("p1")),
            GenericRowData.of(StringData.fromString("id2"), StringData.fromString("name2"), 32,
                StringData.fromString("1002"), StringData.fromString("p2"))
        );
    
        HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName)
            .column("uuid string")
            .column("name string")
            .column("age int")
            .column("ts string")
            .column("`partition` string")
            .pk("uuid")
            .partition("partition")
            .options(options);
    
        builder.sink(dataStream, false); // The second parameter indicating whether the input data stream is bounded
        env.execute("Flink_Hudi_Quick_Start");
      }
    }

常見問題