Hologres V2.0版本開始支持INSERT OVERWRITE存儲過程,方便用戶進行大批量數據的全量寫入或者分區級數據批量寫入。本文為您介紹在Hologres中如何使用存儲過程實現INSERT OVERWRITE功能,如您實例版本低于V2.0,請升級實例,詳情請參見實例升級。如您暫時不方便升級,也可使用臨時表方式實現INSERT OVERWRITE功能。
功能說明
Hologres V3.0版本增強了hg_insert_overwrite能力,支持通過INSERT OVERWRITE命令直接導入數據至分區父表。
Hologres從V2.0.15版本開始,支持通過
set hg_experimental_hg_insert_overwrite_enable_view=on;
命令開啟GUC,實現向有視圖依賴的表中導入數據;暫不支持向有物化視圖依賴的表中導入數據。V3.0版本起,無需設置上述GUC,即可支持向有視圖依賴的表中導入數據;暫不支持向有物化視圖依賴的表中導入數據。
對于Hologres V2.0.11以前的版本,導入失敗需要手動清理臨時表;自V2.0.11版本開始,系統會自動清理臨時表。
使用限制
如果選擇部分字段導入,字段順序需要與源表保持一致且一一對應。
由于hg_insert_overwrite需要以表Owner的身份新建一張臨時表,因此僅Superuser和表的Owner有權限執行hg_insert_overwrite操作。
目標表的分區鍵支持INT、TEXT或VARCHAR類型。
Hologres V3.0版本起,明確要求不能在事務中使用hg_insert_overwrite,執行會報錯。
說明在舊版本的事務中使用hg_insert_overwrite也會有潛在問題,但很多時候可以正常正確執行,新版本將更加嚴格。
行為變更
Hologres V3.0版本起,hg_insert_overwrite有如下行為變更:
僅有target_table和sql兩個入參時,如果目標表是分區父表,則V3.0版本前會直接報錯,V3.0版本起可能寫入成功(當select_query執行結果對應的分區子表都已存在時),也可能報錯(當select_query執行結果對應的分區子表不存在時)。
如果hg_insert_overwrite執行中途被Cancel,則V3.0版本起需要執行如下SQL清理臨時表,V3.0版本前不需要清理臨時表。
--- 刪除before_time之前系統創建的臨時表 CALL hg_clean_insert_overwrite_tmp_tables(before_time::timestamptz);
使用存儲過程實現INSERT OVERWRITE功能
命令格式
-- V3.0版本前的hg_insert_overwrite語法
CALL hg_insert_overwrite('<target_table>' regclass, ['<partition_value>' text], '<sql>' text);
-- V3.0及以上版本的hg_insert_overwrite語法
CALL hg_insert_overwrite('<target_table>' regclass, ['<partition_value>' array], '<sql>' text, ['<auto_create_partition>' bool]);
參數說明
Hologres V3.0版本起,hg_insert_overwrite語句中的partition_value數據類型改為ARRAY類型,即支持寫入分區父表并指定多個分區子表。您仍可使用TEXT類型作為partition_value的入參,但此時只支持寫入一張分區子表。
參數 | 說明 |
target_table | Hologres的內部表。 即數據目標存儲表,表必須已經存在。 |
partition_value | 分區表的分區值。
|
sql | 標準的SELECT語句。 可用來查詢MaxCompute或者Hologres的表,需確保SELECT出來的分區字段值必須完全等于
|
auto_create_partition | 是否自動創建分區。僅V3.0及以上版本支持該參數。
|
V3.0版本起,針對INSERT OVERWRITE分區父表,即target_table
為分區父表的情況,不同參數設置的行為如下:
不指定
partition_value
參數時:auto_create_partition
值說明
TRUE
sql
執行結果對應的target_table
分區,全部執行數據覆寫。如果有不存在的分區子表,則先自動創建分區。與
sql
執行結果無關的target_table
分區,忽略。
FALSE
sql
執行結果中,如果對應的target_table
分區都存在:執行結果對應的
target_table
分區,全部執行數據覆寫。與執行結果無關的
target_table
分區,忽略。
sql
執行結果中,如果有對應的target_table
分區不存在,則直接報錯,其余已存在的分區也不執行覆寫。
指定
partition_value
參數時:auto_create_partition
值說明
TRUE
對于
partition_value
指定的target_table
分區:如果分區實際不存在:自動創建分區。
sql
執行結果對應的分區:執行數據覆寫。與
sql
執行結果無關的分區:直接清空。
對于
partition_value
未指定的target_table
分區:sql
執行結果如果包含未指定的分區:不處理。與
sql
執行結果無關的分區:不處理。
FALSE
對于
partition_value
指定的target_table
分區:如果分區實際不存在:直接報錯,其余分區也不執行覆寫。
sql
執行結果對應的分區:執行數據覆寫。與
sql
執行結果無關的分區:直接清空。
對于
partition_value
未指定的target_table
分區:sql
執行結果如果包含未指定的分區:不處理。與
sql
執行結果無關的分區:不處理。
使用示例
場景一:使用存儲過程將Hologres內部表數據導入Hologres非分區表
-- 創建表A作為目標表
BEGIN;
CREATE TABLE public.tablea (
cid integer NOT NULL,
cname text,
code integer
,PRIMARY KEY (cid)
);
CALL set_table_property('public.tablea', 'orientation', 'column');
CALL set_table_property('public.tablea', 'storage_format', 'orc');
CALL set_table_property('public.tablea', 'bitmap_columns', 'cname');
CALL set_table_property('public.tablea', 'dictionary_encoding_columns', 'cname:auto');
CALL set_table_property('public.tablea', 'distribution_key', 'cid');
CALL set_table_property('public.tablea', 'time_to_live_in_seconds', '3153600000');
COMMIT;
-- 創建表B作為數據輸入
CREATE TABLE public.tableb (
cid integer NOT NULL,
cname text,
code integer
,PRIMARY KEY (cid)
);
INSERT INTO public.tableb VALUES(1,'aaa',10001),(2,'bbb','10002');
-- 使用hg_insert_overwrite 將表B數據插入表A
CALL hg_insert_overwrite('public.tablea' , 'SELECT * FROM public.tableb');
場景二:使用存儲過程將Hologres內部表數據導入Hologres分區表
-- 創建表A作為目標表
BEGIN;
CREATE TABLE public.tableA(
a text ,
b int,
c timestamp,
d text,
ds text,
PRIMARY key(ds,b)
)
PARTITION BY LIST(ds);
CALL set_table_property('public.tableA', 'orientation', 'column');
CREATE TABLE public.holo_child_1 PARTITION OF public.tableA FOR VALUES IN('20201215');
CREATE TABLE public.holo_child_2 PARTITION OF public.tableA FOR VALUES IN('20201216');
CREATE TABLE public.holo_child_3 PARTITION OF public.tableA FOR VALUES IN('20201217');
COMMIT;
-- 創建表B作為數據輸入
BEGIN;
CREATE TABLE public.tableB(
a text ,
b int,
c timestamp,
d text,
ds text,
PRIMARY key(ds,b)
)
PARTITION BY LIST(ds);
CALL set_table_property('public.tableB', 'orientation', 'column');
CREATE TABLE public.holo_child_3a PARTITION OF public.tableB FOR VALUES IN('20201215');
CREATE TABLE public.holo_child_3b PARTITION OF public.tableB FOR VALUES IN('20201216');
CREATE TABLE public.holo_child_3c PARTITION OF public.tableB FOR VALUES IN('20201217');
COMMIT;
INSERT INTO public.holo_child_3a VALUES('a',1,'2034-10-19','a','20201215');
INSERT INTO public.holo_child_3b VALUES('b',2,'2034-10-20','b','20201216');
INSERT INTO public.holo_child_3c VALUES('c',3,'2034-10-21','c','20201217');
-- 使用insert overwrite 將表B數據插入表A
CALL hg_insert_overwrite('public.tableA' , '20201215',$$SELECT * FROM public.tableB WHERE ds='20201215'$$);
場景三:使用存儲過程將MaxCompute非分區表數據導入Hologres
-- 在MaxCompute中創建一張非分區表。示例選用MaxCompute公告數據集public_data項目下的customer表數據,其表DDL如下。
CREATE TABLE IF NOT EXISTS public_data.customer(
c_customer_sk BIGINT,
c_customer_id STRING,
c_current_cdemo_sk BIGINT,
c_current_hdemo_sk BIGINT,
c_current_addr_sk BIGINT,
c_first_shipto_date_sk BIGINT,
c_first_sales_date_sk BIGINT,
c_salutation STRING,
c_first_name STRING,
c_last_name STRING,
c_preferred_cust_flag STRING,
c_birth_day BIGINT,
c_birth_month BIGINT,
c_birth_year BIGINT,
c_birth_country STRING,
c_login STRING,
c_email_address STRING,
c_last_review_date STRING,
useless STRING);
-- 在Hologres中創建一張外部表,用于映射MaxCompute中的源頭數據表。
CREATE FOREIGN TABLE customer (
"c_customer_sk" int8,
"c_customer_id" text,
"c_current_cdemo_sk" int8,
"c_current_hdemo_sk" int8,
"c_current_addr_sk" int8,
"c_first_shipto_date_sk" int8,
"c_first_sales_date_sk" int8,
"c_salutation" text,
"c_first_name" text,
"c_last_name" text,
"c_preferred_cust_flag" text,
"c_birth_day" int8,
"c_birth_month" int8,
"c_birth_year" int8,
"c_birth_country" text,
"c_login" text,
"c_email_address" text,
"c_last_review_date" text,
"useless" text
)
SERVER odps_server
OPTIONS (project_name 'public_data', table_name 'customer');
-- 在Hologres中建立一張內部表(以列存表為例),用于接收MaxCompute源頭表數據。
BEGIN;
CREATE TABLE public.holo_customer (
"c_customer_sk" int8,
"c_customer_id" text,
"c_current_cdemo_sk" int8,
"c_current_hdemo_sk" int8,
"c_current_addr_sk" int8,
"c_first_shipto_date_sk" int8,
"c_first_sales_date_sk" int8,
"c_salutation" text,
"c_first_name" text,
"c_last_name" text,
"c_preferred_cust_flag" text,
"c_birth_day" int8,
"c_birth_month" int8,
"c_birth_year" int8,
"c_birth_country" text,
"c_login" text,
"c_email_address" text,
"c_last_review_date" text,
"useless" text
);
COMMIT;
-- 導入數據至Hologres。
IMPORT FOREIGN SCHEMA <project_name> LIMIT TO
(customer) FROM server odps_server INTO PUBLIC options(if_table_exist 'update');--更新外部表
SELECT pg_sleep(30);--等待一些時間再導入Hologres,以防Hologres meta信息更新緩存慢導致的數據不一致而同步不成功
CALL hg_insert_overwrite('holo_customer', 'SELECT * FROM customer where c_birth_year > 1980');
-- 在Hologres中查詢MaxCompute源表中的數據。
SELECT * FROM holo_customer limit 10;
場景四:使用存儲過程將MaxCompute分區表數據導入Hologres
-- 在MaxCompute中創建一張分區表。
DROP TABLE IF EXISTS odps_sale_detail;
CREATE TABLE IF NOT EXISTS odps_sale_detail
(
shop_name STRING
,customer_id STRING
,total_price DOUBLE
)
PARTITIONED BY
(
sale_date STRING
)
;
-- 向源表增加分區20210815
ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210815')
;
-- 向分區寫入數據
INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210815') VALUES
('s1','c1',100.1),
('s2','c2',100.2),
('s3','c3',100.3)
;
-- 在Hologres中創建一張外部表,用于映射MaxCompute中的源頭數據表。
DROP FOREIGN TABLE IF EXISTS odps_sale_detail;
-- 創建外部表
IMPORT FOREIGN SCHEMA <maxcompute_project> LIMIT TO
(
odps_sale_detail
)
FROM SERVER odps_server INTO public
OPTIONS(if_table_exist 'error',if_unsupported_type 'error');
-- 在Hologres中建立一張內部表,用于接收MaxCompute源頭表數據。
DROP TABLE IF EXISTS holo_sale_detail;
-- 創建Hologres分區表(內部表)
BEGIN ;
CREATE TABLE IF NOT EXISTS holo_sale_detail
(
shop_name TEXT
,customer_id TEXT
,total_price FLOAT8
,sale_date TEXT
)
PARTITION BY LIST(sale_date);
COMMIT;
-- 導入數據至Hologres。
CALL hg_insert_overwrite('holo_sale_detail', '20210815', $$SELECT * FROM public.odps_sale_detail WHERE sale_date='20210815'$$);
-- 在Hologres中查詢MaxCompute源表中的數據。
SELECT * FROM holo_sale_detail;
場景五:使用存儲過程將MaxCompute分區表數據導入Hologres分區父表
-- 在MaxCompute中創建一張分區表。
DROP TABLE IF EXISTS odps_sale_detail;
CREATE TABLE IF NOT EXISTS odps_sale_detail
(
shop_name STRING
,customer_id STRING
,total_price DOUBLE
)
PARTITIONED BY
(
sale_date STRING
)
;
-- 向源表增加分區20210815和20210816
ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210815')
;
ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20210816')
;
-- 向分區寫入數據
INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210815') VALUES
('s1','c1',100.1),
('s2','c2',100.2),
('s3','c3',100.3)
;
INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20210816') VALUES
('s1','c1',100.1),
('s2','c2',100.2),
('s3','c3',100.3)
;
-- 在Hologres中創建一張外部表,用于映射MaxCompute中的源頭數據表。
DROP FOREIGN TABLE IF EXISTS odps_sale_detail;
-- 創建外部表
IMPORT FOREIGN SCHEMA <maxcompute_project> LIMIT TO
(
odps_sale_detail
)
FROM SERVER odps_server INTO public
OPTIONS(if_table_exist 'error',if_unsupported_type 'error');
-- 在Hologres中建立一張內部表,用于接收MaxCompute源頭表數據。
DROP TABLE IF EXISTS holo_sale_detail;
-- 創建Hologres分區表(內部表)
BEGIN ;
CREATE TABLE IF NOT EXISTS holo_sale_detail
(
shop_name TEXT
,customer_id TEXT
,total_price FLOAT8
,sale_date TEXT
)
PARTITION BY LIST(sale_date);
COMMIT;
-- 導入數據至Hologres。不指定分區子表且auto_create_partition為TRUE,系統會自動創建兩個分區子表并導入數據
CALL hg_insert_overwrite ('holo_sale_detail', $$SELECT * FROM public.odps_sale_detail$$, TRUE);
-- 在Hologres中查詢數據。
SELECT * FROM holo_sale_detail;
參數說明:
maxcompute_project:MaxCompute分區表所在的項目名稱。
使用臨時表實現INSERT OVERWRITE功能
命令格式
您可以使用如下SQL語句實現INSERT OVERWRITE的功能。
BEGIN ;
-- 清理潛在的臨時表
DROP TABLE IF EXISTS <table_new>;
-- 創建臨時表
SET hg_experimental_enable_create_table_like_properties=on;
CALL HG_CREATE_TABLE_LIKE ('<table_new>', 'select * from <table>');
COMMIT ;
-- 向臨時表插入數據
INSERT INTO <table_new> [( <column> [, ...] )]
VALUES ( {<expression>} [, ...] )
[, ...] | <query>}
ANALYZE <table_new>;
BEGIN ;
-- 刪除舊表
DROP TABLE IF EXISTS <table>;
-- 臨時表改名
ALTER TABLE <table_new> RENAME TO <table>;
COMMIT ;
參數說明
參數 | 說明 |
table_new | 新創建的臨時表名稱。 表名稱也可以使用 |
table | 已存在的表名稱。 表名稱也可以使用 |
臨時表DDL | 創建臨時表有如下兩種方式。
|
使用示例
場景一:MaxCompute向Hologres的非分區表導入數據
在MaxCompute向Hologres導入數據的場景中,希望將數據全量覆蓋,常見于離線加工后的結果表導出為線上服務表。此場景使用示例如下所示,將MaxCompute中的odps_region_10g表的數據寫入Hologres的region表中,且將Hologres中region表的數據全量覆蓋。
BEGIN ;
-- 清理潛在的臨時表
DROP TABLE IF EXISTS public.region_new;
-- 創建臨時表
SET hg_experimental_enable_create_table_like_properties=on;
CALL HG_CREATE_TABLE_LIKE ('public.region_new', 'select * from public.region');
COMMIT ;
-- 向臨時表插入數據
INSERT INTO public.region_new
SELECT *
FROM public.odps_region_10g;
ANALYZE public.region_new;
BEGIN ;
-- 刪除舊表
DROP TABLE IF EXISTS public.region;
-- 臨時表改名
ALTER TABLE IF EXISTS public.region_new RENAME TO region;
COMMIT ;
場景二:MaxCompute向Hologres的分區表導入數據
在每天定期更新MaxCompute分區表的數據,且需要將MaxCompute分區表向Hologres的分區表導入數據的場景中,希望將數據全量覆蓋,實現離線數據對實時數據的修正。此場景使用示例如下所示,將MaxCompute中的odps_lineitem_10g表的數據寫入Hologres的lineitem表中,且全量覆蓋Hologres中lineitem表的數據,兩個表都是按照ds字段按天分區。
BEGIN ;
-- 清理潛在的臨時表
DROP TABLE IF EXISTS public.lineitem_new_20210101;
-- 創建臨時表
SET hg_experimental_enable_create_table_like_properties=on;
CALL HG_CREATE_TABLE_LIKE ('public.lineitem_new_20210101', 'select * from public.lineitem');
COMMIT ;
-- 向臨時表插入數據
INSERT INTO public.lineitem_new_20210101
SELECT *
FROM public.odps_lineitem_10g
WHERE DS = '20210101'
ANALYZE public.lineitem_new_20210101;
BEGIN ;
-- 刪除舊分區
DROP TABLE IF EXISTS public.lineitem_20210101;
-- 臨時表改名
ALTER TABLE public.lineitem_new_20210101 RENAME TO lineitem_20210101;
-- 將臨時表綁定至指定分區表
ALTER TABLE public.lineitem ATTACH PARTITION lineitem_20210101 FOR VALUES IN ('20210101');
COMMIT ;
場景三:Hologres向MaxCompute的非分區表導入數據
如果您需要從Hologres向MaxCompute的非分區表導入數據,建議采用臨時表導入的方式,導入完成后將臨時表改名為正式表即可。此場景使用示例如下所示,將Hologres中holotable表的數據寫入MaxCompute的mc_holotable表中,且將MaxCompute的mc_holotable表數據全量覆蓋。
-- 在MC中創建目標表的臨時表
CREATE TABLE if not exists mc_holotable_temp(
age int,
job string,
name string
);
-- 在Hologres中創建臨時表的映射
CREATE FOREIGN TABLE "public"."mapping_holotable_temp" (
"age" int,
"job" text,
"name" text
)
SERVER odps_server
OPTIONS (project_name 'DLF_test',table_name 'mc_holotable_temp');
-- 在Hologres中更新原始表
UPDATE holotable SET "job" = 'president' WHERE "name" = 'Lily';
-- 將更新后的數據寫入臨時表的映射
INSERT INTO mapping_holotable_temp SELECT * FROM holotable;
-- 在MaxCompute中刪除舊的目標表
DROP TABLE IF EXISTS mc_holotable;
-- 臨時表更名為目標表即可
ALTER TABLE mc_holotable_temp RENAME TO mc_holotable;
導入數據支持部分導入和全表導入兩種方式:
導出部分字段示例:
INSERT INTO mapping_holotable_temp SELECT x,x,x FROM holotable; --x,x,x可以替換為您需要導出的字段名
導出全部字段示例:
INSERT INTO mapping_holotable_temp SELECT * FROM holotable;