Flink批量導(dǎo)入Hologres
Hologres推出的新版Flink Connector插件,支持通過Flink將數(shù)據(jù)批量導(dǎo)入到Hologres,實(shí)現(xiàn)高效且低負(fù)載的數(shù)據(jù)導(dǎo)入。
背景信息
在大數(shù)據(jù)處理領(lǐng)域,Hologres作為一款強(qiáng)大的在線分析處理(OLAP)系統(tǒng),與Flink的集成提供了強(qiáng)大的實(shí)時數(shù)據(jù)流處理能力。然而,對數(shù)據(jù)時效性要求不高的場景,如歷史數(shù)據(jù)的批量加載、離線數(shù)據(jù)處理或日志聚合等任務(wù),推薦使用Flink批量導(dǎo)入Hologres。批量導(dǎo)入能夠以更高效、節(jié)約資源的方式將大量數(shù)據(jù)一次性寫入Hologres,不僅提升了導(dǎo)入效率,還兼顧了資源利用率。您可以根據(jù)自身的業(yè)務(wù)特性和資源狀況,靈活選擇實(shí)時導(dǎo)入或批量導(dǎo)入。關(guān)于實(shí)時導(dǎo)入詳情,請參見Flink全托管。
前提條件
已購買Hologres實(shí)例。具體操作,請參見購買Hologres。
已部署Flink 1.15及以上版本集群環(huán)境。具體操作,詳情請參見
開源Flink:部署Flink。
阿里云實(shí)時計(jì)算Flink版:開通實(shí)時計(jì)算Flink版。
阿里云實(shí)時計(jì)算Flink版批量導(dǎo)入
通過連接HoloWeb并執(zhí)行查詢創(chuàng)建Hologres結(jié)果表,用于接收Flink導(dǎo)入的數(shù)據(jù)。本文以
test_sink_customer
表為例。-- 創(chuàng)建Hologres結(jié)果表 CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );
說明Flink源表需要與Hologres的結(jié)果表的字段名稱和類型保持一致。
登錄實(shí)時計(jì)算控制臺,在作業(yè)運(yùn)維頁面單擊部署作業(yè),配置部署作業(yè)并單擊部署。配置參數(shù)詳情,請參見部署JAR作業(yè)。
其中主要參數(shù)介紹,如下表所示。
參數(shù)
說明
部署作業(yè)類型
選擇為JAR。
部署模式
支持流模式或批模式,本文選擇批模式。
引擎版本
JAR URI
上傳開源Flink Connector:hologres-connector-flink-repartition.jar。
說明通過開源Flink Connector支持將數(shù)據(jù)批量導(dǎo)入Hologres,F(xiàn)link Connector插件相關(guān)開源代碼,詳情請參見Hologres GitHub官方庫。
Entry Point Class
程序的入口類。Flink Connector指定主類名稱為:
com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample
。Entry Point Main Arguments
傳入
repartition.sql
文件的路徑參數(shù)。實(shí)時計(jì)算 Flink運(yùn)行時,附加依賴文件的路徑在/flink/usrlib/
下,因此完整的參數(shù)為:--sqlFilePath="/flink/usrlib/repartition.sql"
。附加依賴文件
上傳
repartition.sql
文件。repartition.sql
是Flink SQL腳本的文件,主要用于定義數(shù)據(jù)源、結(jié)果表聲明以及Hologres的連接信息,本文repartition.sql
文件的示例內(nèi)容如下。--sourceDDL,本文使用了Flink DataGen公共測試數(shù)據(jù)作為源數(shù)據(jù)。 CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); --sourceDql,源表查詢語句應(yīng)當(dāng)保證查詢結(jié)果與sinkDDL聲明的結(jié)果表對應(yīng),包括字段數(shù)量和字段類型。 SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL,結(jié)果表聲明以及配置連接Hologres信息。 CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'LTAI5tJCNqeCY3DtKw8c****' ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );
說明repartition.sql
文件中連接Hologres的更多參數(shù)介紹詳情,請參見Hologres Flink Connector參數(shù)說明。單擊作業(yè)名稱,進(jìn)入部署詳情看板,編輯資源配置,修改并發(fā)度。
說明建議與Hologres結(jié)果表的ShardCount數(shù)保持一致。
查詢Hologres結(jié)果表。
Flink作業(yè)提交成功后,您可以在Hologres中查詢寫入的數(shù)據(jù)。示例語句如下。
SELECT * FROM test_sink_customer;
開源Flink批量導(dǎo)入
通過連接HoloWeb并執(zhí)行查詢創(chuàng)建Hologres結(jié)果表,用于接收Flink導(dǎo)入的數(shù)據(jù)。本文以
test_sink_customer
表為例。-- 創(chuàng)建Hologres結(jié)果表 CREATE TABLE test_sink_customer ( c_custkey BIGINT, c_name TEXT, c_address TEXT, c_nationkey INT, c_phone TEXT, c_acctbal NUMERIC(15,2), c_mktsegment TEXT, c_comment TEXT, "date" DATE ) WITH ( distribution_key="c_custkey,date", orientation="column" );
說明您可以根據(jù)數(shù)據(jù)量合理設(shè)置Shard數(shù),關(guān)于Shard詳情,請參見Table Group與Shard Count操作指南。
創(chuàng)建
repartition.sql
文件并上傳至Flink集群環(huán)境中的任意位置。本文以上傳至/flink-1.15.4/src/repartition.sql
路徑為例,repartition.sql
文件的示例內(nèi)容如下。說明repartition.sql
是Flink SQL腳本的文件,主要用于定義數(shù)據(jù)源、結(jié)果表聲明以及Hologres的連接信息。--sourceDDL,本文使用了Flink DataGen公共測試數(shù)據(jù)作為源數(shù)據(jù)。 CREATE TEMPORARY TABLE source_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ) WITH ( 'connector' = 'datagen' ,'rows-per-second' = '10000' ,'number-of-rows' = '1000000' ); --sourceDql,源表查詢語句應(yīng)當(dāng)保證查詢結(jié)果與sinkDDL聲明的結(jié)果表對應(yīng),包括字段數(shù)量和字段類型。 SELECT *, cast('2024-04-21' as DATE) FROM source_table; -- sinkDDL,結(jié)果表聲明以及配置連接Hologres信息。 CREATE TABLE sink_table ( c_custkey BIGINT ,c_name STRING ,c_address STRING ,c_nationkey INTEGER ,c_phone STRING ,c_acctbal NUMERIC(15, 2) ,c_mktsegment STRING ,c_comment STRING ,`date` DATE ) WITH ( 'connector' = 'hologres' ,'dbname' = 'doc_****' ,'tablename' = 'test_sink_customer' ,'username' = 'LTAI5tJCNqeCY3DtKw8c****' ,'password' = 'tjxLtsXV8LRKOlmBQ3I0LkbHnm****' ,'endpoint' = 'hgpostcn-cn-7pp2e1k7****-cn-hangzhou.hologres.aliyuncs.com:80' ,'jdbccopywritemode' = 'true' ,'bulkload' = 'true' ,'target-shards.enabled'='true' );
其中主要參數(shù)解釋如下:
參數(shù)
是否必填
說明
connector
是
結(jié)果表類型,固定值為hologres。
dbname
是
Hologres的數(shù)據(jù)庫名稱。
tablename
是
Hologres接收數(shù)據(jù)的表名稱。
username
是
當(dāng)前阿里云賬號的AccessKey ID。
您可以單擊AccessKey 管理,獲取AccessKey ID。
password
是
當(dāng)前阿里云賬號AccessKey ID對應(yīng)的AccessKey Secret。
endpoint
是
Hologres的VPC網(wǎng)絡(luò)地址。進(jìn)入Hologres管理控制臺的實(shí)例詳情頁,從實(shí)例配置獲取Endpoint。
說明endpoint需包含端口號,格式為
ip:port
同一個區(qū)域使用VPC網(wǎng)絡(luò)地址,跨區(qū)域請使用公共網(wǎng)絡(luò)。jdbccopywritemode
否
數(shù)據(jù)寫入方式。取值說明如下:
false(默認(rèn)值):使用INSERT方式寫入。
true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當(dāng)前默認(rèn)使用流式Copy(Fixed Copy)方式寫入。
說明與使用INSERT方式寫入相比,F(xiàn)ixed Copy方式可以實(shí)現(xiàn)更高的吞吐(因?yàn)椴捎昧髂J剑偷臄?shù)據(jù)延時以及更低的客戶端內(nèi)存消耗(因?yàn)椴恍枰獢€批數(shù)據(jù)),但不支持?jǐn)?shù)據(jù)回撤功能。
bulkload
否
是否使用批量Copy方式寫入。取值說明如下:
true:使用批量Copy方式寫入,僅jdbccopywritemode參數(shù)也設(shè)置為true時,該參數(shù)才會生效,否則使用Fixed Copy方式寫入。
說明批量Copy相較于流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在數(shù)據(jù)寫入過程中提供更優(yōu)的性能,您可以根據(jù)業(yè)務(wù)需要,選擇合適的數(shù)據(jù)寫入方式。
在對主鍵表進(jìn)行批量Copy寫入時,通常會出現(xiàn)表鎖的情況,您可以通過配置target-shards.enabled參數(shù)為true,將寫入鎖粒度降至Shard級別,從而允許并發(fā)執(zhí)行多個批量導(dǎo)入任務(wù),減少了表鎖的發(fā)生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres實(shí)例的負(fù)載,實(shí)測顯示,可以減少約66.7%的負(fù)載。
批量Copy寫入時,如果目標(biāo)表包含主鍵,要求在寫入之前目標(biāo)表為空表,否則寫入過程中進(jìn)行主鍵去重會影響寫入性能。
false(默認(rèn)值):不使用。
target-shards.enabled
否
是否啟用Target Shard批量寫入。取值說明如下:
true:啟用Target Shard批量寫入,當(dāng)源數(shù)據(jù)已按Shard重新分區(qū)時,可以將寫入鎖粒度降至Shard級別。
false(默認(rèn)值):不啟用。
說明repartition.sql
文件中連接Hologres的更多參數(shù)介紹詳情,請參見Hologres Flink Connector參數(shù)說明。在Flink集群環(huán)境中,上傳開源Flink Connector:hologres-connector-flink-repartition.jar至任意目錄下。本文以上傳至根目錄為例。
說明通過開源Flink Connector支持將數(shù)據(jù)批量導(dǎo)入Hologres,F(xiàn)link Connector插件相關(guān)開源代碼,詳情請參見Hologres GitHub官方庫。
提交Flink作業(yè),代碼示例如下。
./bin/flink run -Dexecution.runtime-mode=BATCH -p 3 -c com.alibaba.ververica.connectors.hologres.example.FlinkToHoloRePartitionExample hologres-connector-flink-repartition.jar --sqlFilePath="/flink-1.15.4/src/repartition.sql"
上述參數(shù)說明:
Dexecution.runtime-mode:Flink作業(yè)的執(zhí)行模式,詳情請參見執(zhí)行模式。
p:作業(yè)并發(fā)數(shù)。建議在配置作業(yè)并發(fā)數(shù)時取值與結(jié)果表的ShardCount相同,或者可以被ShardCount整除。
c:hologres-connector-flink-repartition.jar的主類名稱以及所在的路徑。
sqlFilePath:
repartition.sql
文件的路徑。
查詢Hologres結(jié)果表。
Flink作業(yè)提交成功后,您可以在Hologres中查詢寫入的數(shù)據(jù)。示例語句如下。
SELECT * FROM test_sink_customer;