使用Flink+Hologres搭建實時數倉可以充分利用Flink強大的實時處理能力和Hologres提供的Binlog、行列共存和資源強隔離等能力,實現高效、可擴展的實時數據處理和分析,幫助您更好地應對不斷增長的數據量和實時業務需求。本文介紹如何通過實時計算Flink版和實時數倉Hologres搭建實時數倉。
背景信息
隨著社會數字化發展,企業對數據時效性的需求越來越強烈。除傳統的面向海量數據加工場景設計的離線場景外,大量業務需要解決面向實時加工、實時存儲、實時分析的實時場景問題。傳統離線數倉搭建的方法論比較明確,通過定時調度實現數倉分層(ODS->DWD->DWS->ADS);但對于實時數倉的搭建,目前缺乏明確的方法體系。基于Streaming Warehouse理念,實現數倉分層之間實時數據的高效流動,可以解決實時數倉分層問題。
方案架構
實時計算Flink版是強大的流式計算引擎,支持對海量實時數據高效處理。Hologres是一站式實時數倉,支持數據實時寫入與更新,實時數據寫入即可查。Hologres與Flink深度集成,能夠提供一體化的實時數倉聯合解決方案。本文基于Flink+Hologres搭建實時數倉的方案架構如下:
Flink將數據源寫入Hologres,形成ODS層。
Flink訂閱ODS層的Binlog進行加工,形成DWD層再次寫入Hologres。
Flink訂閱DWD層的Binlog,通過計算形成DWS層,再次寫入Hologres。
最后由Hologres對外提供應用查詢。
該方案有如下優勢:
Hologres的每一層數據都支持高效更新與修正、寫入即可查,解決了傳統實時數倉解決方案的中間層數據不易查、不易更新、不易修正的問題。
Hologres的每一層數據都可單獨對外提供服務,數據的高效復用,真正實現數倉分層復用的目標。
模型統一,架構簡化。實時ETL鏈路的邏輯是基于Flink SQL實現的;ODS層、DWD層和DWS層的數據統一存儲在Hologres中,可以降低架構復雜度,提高數據處理效率。
該方案依賴于Hologres的3個核心能力,詳情如下表所示。
Hologres核心能力 | 詳情 |
Binlog | Hologres提供Binlog能力,用于驅動Flink進行實時計算,以此作為流式計算的上游。Hologres的Binlog能力詳情請參見訂閱Hologres Binlog。 |
行列共存 | Hologres支持行列共存的存儲格式。一張表同時存儲行存數據和列存數據,并且兩份數據強一致。該特性保證中間層表不僅可以作為Flink的源表,也可以作為Flink的維表進行主鍵點查與維表Join,還可以供其他應用(OLAP、線上服務等)查詢。Hologres的行列共存能力詳情請參見表存儲格式:列存、行存、行列共存。 |
資源強隔離 | Hologres實例的負載較高時,可能影響中間層的點查性能。Hologres支持通過主從實例讀寫分離部署(共享存儲)或計算組實例架構實現資源強隔離,從而保證Flink對Hologres Binlog的數據拉取不影響線上服務。 |
實踐場景
本文以某個電商平臺為例,通過搭建一套實時數倉,實現數據的實時加工清洗和對接上層應用數據查詢,形成實時數據的分層和復用,支撐各個業務方的報表查詢(交易大屏、行為數據分析、用戶畫像標簽)以及個性化推薦等多個業務場景。
構建ODS層:業務數據庫實時入倉
MySQL有orders(訂單表),orders_pay(訂單支付表),product_catalog(商品類別字典表)3張業務表,這3張表通過Flink實時同步到Hologres中作為ODS層。
構建DWD層:實時主題寬表
將訂單表、商品類別字典表、訂單支付表進行實時打寬,生成DWD層寬表。
構建DWS層:實時指標計算
實時消費寬表的binlog,事件驅動地聚合出相應的DWS層指標表。
前提條件
實時計算Flink版、RDS MySQL和Hologres需要在同一VPC。如果不在同一VPC,需要先打通跨VPC的網絡或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?和Flink全托管如何訪問公網?。
通過RAM用戶或RAM角色等身份訪問Hologres和RDS MySQL資源時,需要其具備對應資源的權限。
已開通實時計算Flink版,詳情請參見開通實時計算Flink版。
已購買獨享通用型Hologres實例,詳情請參見購買Hologres。
購買實例后,需要創建order_dw數據庫和用戶(為用戶賦予admin權限),推薦使用簡單權限模型創建數據庫,詳情請參見簡單權限模型的使用和DB管理。
如果在被授權賬號的下拉列表找不到對應的賬號,則說明該賬號并未添加至當前實例,您需要前往用戶管理頁面添加用戶為SuperUser。
說明Hologres1.3版本在創建完數據庫后,需要執行create extension hg_binlog命令才能開啟binlog擴展。
Hologres2.0之后版本默認開啟binlog擴展,無需手動執行。
已創建RDS MySQL實例,并準備MySQL CDC數據源(order_dw數據庫中的三張業務表的建表DDL以及插入的數據如下),詳情請參見創建RDS MySQL實例和管理數據庫。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- 準備數據 INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
使用限制
僅實時計算引擎VVR 6.0.7及以上版本支持該實時數倉方案。
僅1.3及以上版本的Hologres支持該實時數倉方案。
構建實時數倉
創建Catalog
創建Hologres Catalog。
在實時計算控制臺SQL開發頁面的查詢腳本頁簽,將如下代碼拷貝到查詢腳本,并修改目標參數取值,選中目標片段后單擊左側代碼行上的運行。
CREATE CATALOG dw WITH ( 'type' = 'hologres', 'endpoint' = '<ENDPOINT>', 'username' = '<USERNAME>', 'password' = '${secret_values.ak_holo}', 'dbname' = 'order_dw', 'binlog' = 'true', -- 創建catalog時可以設置源表、維表和結果表支持的with參數,之后在使用此catalog下的表時會默認添加這些默認參數。 'sdkMode' = 'jdbc', -- 推薦使用jdbc模式。 'cdcmode' = 'true', 'connectionpoolname' = 'the_conn_pool', 'ignoredelete' = 'true', -- 寬表merge需要開啟,防止回撤。 'partial-insert.enabled' = 'true', -- 寬表merge需要開啟此參數,實現部分列更新。 'mutateType' = 'insertOrUpdate', -- 寬表merge需要開啟此參數,實現部分列更新。 'table_property.binlog.level' = 'replica', --也可以在創建catalog時傳入持久化的hologres表屬性,之后創建表時,默認都開啟binlog。 'table_property.binlog.ttl' = '259200' );
您需要修改以下參數取值為您實際Hologres服務信息。
參數
說明
備注
endpoint
Hologres的Endpoint地址。
詳情請參見實例配置。
username
阿里云賬號的AccessKey ID。
當前配置的AccessKey對應的用戶需要能夠訪問所有的Hologres數據庫,Hologres數據庫權限請參見Hologres權限模型概述。
為了避免您的AK信息泄露,本示例通過使用名為ak_holo密鑰的方式填寫AccessKey Secret取值,詳情請參見變量和密鑰管理。
password
阿里云賬號的AccessKey Secret。
說明創建Catalog時可以設置默認的源表、維表和結果表的WITH參數,也可以設置創建Hologres物理表的默認屬性,例如上方table_property開頭的參數。詳情請參見管理Hologres Catalog和實時數倉Hologres WITH參數。
創建MySQL Catalog。
將如下代碼拷貝到查詢腳本,并修改目標參數取值,選中目標片段后單擊左側代碼行上的運行。
CREATE CATALOG mysqlcatalog WITH( 'type' = 'mysql', 'hostname' = '<hostname>', 'port' = '<port>', 'username' = '<username>', 'password' = '${secret_values.mysql_pw}', 'default-database' = 'order_dw' );
您需要修改以下參數取值為您實際的MySQL服務信息。
參數
說明
hostname
MySQL數據庫的IP地址或者Hostname。
port
MySQL數據庫服務的端口號,默認值為3306。
username
MySQL數據庫服務的用戶名。
password
MySQL數據庫服務的密碼。
本示例通過使用名為mysql_pw密鑰的方式填寫密碼取值,避免信息泄露,詳情請參見變量和密鑰管理。
構建ODS層:業務數據庫實時入倉
基于Catalog的CREATE DATABASE AS(CDAS)語句功能,可以一次性把ODS層建出來。ODS層一般不直接做OLAP或SERVING(KV點查),主要作為流式作業的事件驅動,開啟binlog即可滿足需求。Binlog是Hologres的核心能力之一,Hologres連接器也支持先全量讀取再增量消費Binlog的全增量模式。
創建CDAS同步作業ODS。
在實時計算控制臺上,新建名為ODS的SQL流作業,并將如下代碼拷貝到SQL編輯器。
CREATE DATABASE IF NOT EXISTS dw.order_dw -- 創建catalog時設置了table_property.binlog.level參數,因此通過CDAS創建的所有表都開啟了binlog。 AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根據需要選擇上游數據庫需要入倉的表。 /*+ OPTIONS('server-id'='8001-8004') */ ; -- 指定mysql-cdc源表。
說明本示例默認將數據同步到數據庫order_dw的Public Schema下。您也可以將數據同步到Hologres目標庫的指定Schema中,詳情請參見作為CDAS的目標端Catalog,指定后使用Catalog時的表名格式也會發生變化,詳情請參見使用Hologres Catalog。
單擊右上方的部署,進行作業部署。
單擊左側導航欄的作業運維,單擊剛剛部署的ODS作業操作列的啟動,選擇無狀態啟動啟動作業。
查看MySQL同步到Hologres的3張表數據。
在HoloWeb開發頁面連接Hologres實例并登錄目標數據庫后,在SQL編輯器上執行如下命令。
---查orders中的數據。 SELECT * FROM orders; ---查orders_pay中的數據。 SELECT * FROM orders_pay; ---查product_catalog中的數據。 SELECT * FROM product_catalog;
構建DWD層:實時主題寬表
構建DWD層用到了Hologres連接器特有的部分列更新能力,可以使用INSERT DML方便地表達部分列更新的語義。作業中需要對不同的維表進行查詢,是基于Hologres行存以及行列共存表提供的高性能的點查能力。同時,Hologres資源強隔離的架構,可以保證寫入、讀取、分析等作業之間互不干擾。
通過Flink Catalog功能在Hologres中建DWD層的寬表dwd_orders。
在實時計算控制臺SQL開發頁面的查詢腳本頁簽,將如下代碼拷貝到查詢腳本后,選中目標片段后單擊左側代碼行上的運行。
-- 寬表字段要nullable,因為不同的流寫入到同一張結果表,每一列都可能出現null的情況。 CREATE TABLE dw.order_dw.dwd_orders ( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int comment 'platform 0: phone, 1: pc', pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ); -- 支持通過catalog修改Hologres物理表屬性。 ALTER TABLE dw.order_dw.dwd_orders SET ( 'table_property.binlog.ttl' = '604800' --修改binlog的超時時間為一周。 );
實現實時消費ODS層orders、orders_pay表的binlog。
在實時計算控制臺上,新建名為DWD的SQL作業,并將如下代碼拷貝到SQL編輯器后,部署并啟動作業。通過如下SQL作業,orders表會與product_catalog表進行維表關聯,將最終結果寫入dwd_orders表中,實現數據的實時打寬。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dwd_orders ( order_id, order_user_id, order_shop_id, order_product_id, order_fee, order_create_time, order_update_time, order_state, order_product_catalog_name ) SELECT o.*, dim.catalog_name FROM dw.order_dw.orders as o LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim ON o.product_id = dim.product_id; INSERT INTO dw.order_dw.dwd_orders (pay_id, order_id, pay_platform, pay_create_time) SELECT * FROM dw.order_dw.orders_pay; END;
查看寬表dwd_orders數據。
在HoloWeb開發頁面連接Hologres實例并登錄目標數據庫后,在SQL編輯器上執行如下命令。
SELECT * FROM dwd_orders;
構建DWS層:實時指標計算
通過Flink Catalog功能,在Hologres中創建dws層的聚合dws_users以及dws_shops。
在實時計算控制臺SQL開發頁面的查詢腳本頁簽,將如下代碼拷貝到查詢腳本后,選中目標片段后單擊左側代碼行上的運行。
-- 用戶維度聚合指標表。 CREATE TABLE dw.order_dw.dws_users ( user_id string not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment '當日完成支付的總金額', primary key(user_id,ds) NOT ENFORCED ); -- 商戶維度聚合指標表。 CREATE TABLE dw.order_dw.dws_shops ( shop_id bigint not null, ds string not null, paied_buy_fee_sum numeric(20,2) not null comment '當日完成支付總金額', primary key(shop_id,ds) NOT ENFORCED );
實時消費DWD層的寬表dw.order_dw.dwd_orders,在Flink中做聚合計算,最終寫入Hologres中的DWS表。
在實時計算控制臺上,新建名為DWS的SQL流作業,并將如下代碼拷貝到SQL編輯器后,部署并啟動作業。
BEGIN STATEMENT SET; INSERT INTO dw.order_dw.dws_users SELECT order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流數據都已寫入寬表。 GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); INSERT INTO dw.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds, SUM (order_fee) FROM dw.order_dw.dwd_orders c WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流數據都已寫入寬表。 GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd'); END;
查看DWS層的聚合結果,其結果會根據上游數據的變更實時更新。
在HoloWeb開發頁面連接Hologres實例并登錄目標數據庫后,在SQL編輯器上執行如下命令。
查詢dws_users表結果。
SELECT * FROM dws_users;
查詢dws_shops表結果。
SELECT * FROM dws_shops;
數據探查
因為開啟了Binlog,所以可直接探查到數據的變化情況。如果對中間結果需要即席(Ad-hoc)性質的業務數據探查,或者對最終計算結果進行數據正確性排查,此方案的每一層數據都實現了持久化,可以便捷地探查中間過程。
流模式探查
新建并啟動數據探查流作業。
在實時計算控制臺上,新建名為Data-exploration的SQL流作業,并將如下代碼拷貝到SQL編輯器后,部署并啟動作業。
-- 流模式探查,打印到print可以看到數據的變化情況。 CREATE TEMPORARY TABLE print_sink( order_id bigint not null, order_user_id string, order_shop_id bigint, order_product_id bigint, order_product_catalog_name string, order_fee numeric(20,2), order_create_time timestamp, order_update_time timestamp, order_state int, pay_id bigint, pay_platform int, pay_create_time timestamp, PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'print' ); INSERT INTO print_sink SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --這里的startTime是binlog生成的時間 WHERE order_user_id = 'user_001';
查看數據探查結果。
在作業運維詳情頁面,單擊目標作業名稱,在作業探查頁簽下左側運行日志頁簽,單擊運行Task Managers頁簽下的Path, ID。在Stdout頁面搜索user_001相關的日志信息。
批模式探查
在實時計算控制臺上,創建SQL流作業,并將如下代碼拷貝到SQL編輯器后,單擊調試。詳情請參見作業調試。
批模式探查是獲取當前時刻的終態數據,在Flink作業開發界面調試結果如下圖所示。
SELECT * FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支持filter下推,提升批作業執行效率。
使用實時數倉
上一小節展示了通過Flink Catalog,可以僅在Flink側搭建一個基于Flink和Hologres的Streaming Warehouse實時分層數倉。本節則展示數倉搭建完成之后的一些簡單應用場景。
Key-Value服務
根據主鍵查詢DWS層的聚合指標表,支持百萬級RPS。
在HoloWeb開發頁面查詢指定用戶指定日期的消費額的代碼示例如下。
-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';
明細查詢
對DWD層寬表進行OLAP分析。
在HoloWeb開發頁面查詢某個客戶23年2月特定支付平臺支付的訂單明細的代碼示例如下。
-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;
實時報表
基于DWD層寬表數據展示實時報表,Hologres的行列共存以及列存表有非常優秀的OLAP分析能力,支持秒級響應。
在HoloWeb開發頁面查詢23年2月內每個品類的訂單總量和訂單總金額的代碼示例如下。
-- holo sql
SELECT
TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
order_product_catalog_name,
COUNT(*),
SUM(order_fee)
FROM
dwd_orders
WHERE
order_create_time >= '2023-02-01 00:00:00' and order_create_time < '2023-03-01 00:00:00'
GROUP BY
order_create_date, order_product_catalog_name
ORDER BY
order_create_date, order_product_catalog_name;
相關文檔
Hologres的Binlog能力詳情,請參見訂閱Hologres Binlog。
Flink支持在一個作業中寫入多個INSERT INTO語句,語法請參見INSERT INTO語句。
實時計算Flink版支持豐富的連接器,詳情請參見支持的連接器。