日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

開源Flink 1.11及以上版本實時寫入

本文為您介紹開源Flink 1.11如何實時寫入數據至Hologres。

前提條件

  • 開通Hologres實例,并連接開發工具,詳情請參見連接HoloWeb

  • 搭建Flink集群(本次示例使用的是1.15版本),可以前往Flink官網下載二進制包,啟動一個Standalone集群,詳情請參見文檔集群搭建

背景信息

從開源Flink1.11版本開始,Hologres代碼已開源,相應版本的Connector已經在中央倉庫發布Release包,可在項目中參照如下pom文件進行配置。詳細內容請參見Hologres GitHub官方庫

<dependency>
    <groupId>com.alibaba.hologres</groupId>
    <artifactId>hologres-connector-flink-1.15</artifactId>
    <version>1.4.0</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>

開源Flink版本與hologres-connector-flink最新版本對應關系如下,建議使用1.15及以上版本,功能更豐富:

Flink版本

Connector版本

Flink 1.11

hologres-connector-flink-1.11:1.0.1

Flink 1.12

hologres-connector-flink-1.12:1.0.1

Flink 1.13

hologres-connector-flink-1.13:1.3.2

Flink 1.14

hologres-connector-flink-1.14:1.3.2

Flink 1.15

hologres-connector-flink-1.15:1.4.1

Flink 1.17

hologres-connector-flink-1.17:1.4.1

Flink SQL寫入數據至Hologres代碼示例

您可以參照如下代碼示例,通過將Flink SQL將數據寫入Hologres。其中,更多詳細的代碼示例請參見Hologres GitHub官方庫

        String createHologresTable =
                String.format(
                        "create table sink("
                                + "  user_id bigint,"
                                + "  user_name string,"
                                + "  price decimal(38,2),"
                                + "  sale_timestamp timestamp"
                                + ") with ("
                                + "  'connector'='hologres',"
                                + "  'dbname' = '%s',"
                                + "  'tablename' = '%s',"
                                + "  'username' = '%s',"
                                + "  'password' = '%s',"
                                + "  'endpoint' = '%s'"
                                + ")",
                        database, tableName, userName, password, endPoint);
        tEnv.executeSql(createHologresTable);

        createScanTable(tEnv);

        tEnv.executeSql("insert into sink select * from source");

更多詳盡的代碼示例請參見hologres-connector-flink-examples,包括如下示例。

  • FlinkSQLToHoloExample:一個使用純Flink SQL接口實現的應用,將數據寫入至Hologres。

  • FlinkDSAndSQLToHoloExample:一個混合Flink DataStream以及SQL 接口實現的應用,寫入Hologres前,將DataStream轉換成Table,之后再用Flink SQL寫入Hologres。

  • FlinkDataStreamToHoloExample:一個使用純Flink DataStream接口實現的應用,將數據寫入至Hologres。

  • FlinkRoaringBitmapAggJob:一個使用Flink及RoaringBitmap,結合Hologres維表,實現實時去重統計UV的應用,并將統計結果寫入Hologres。

  • 通過Flink DataStream接口的實時數據寫入方法,可以對數據進行基于Hologres Shard的Repartition操作,有效減少寫入Hologres實例時的小文件數量,從而提升寫入性能并降低系統負載。該方法適用于需要批量導入具有主鍵的空表,實現類似Insert Overwrite的場景。

Hologres Flink Connector參數說明

您可以將Flink數據寫入Hologres,Hologres Flink Connector相關參數具體內容如下:

參數

是否必填

說明

connector

結果表類型,固定值為hologres。

dbname

Hologres的數據庫名稱。

tablename

Hologres接收數據的表名稱。

username

當前阿里云賬號的AccessKey ID。

您可以單擊AccessKey 管理,獲取AccessKey ID。

password

當前阿里云賬號的AccessKey Secret。

您可以單擊AccessKey 管理,獲取AccessKey Secret。

endpoint

Hologres的VPC網絡地址。進入Hologres管理控制臺的實例詳情頁,獲取Endpoint。

說明

endpoint需包含端口號,格式為ip:port。同一個區域使用VPC網絡地址,跨區域請使用公共網絡。

  • 連接參數

    參數

    是否必填

    說明

    connectionSize

    單個Flink Hologres Task所創建的JDBC連接池大小。

    默認值:3,和吞吐成正比。

    connectionPoolName

    連接池名稱,同一個TaskManager中,表配置同名的連接池名稱可以共享連接池。

    無默認值,每個表默認使用自己的連接池。如果設置連接池名稱,則所有表的connectionSize需要相同

    fixedConnectionMode

    寫入和點查不占用連接數(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)

    默認值:false

    jdbcRetryCount

    當連接故障時,寫入和查詢的重試次數。

    默認值:10。

    jdbcRetrySleepInitMs

    每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。

    默認值:1000ms。

    jdbcRetrySleepStepMs

    每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。

    默認值:5000ms。

    jdbcConnectionMaxIdleMs

    寫入線程和點查線程數據庫連接的最大Idle時間,超過連接將被釋放。

    默認值:60000ms。

    jdbcMetaCacheTTL

    TableSchema信息的本地緩存時間。

    默認值:60000ms。

    jdbcMetaAutoRefreshFactor

    當TableSchema cache剩余存活時間短于metaCacheTTL/metaAutoRefreshFactor 將自動刷新cache。

    默認值:-1,表示不自動刷新。

    connection.ssl.mode

    是否啟用數據傳輸加密。取值說明如下:

    • disable(默認值):不啟用傳輸加密。

    • require:啟用SSL,只對數據鏈路加密。

    • verify-ca:啟用SSL,加密數據鏈路,同時使用CA證書驗證Hologres服務端的真實性。

    • verify-full:啟用SSL,加密數據鏈路,使用CA證書驗證Hologres服務端的真實性,同時比對證書內的CN或DNS與連接時配置的Hologres連接地址是否一致。

    connection.ssl.root-cert.location

    CA證書的路徑,且確保已經將CA證書上傳至Flink集群環境。

    說明

    當connection.ssl.mode參數配置為verify-ca或verify-full時,需要配置此參數。

    jdbcDirectConnect

    是否開啟直連。取值說明如下:

    • false(默認值):不開啟。

    • true:開啟。

      Flink批量寫入的瓶頸是VIP Endpoint的網絡吞吐。開啟此參數會測試當前環境能否直連Hologres FE,若支持默認使用直連。

  • 寫入參數

    參數

    是否必填

    說明

    mutatetype

    數據寫入模式,詳情請參見流式語義

    默認值:insertorignore。

    ignoredelete

    是否忽略撤回消息。通常Flink的Group By會產生回撤消息,回撤消息到Hologres connector會產生Delete請求。

    默認值:true,僅在使用流式語義時生效。

    createparttable

    當寫入分區表時,是否自動根據分區值自動創建分區表。

    • false(默認值):不會自動創建。

    • true:自動創建。

    建議慎用該功能,確保分區值不會出現臟數據導致創建錯誤的分區表。

    ignoreNullWhenUpdate

    mutatetype='insertOrUpdate'時,是否忽略更新寫入數據中的Null值。

    默認值:false。

    jdbcWriteBatchSize

    Hologres Sink節點數據攢批的最大批大小。

    默認值:256

    jdbcWriteBatchByteSize

    Hologres Sink節點單個線程數據攢批的最大字節大小。

    默認值:2097152(2 * 1024 * 1024),2MB。

    jdbcWriteBatchTotalByteSize

    Hologres Sink節點所有數據攢批的最大字節大小。

    默認值:20971520(20 * 1024 * 1024),20MB。

    jdbcWriteFlushInterval

    Hologres Sink節點數據攢批的最長Flush等待時間。

    默認值:10000,即10秒。

    jdbcUseLegacyPutHandler

    寫入SQL格式,取值說明如下:

    • true:寫入SQL格式為insert into xxx(c0,c1,...) values (?,?,...),... on conflict;

    • false(默認值):寫入SQL格式為insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict

    jdbcEnableDefaultForNotNullColumn

    設置為true時,not null且未在表上設置default的字段傳入null時,將以默認值寫入。String類型默認為"",Number類型默認為0,Date、timestamp、timestamptz 類型默認為1970-01-01 00:00:00。

    默認值:true。

    remove-u0000-in-text.enabled

    是否自動替換TEXT數據類型中的非UTF-8的u0000字符。取值說明如下:

    • true:替換。

    • false(默認值):不替換。

    deduplication.enabled

    若一批寫入的數據中有主鍵相同的數據,是否進行去重。取值說明如下:

    • true(默認值):進行去重,只保留后到達的這條數據。

    • false:不進行去重。

      首先將批處理數據寫入,待寫入完成后再繼續寫入后到達的數據。

      說明

      在不進行去重時,極端情況下(例如所有數據的主鍵都相同),寫入操作會退化為非批量寫入,對性能會產生一定影響。

    aggressive.enabled

    是否啟用激進提交模式。取值說明如下:

    • true:啟用。

      啟用后,即使批量處理沒有達到預期的條數,只要檢測到連接空閑,系統就會強制提交數據。在流量較小時,這種方法可以有效減少數據傳輸的延時。

    • false(默認值):不啟用。

    jdbcCopyWriteMode

    數據寫入方式。取值說明如下:

    • true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當前默認使用流式Copy(Fixed Copy)方式寫入。

      說明

      與使用INSERT方式寫入相比,Fixed Copy方式可以實現更高的吞吐(因為采用流模式),更低的數據延時以及更低的客戶端內存消耗(因為不需要攢批數據),但不支持數據回撤功能。

    • false(默認值):使用INSERT方式寫入。

    說明

    僅Hologres V1.3.1及以上版本支持此參數。

    jdbcCopyWriteFormat

    底層是否采用二進制協議。

    • binary(默認值):表示使用二進制模式,二進制會更快。

    • text:為文本模式。

    說明

    僅Hologres V1.3.1及以上版本支持此參數。

    bulkLoad

    是否使用批量Copy方式寫入。取值說明如下:

    • true:使用批量Copy方式寫入,僅jdbcCopyWriteMode參數也設置為true時,該參數才會生效,否則使用Fixed Copy方式寫入。

      說明
      • 批量Copy相較于流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在數據寫入過程中提供更優的性能,您可以根據業務需要,選擇合適的數據寫入方式。

      • 在對主鍵表進行批量Copy寫入時,通常會出現表鎖的情況,您可以通過配置target-shards.enabled參數為true,將寫入鎖粒度降至Shard級別,從而允許并發執行多個批量導入任務,減少了表鎖的發生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres實例的負載,實測顯示,可以減少約66.7%的負載。

      • 批量Copy寫入時,如果目標表包含主鍵,要求在寫入之前目標表為空表,否則寫入過程中進行主鍵去重會影響寫入性能。

    • false(默認值):不使用。

    說明

    僅Hologres V1.4.0及以上版本支持此參數。

    target-shards.enabled

    是否啟用Target Shard批量寫入。取值說明如下:

    • true:啟用Target Shard批量寫入,當源數據已按Shard重新分區時,可以將寫入鎖粒度降至Shard級別。

    • false(默認值):不啟用。

    說明

    僅Hologres V1.4.1及以上版本支持此參數。

  • 點查參數

    參數

    是否必填

    說明

    jdbcReadBatchSize

    維表點查最大批次大小。

    默認值:128。

    jdbcReadBatchQueueSize

    維表點查請求緩沖隊列大小。

    默認值:256。

    async

    是否采用異步方式同步數據。

    默認值:false。異步模式可以并發地處理多個請求和響應,從而連續的請求之間不需要阻塞等待,提高查詢的吞吐。但在異步模式下,無法保證請求的絕對順序。

    cache

    緩存策略。

    默認值:None。Hologres僅支持以下兩種緩存策略:None:無緩存。LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據,如果未找到,則去物理維表中查找。

    cachesize

    緩存大小。

    默認值:10000。選擇LRU緩存策略后,可以設置緩存大小。

    cachettlms

    更新緩存的時間間隔,單位為毫秒。

    當選擇LRU緩存策略后,可以設置緩存失效的超時時間,默認不過期。

    cacheempty

    是否緩存join結果為空的數據。

    默認值:true,表示緩存join結果為空的數據。false:表示不緩存join結果為空的數據。

  • 數據類型映射

    當前Flink全托管與Hologres的數據類型映射請參見Blink/Flink與Hologres的數據類型映射