本文為您介紹如何創建Dynamic Table。
使用限制
Dynamic Table的使用限制請參見Dynamic Table支持范圍和限制。
語法
建表語法
Dynamic Table的建表語法如下:
CREATE DYNAMIC TABLE [IF NOT EXISTS] <schema.tablename>(
[col_name],
[col_name]
) [PARTITION BY LIST (col_name)]
WITH (
[refresh_mode='[full|incremental]',]
[auto_refresh_enable='[true|false',]
--增量刷新專用參數:
[incremental_auto_refresh_schd_start_time='[immediate|<timestamptz>]',]
[incremental_auto_refresh_interval='[<num> {minute|minutes|hour|hours]',]
[incremental_guc_hg_computing_resource='[ local | serverless]',]
[incremental_guc_hg_experimental_serverless_computing_required_cores='<num>',]
--全量刷新專用參數:
[full_auto_refresh_schd_start_time='[immediate|<timestamptz>]',]
[full_auto_refresh_interval='[<num> {minute|minutes|hour|hours]',]
[full_guc_hg_computing_resource='[ local | serverless]',]--hg_full_refresh_computing_resource默認serverless,可以db級別設置,用戶可以不設置
[full_guc_hg_experimental_serverless_computing_required_cores='<num>',]
--共用參數,允許設置guc:
[refresh_guc_<guc>='xxx]',]
-- Dynamic Table的通用屬性:
[orientation = '[column | row | row,column]',]
[table_group = '[tableGroupName]',]
[distribution_key = 'columnName[,...]]',]
[clustering_key = '[columnName{:asc]} [,...]]',]
[event_time_column = '[columnName [,...]]',]
[bitmap_columns = '[columnName [,...]]',]
[dictionary_encoding_columns = '[columnName [,...]]',]
[time_to_live_in_seconds = '<non_negative_literal>',]
[storage_mode = '[hot | cold]']
)
AS
<query> --query的定義
參數說明
刷新模式與資源
參數分類 | 參數名 | 描述 | 是否必填 | 默認值 |
共用刷新參數 | refresh_mode | 指定數據的刷新模式,支持full、incremental兩種刷新模式。 若不設置該參數,則表示不進行刷新。 | 否 | 無 |
auto_refresh_enable | 是否開啟自動刷新。取值為:
| 否 | false | |
refresh_guc_<guc> | 支持對Refresh設置GUC參數,支持的GUC請參見GUC參數。 | 否 | 無 | |
增量刷新(incremental) | incremental_auto_refresh_schd_start_time | 增量刷新的開始時間。取值為:
| 否 | immediate |
incremental_auto_refresh_interval | 增量刷新的時間間隔,單位有minute、minutes、hour和hours。
| 否 | 無 | |
incremental_guc_hg_computing_resource | 指定執行增量刷新的計算資源,取值為:
說明 支持使用 | 否 | serverless | |
incremental_guc_hg_experimental_serverless_computing_required_cores | 如果使用Serverless資源執行刷新,則需要設置刷新的計算資源量。 說明 不同規格的實例可使用的Serverless資源有一定的限制,詳情請參見Serverless Computing使用指南。 | 否 | 無 | |
全量刷新(full) | full_auto_refresh_schd_start_time | 全量刷新的開始時間。取值為:
| 否 | immediate |
full_auto_refresh_interval | 全量刷新的時間間隔,單位有minute、minutes、hour和hours。
| 否 | 無 | |
full_guc_hg_computing_resource | 指定執行全量刷新的計算資源,取值為:
說明 支持使用 | 否 | serverless | |
full_guc_hg_experimental_serverless_computing_required_cores | 如果使用Serverless執行刷新,則需要設置刷新的計算資源量。 說明 不同規格的實例可使用的Serverless資源有一定的限制,詳情請參見Serverless Computing使用指南。 | 否 | 無 |
表屬性參數
參數 | 描述 | 是否必填 | 默認值 | |
full | incremental | |||
col_name | Dynamic Table的字段名。 可以顯式指定Dynamic Table的列名,但是不能指定列的屬性和數據類型,引擎會自動推導。 說明 若指定了列的屬性和數據類型,會導致引擎推導不正確。 | 否 | Query列名 | Query列名 |
orientation | 指定Dynamic Table的存儲模式,取值如下:
| 否 | column | column |
table_group | 指定Dynamic Table所在的Table Group,默認為當前數據庫下的Default Table Group,詳情請參見Table Group與Shard Count操作指南。 | 否 | 默認Table Group名稱 | 默認Table Group名稱 |
distribution_key | 指定Dynamic Table的Distribution Key,詳情請參見分布鍵Distribution Key。 | 否 | 無 | 無 |
clustering_key | 指定Dynamic Table的Clustering Key,詳情請參見聚簇索引Clustering Key。 | 否 | 允許設置,有默認推導值。 | 允許設置,有默認推導值。 |
event_time_column | 指定Dynamic Table的event_time_column,詳情請參見Event Time Column(Segment Key)。 | 否 | 無 | 無 |
bitmap_columns | 指定Dynamic Table的bitmap_columns,詳情請參見位圖索引Bitmap。 | 否 | TEXT類型字段 | TEXT類型字段 |
dictionary_encoding_columns | 指定Dynamic Table的dictionary_encoding_columns,詳情請參見字典編碼Dictionary Encoding。 | 否 | TEXT類型字段 | TEXT類型字段 |
time_to_live_in_seconds | 指定Dynamic Table數據的生命周期。 | 否 | 永久 | 永久 |
storage_mode | Dynamic Table的存儲模式,取值如下:
說明 存儲模式詳情請參見數據分層存儲。 | 否 | hot | hot |
Query
Dynamic Table中數據生成的Query,設置的刷新模式不同,支持的Query類型和基表類型也不同,詳情請參見Dynamic Table支持范圍和限制。
全量刷新
全量刷新會將Query中的數據以全量的方式寫入Dynamic Table。相比于增量刷新,全量刷新的優勢在于:
支持更多的基表類型。
支持更豐富的Query類型、算子支持等。
全量刷新相比增量刷新,處理的數據量更多,消耗的資源可能更多,因此更推薦的應用場景包括:定期報表查看、定期回刷數據等。
更多關于全量刷新的信息請參見全量刷新。
增量刷新
增量刷新會感知基表數據的變化,然后將Query中的數據以增量的方式寫入到Dynamic Table。相比于全量刷新,增量刷新處理的數據量更少,處理時效性會更高。在實際應用中,如果有分鐘級數據查詢的需求,更推薦使用增量刷新模式,但在使用增量Dynamic Table時需要注意如下幾點:
增量刷新Dynamic Table需要為基表開啟Binlog,如果是維表JOIN,維表無需開啟Binlog。
基表開啟Binlog后,會有一定存儲開銷,您可參考查看表存儲明細查詢Binlog占用存儲。
開啟增量刷新后,系統會在后臺生成一張狀態表,用于記錄中間聚合結果,關于狀態表技術原理請參見增量刷新(Incremental)。狀態表會存儲中間聚合數據,因此會占用一定的存儲,查看存儲請參見查看Dynamic Table表結構和血緣。
當前Hologres增量刷新支持新增數據刷新以及全增量一體刷新,來滿足業務的不同訴求,詳情使用如下:
增量刷新
對于已經存在的基表,開啟Binlog并為其創建增量Dynamic Table后,基表中已經存在的數據不會被同步到Dynamic Table中,只有新增的數據會被同步到Dynamic Table,因此可能會存在數據一致性問題。
全增量一體刷新
當前增量Dynamic Table也支持全增量一體消費,即首先消費基表的全量數據,再消費基表新增的數據,該功能需要通過GUC來控制:
-- 打開全增量數據一體消費的GUC
SET hg_experimental_enable_hybrid_incremental_mode = true;
示例如下:
--全增量一體化增量Dynamic table
-- 未開Binlog的基表
CREATE TABLE base_sales(
day TEXT NOT NULL,
hour INT,
user_id BIGINT,
ts TIMESTAMPTZ,
amount FLOAT,
pk text NOT NULL PRIMARY key
);
-- 導入數據
INSERT INTO base_sales VALUES ('2024-08-29',1,222222,'2024-08-29 16:41:19.141528+08',5,'ddd');
-- 為基表打開Binlog
ALTER TABLE base_sales SET (binlog_level = replica);
-- 再為基表導入增量數據
INSERT INTO base_sales VALUES ('2024-08-29',2,3333,'2024-08-29 17:44:19.141528+08',100,'aaaaa');
-- 創建增量Dynamic Table
CREATE DYNAMIC TABLE sales_incremental
WITH (refresh_mode='incremental')
AS
SELECT day, hour, SUM(amount), COUNT(1)
FROM base_sales
GROUP BY day, hour;
-- 打開全增量數據一體消費的GUC,然后刷新dynamic table
SET hg_experimental_enable_hybrid_incremental_mode = true;
REFRESH TABLE sales_incremental;
對比數據一致性:
查詢基表
SELECT day, hour, SUM(amount), COUNT(1) FROM base_sales GROUP BY day, hour;
返回結果:
day hour sum count 2024-08-29 1 5 1 2024-08-29 2 100 1
查詢Dynamic Table
SELECT * FROM sales_incremental;
返回結果:
day hour sum count 2024-08-29 1 5 1 2024-08-29 2 100 1
使用示例
示例1:創建全量刷新Dynamic Table并自動開始執行刷新
執行下述操作前,請先參考一鍵導入公共數據集將tpch_10g公共數據集的數據導入至Hologres。
--創建單表全量刷新的dynamic table,并立即開始刷新,每1小時刷新一次。
CREATE DYNAMIC TABLE test.thch_q1_full
WITH (
refresh_mode='full',
auto_refresh_enable='true',
full_auto_refresh_interval='1 hours',
full_guc_hg_computing_resource='serverless',
full_guc_hg_experimental_serverless_computing_required_cores='32'
)
AS
SELECT
l_returnflag,
l_linestatus,
SUM(l_quantity) AS sum_qty,
SUM(l_extendedprice) AS sum_base_price,
SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
AVG(l_quantity) AS avg_qty,
AVG(l_extendedprice) AS avg_price,
AVG(l_discount) AS avg_disc,
COUNT(*) AS count_order
FROM
hologres_dataset_tpch_10.lineitem.lineitem
WHERE
l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
l_returnflag,
l_linestatus;
示例2:創建增量刷新的Dynamic Table并指定開始刷新時間
執行下述操作前,請先參考一鍵導入公共數據集將tpch_10g公共數據集的數據導入至Hologres。
創建增量Dynamic Table示例如下:
在創建增量Dynamic Table之前,需要為基表開啟Binlog(維表不需要開啟)。
--為基表開binlog:
BEGIN;
CALL set_table_property('hologres_dataset_tpch_100g.lineitem', 'binlog.level', 'replica');
COMMIT;
--創建單表贈量刷新的dynamic table,并指定開始刷新時間,每3min刷新一次
CREATE DYNAMIC TABLE public.tpch_q1_incremental
WITH (
refresh_mode='incremental',
auto_refresh_enable='true',
incremental_auto_refresh_schd_start_time='2024-09-15 23:50:0',
incremental_auto_refresh_interval='3 minutes',
incremental_guc_hg_computing_resource='serverless',
incremental_guc_hg_experimental_serverless_computing_required_cores='30'
) AS SELECT
l_returnflag,
l_linestatus,
COUNT(*) AS count_order
FROM
hologres_dataset_tpch_100g.lineitem
WHERE
l_shipdate <= DATE '1998-12-01' - INTERVAL '120' DAY
GROUP BY
l_returnflag,
l_linestatus
;
示例3:創建多表JOIN的全量刷新Dynamic Table
--創建query為多表join的dynamic table,全量刷新模式,每3小時刷新一次。
CREATE DYNAMIC TABLE test.dt_q_full
WITH (
refresh_mode='full',
auto_refresh_enable='true',
full_auto_refresh_schd_start_time='immediate'
full_auto_refresh_interval='3 hours',
full_guc_hg_computing_resource='serverless',
full_guc_hg_experimental_serverless_computing_required_cores='64'
)
AS
SELECT
o_orderpriority,
COUNT(*) AS order_count
FROM
public.orders
WHERE
o_orderdate >= DATE '1996-07-01'
AND o_orderdate < DATE '1996-07-01' + INTERVAL '3' MONTH
AND EXISTS (
SELECT
*
FROM
public.lineitem
WHERE
l_orderkey = o_orderkey
AND l_commitdate < l_receiptdate
)
GROUP BY
o_orderpriority;
示例4:創建維表JOIN的增量刷新Dynamic Table
創建維表JOIN的增量Dynamic Table示例如下:
在創建增量Dynamic Table之前,需要為基表開啟Binlog(維表不需要開啟)。
維表JOIN的語義是:對每條數據,只會關聯當時維表的最新版本數據,即JOIN行為只發生在處理時間(Processing Time)。如果JOIN行為發生后,維表中的數據發生了變化(新增、更新或刪除),則已關聯的維表數據不會被同步變化。
--為基表開binlog,維表不需要開啟
BEGIN;
CALL set_table_property('hm.sale_detail ', 'binlog.level', 'replica');
COMMIT;
CREATE DYNAMIC TABLE dt_sales_incremental
WITH (
refresh_mode='incremental',
auto_refresh_enable='true',
incremental_auto_refresh_schd_start_time='2024-09-15 00:00:00'
incremental_auto_refresh_interval='5 minutes',
incremental_guc_hg_computing_resource='serverless',
incremental_guc_hg_experimental_serverless_computing_required_cores='128')
AS
SELECT
sale_detail.app_id,
sale_detail.uid,
product,
SUM(sale_detail.gmv) AS sum_gmv,
sale_detail.order_time,
user_info.province,
user_info.city
FROM hm.sale_detail
INNER JOIN hm.user_info FOR SYSTEM_TIME AS OF PROCTIME()
ON sale_detail.uid =user_info.uid
GROUP BY sale_detail.app_id,sale_detail.uid,sale_detail.product,sale_detail.order_time,user_info.province,user_info.city;
示例5:創建分區Dynamic Table
準備基表和數據。
-- 創建分區源表 CREATE TABLE base_sales( uid INT, opreate_time TIMESTAMPTZ, amount FLOAT, pk text NOT NULL, ds text, PRIMARY key(ds) ) PARTITION BY LIST (ds) ; CREATE TABLE base_sales_20240616 PARTITION OF base_sales FOR VALUES IN ('20240616'); INSERT INTO base_sales_20240616 VALUES (1,'2024-06-16 16:08:25.387466+08','2','1','20240616');
創建分區Dynamic Table父表,父表只設置Query的定義,不設置刷新模式。
CREATE DYNAMIC TABLE test.partition_dt_base_sales PARTITION BY LIST (ds) AS SELECT public.RB_BUILD_AGG(uid), opreate_time, amount, pk, ds, COUNT(1) FROM base_sales GROUP BY opreate_time ,amount,pk,ds;
手動創建分區子表,并為分區子表設置刷新模式。
-- 創建Dynamic Table分區子表,并指定刷新模式為全量刷新,建表成功后立即開始啟動刷新,刷新間隔為30mins,并使用本實例資源刷新。 CREATE DYNAMIC TABLE test.partition_dt_base_sales_20240616 PARTITION OF partition_dt_base_sales FOR VALUES IN ('20240616') WITH ( refresh_mode='full', auto_refresh_enable='true', full_auto_refresh_schd_start_time='immediate', full_auto_refresh_interval='30 minutes' );
下一步:管理Dynamic Table
Dynamic Table創建成功后,您可以執行如下操作:
查看Dynamic Table DDL、血緣關系等。詳情請參見查看Dynamic Table表結構和血緣。
修改Dynamic Table相關屬性,詳情請參見ALTER DYNAMIC TABLE。