日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Flink批量導(dǎo)入Hologres

Hologres推出的新版Flink Connector插件,支持通過Flink將數(shù)據(jù)批量導(dǎo)入到Hologres,實(shí)現(xiàn)高效且低負(fù)載的數(shù)據(jù)導(dǎo)入。

背景信息

在大數(shù)據(jù)處理領(lǐng)域,Hologres作為一款強(qiáng)大的在線分析處理(OLAP)系統(tǒng),與Flink的集成提供了強(qiáng)大的實(shí)時數(shù)據(jù)流處理能力。然而,對數(shù)據(jù)時效性要求不高的場景,如歷史數(shù)據(jù)的批量加載、離線數(shù)據(jù)處理或日志聚合等任務(wù),推薦使用Flink批量導(dǎo)入Hologres。批量導(dǎo)入能夠以更高效、節(jié)約資源的方式將大量數(shù)據(jù)一次性寫入Hologres,不僅提升了導(dǎo)入效率,還兼顧了資源利用率。您可以根據(jù)自身的業(yè)務(wù)特性和資源狀況,靈活選擇實(shí)時導(dǎo)入或批量導(dǎo)入。關(guān)于實(shí)時導(dǎo)入詳情,請參見Flink全托管

前提條件

阿里云實(shí)時計(jì)算Flink版批量導(dǎo)入

  1. 通過連接HoloWeb并執(zhí)行查詢創(chuàng)建Hologres結(jié)果表,用于接收Flink導(dǎo)入的數(shù)據(jù)。本文以test_sink_customer表為例。

    -- 創(chuàng)建Hologres結(jié)果表
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    說明

    Flink源表需要與Hologres的結(jié)果表的字段名稱和類型保持一致。

  2. 登錄實(shí)時計(jì)算控制臺,在作業(yè)運(yùn)維頁面單擊部署作業(yè),配置部署作業(yè)并單擊部署。配置參數(shù)詳情,請參見部署JAR作業(yè)

    其中主要參數(shù)介紹,如下表所示。

    參數(shù)

    說明

    部署作業(yè)類型

    選擇為JAR。

    部署模式

    支持流模式或批模式,本文選擇批模式。

    引擎版本

    引擎版本詳情請參見引擎版本介紹生命周期策略。本文以vvr-8.0.7-flink-1.17版本為例。

    JAR URI

    上傳開源Flink Connector:hologres-connector-flink-repartition.jar

    說明

    通過開源Flink Connector支持將數(shù)據(jù)批量導(dǎo)入Hologres,F(xiàn)link Connector插件相關(guān)開源代碼,詳情請參見Hologres GitHub官方庫

    Entry Point Class

    程序的入口類。Flink Connector指定主類名稱為:com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample

    Entry Point Main Arguments

    傳入repartition.sql文件的路徑參數(shù)。實(shí)時計(jì)算 Flink運(yùn)行時,附加依賴文件的路徑在/flink/usrlib/下,因此完整的參數(shù)為:--sqlFilePath="/flink/usrlib/repartition.sql"

    附加依賴文件

    上傳repartition.sql文件。repartition.sql是Flink SQL腳本的文件,主要用于定義數(shù)據(jù)源、結(jié)果表聲明以及Hologres的連接信息,本文repartition.sql文件的示例內(nèi)容如下。

    --sourceDDL,本文使用了Flink DataGen公共測試數(shù)據(jù)作為源數(shù)據(jù)。
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    --sourceDql,源表查詢語句應(yīng)當(dāng)保證查詢結(jié)果與sinkDDL聲明的結(jié)果表對應(yīng),包括字段數(shù)量和字段類型。
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL,結(jié)果表聲明以及配置連接Hologres信息。
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
      ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );
    說明

    repartition.sql文件中連接Hologres的更多參數(shù)介紹詳情,請參見Hologres Flink Connector參數(shù)說明

  3. 單擊作業(yè)名稱,進(jìn)入部署詳情看板,編輯資源配置,修改并發(fā)度

    說明

    建議與Hologres結(jié)果表的ShardCount數(shù)保持一致。

  4. 查詢Hologres結(jié)果表。

    Flink作業(yè)提交成功后,您可以在Hologres中查詢寫入的數(shù)據(jù)。示例語句如下。

    SELECT * FROM test_sink_customer;

開源Flink批量導(dǎo)入

  1. 通過連接HoloWeb并執(zhí)行查詢創(chuàng)建Hologres結(jié)果表,用于接收Flink導(dǎo)入的數(shù)據(jù)。本文以test_sink_customer表為例。

    -- 創(chuàng)建Hologres結(jié)果表
    CREATE TABLE test_sink_customer
    (
      c_custkey     BIGINT,
      c_name        TEXT,
      c_address     TEXT,
      c_nationkey   INT,
      c_phone       TEXT,
      c_acctbal     NUMERIC(15,2),
      c_mktsegment  TEXT,
      c_comment     TEXT,
      "date"        DATE
    ) WITH (
      distribution_key="c_custkey,date", 
    
      orientation="column"
    );
    說明

    您可以根據(jù)數(shù)據(jù)量合理設(shè)置Shard數(shù),關(guān)于Shard詳情,請參見Table Group與Shard Count操作指南

  2. 創(chuàng)建repartition.sql文件并上傳至Flink集群環(huán)境中的任意位置。本文以上傳至/flink-1.15.4/src/repartition.sql路徑為例,repartition.sql文件的示例內(nèi)容如下。

    說明

    repartition.sql是Flink SQL腳本的文件,主要用于定義數(shù)據(jù)源、結(jié)果表聲明以及Hologres的連接信息。

    --sourceDDL,本文使用了Flink DataGen公共測試數(shù)據(jù)作為源數(shù)據(jù)。
    CREATE TEMPORARY TABLE source_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'number-of-rows' = '1000000'
    );
    
    --sourceDql,源表查詢語句應(yīng)當(dāng)保證查詢結(jié)果與sinkDDL聲明的結(jié)果表對應(yīng),包括字段數(shù)量和字段類型。
    SELECT *, cast('2024-04-21' as DATE) FROM source_table;
    
    -- sinkDDL,結(jié)果表聲明以及配置連接Hologres信息。
    CREATE TABLE sink_table
    (
      c_custkey     BIGINT
      ,c_name       STRING
      ,c_address    STRING
      ,c_nationkey  INTEGER
      ,c_phone      STRING
      ,c_acctbal    NUMERIC(15, 2)
      ,c_mktsegment STRING
      ,c_comment    STRING
      ,`date`       DATE
    )
    WITH (
      'connector' = 'hologres'
      ,'dbname' = 'doc_****'
      ,'tablename' = 'test_sink_customer'
      ,'username' = 'LTAI5tJCNqeCY3DtKw8c****'
      ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****'
      ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80'
      ,'jdbccopywritemode' = 'true'
      ,'bulkload' = 'true'
      ,'target-shards.enabled'='true'
    );

    其中主要參數(shù)解釋如下:

    參數(shù)

    是否必填

    說明

    connector

    結(jié)果表類型,固定值為hologres。

    dbname

    Hologres的數(shù)據(jù)庫名稱。

    tablename

    Hologres接收數(shù)據(jù)的表名稱。

    username

    當(dāng)前阿里云賬號的AccessKey ID。

    您可以單擊AccessKey 管理,獲取AccessKey ID。

    password

    當(dāng)前阿里云賬號AccessKey ID對應(yīng)的AccessKey Secret。

    endpoint

    Hologres的VPC網(wǎng)絡(luò)地址。進(jìn)入Hologres管理控制臺的實(shí)例詳情頁,從實(shí)例配置獲取Endpoint。

    說明

    endpoint需包含端口號,格式為ip:port同一個區(qū)域使用VPC網(wǎng)絡(luò)地址,跨區(qū)域請使用公共網(wǎng)絡(luò)。

    jdbccopywritemode

    數(shù)據(jù)寫入方式。取值說明如下:

    • false(默認(rèn)值):使用INSERT方式寫入。

    • true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當(dāng)前默認(rèn)使用流式Copy(Fixed Copy)方式寫入。

      說明

      與使用INSERT方式寫入相比,F(xiàn)ixed Copy方式可以實(shí)現(xiàn)更高的吞吐(因?yàn)椴捎昧髂J剑偷臄?shù)據(jù)延時以及更低的客戶端內(nèi)存消耗(因?yàn)椴恍枰獢€批數(shù)據(jù)),但不支持?jǐn)?shù)據(jù)回撤功能。

    bulkload

    是否使用批量Copy方式寫入。取值說明如下:

    • true:使用批量Copy方式寫入,僅jdbccopywritemode參數(shù)也設(shè)置為true時,該參數(shù)才會生效,否則使用Fixed Copy方式寫入。

      說明
      • 批量Copy相較于流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在數(shù)據(jù)寫入過程中提供更優(yōu)的性能,您可以根據(jù)業(yè)務(wù)需要,選擇合適的數(shù)據(jù)寫入方式。

      • 在對主鍵表進(jìn)行批量Copy寫入時,通常會出現(xiàn)表鎖的情況,您可以通過配置target-shards.enabled參數(shù)為true,將寫入鎖粒度降至Shard級別,從而允許并發(fā)執(zhí)行多個批量導(dǎo)入任務(wù),減少了表鎖的發(fā)生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres實(shí)例的負(fù)載,實(shí)測顯示,可以減少約66.7%的負(fù)載。

      • 批量Copy寫入時,如果目標(biāo)表包含主鍵,要求在寫入之前目標(biāo)表為空表,否則寫入過程中進(jìn)行主鍵去重會影響寫入性能。

    • false(默認(rèn)值):不使用。

    target-shards.enabled

    是否啟用Target Shard批量寫入。取值說明如下:

    • true:啟用Target Shard批量寫入,當(dāng)源數(shù)據(jù)已按Shard重新分區(qū)時,可以將寫入鎖粒度降至Shard級別。

    • false(默認(rèn)值):不啟用。

    說明

    repartition.sql文件中連接Hologres的更多參數(shù)介紹詳情,請參見Hologres Flink Connector參數(shù)說明

  3. 在Flink集群環(huán)境中,上傳開源Flink Connector:hologres-connector-flink-repartition.jar至任意目錄下。本文以上傳至根目錄為例。

    說明

    通過開源Flink Connector支持將數(shù)據(jù)批量導(dǎo)入Hologres,F(xiàn)link Connector插件相關(guān)開源代碼,詳情請參見Hologres GitHub官方庫

  4. 提交Flink作業(yè),代碼示例如下。

    ./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"

    上述參數(shù)說明:

    • Dexecution.runtime-mode:Flink作業(yè)的執(zhí)行模式,詳情請參見執(zhí)行模式

    • p:作業(yè)并發(fā)數(shù)。建議在配置作業(yè)并發(fā)數(shù)時取值與結(jié)果表的ShardCount相同,或者可以被ShardCount整除。

    • c:hologres-connector-flink-repartition.jar的主類名稱以及所在的路徑。

    • sqlFilePath:repartition.sql文件的路徑。

  5. 查詢Hologres結(jié)果表。

    Flink作業(yè)提交成功后,您可以在Hologres中查詢寫入的數(shù)據(jù)。示例語句如下。

    SELECT * FROM test_sink_customer;