用戶行為數據通常具有龐大的數據量,存儲成本較高,且缺乏統一的格式,導致處理難度較大。常用的寬表模型雖查詢效率高,但冗余度高、存儲空間大、維護復雜,更新慢。本文基于Flink+MongoDB+Hologres更好地實現寬表的數據分析,以游戲行業的用戶行為數據分析為示例,構建用戶行為數據寬表進行數據分析的方案。
方案架構和優勢
架構
實時計算Flink版是強大的流式計算引擎,支持對海量實時數據高效處理。云數據庫MongoDB版是一種文檔型的NoSQL數據庫,具有數據存儲結構靈活、讀寫性能高、支持復雜的查詢條件等特點。實時數倉Hologres是一站式實時數倉,支持數據實時寫入與更新,實時數據寫入即可查。三者結合相輔相成,能夠將復雜多變的數據進行聯合計算處理,實現對大型數據即時更新、查詢與分析。本文的示例方案架構如下:
MongoDB的業務數據更新后,將數據主鍵通過消息隊列Kafka采用Upsert的方式更新寫入。
無論是事實表或維度表的更新,都會將所影響的數據主鍵通過Flink寫入消息隊列Kafka。
Flink通過消費消息隊列Kafka中的主鍵信息,根據主鍵獲取MongoDB的完整的業務數據后更新寫入到Hologres。
優勢
該方案有如下優勢:
云數據庫MongoDB版適用于高并發讀寫的場景的分片集群架構,針對高頻寫入的海量數據,可無限擴展性能及存儲空間,解決寫入效率低和存儲空間不足的問題。
無論是事實表還是維度表的更新,都會讓結果表的最新數據進行更新。這一過程確保了數據更新的及時性,并且在處理海量數據時,僅對發生變更的數據進行更新,從而有效解決了更新效率低下的問題。
實時數倉Hologres支持高效更新與修正,寫入即可查,由其對外提供寬表模型的數據查詢,有效提升數據查詢與分析的效率。
實踐場景
本文以某游戲廠商為例,實現了將用戶在平臺上購買游戲的行為數據進行實時預處理并寫入Hologres即時查詢的業務場景。
game_sales(游戲銷售表)
sale_id(銷售ID) | game_id(游戲ID) | platform_id(平臺ID) | sale_date(銷售日期) | units_sold(銷售量) | sale_amt(銷售金額) |
game_dimension(游戲維度表)
game_id(游戲ID) | game_name(游戲名稱) | release_date(發售日期) | developer(開發者) | publisher(發行商) |
gameplatform_dimension(平臺維度表)
platform_id(平臺ID) | platform_name(平臺名稱) | type(終端類型) |
game_sales_details(游戲銷售明細表)
sale_id(銷售ID) | game_id(游戲ID) | platform_id(平臺ID) | sale_date(銷售日期) | units_sold(銷售量) | sale_amt(銷售金額) |
game_name(游戲名稱) | release_date(發售日期) | developer(開發者) | publisher(發行商) | platform_name(平臺名稱) | type(終端類型) |
MongoDB業務數據更新:
業務數據更新,無論是游戲銷售表或維度表更新,都將同步更新游戲銷售明細表數據。
Kafka數據更新記錄:
游戲銷售表或維度表更新,其對應所影響數據的主鍵將更新到消息隊列。
Hologres更新:
通過消費消息隊列的主鍵信息,將其與三張表相互關聯,從而得到完整的用戶行為數據以更新游戲銷售明細表。
前提條件
已開通Flink工作空間,僅實時計算引擎VVR 8.0.5及以上版本支持該方案,詳情請參見開通實時計算Flink版。
已創建云數據庫MongoDB,僅云數據庫 MongoDB 4.0及以上版本支持該方案,詳情請參見創建MongoDB分片集群實例。
已創建實時數倉Hologres,僅1.3及以上版本的獨享Hologres實例支持該方案,詳情請參見購買實時數倉Hologres。
已創建消息隊列Kafka,詳情請參見部署消息隊列Kafka實例。
實時計算Flink版、云數據庫 MongoDB、實時數倉Hologres和云消息隊列Kafka需要在同一VPC下。如果不在同一VPC,需要先打通跨VPC的網絡或者使用公網的形式訪問,詳情請參見如何訪問跨VPC的其他服務?和實時計算Flink版如何訪問公網?
通過RAM用戶或RAM角色等身份訪問對應資源時,需要其具備對應資源的權限。
步驟一:準備數據
在MongoDB創建數據庫和三張業務表并插入數據。
登錄MongoDB控制臺,選擇單擊創建好的分片集群實例。
單擊右上角登錄數據庫,選擇單擊目標實例。
創建mongo_test數據庫。
use mongo_test;
在mongo_test創建game_sales(游戲銷售表),game_dimension(游戲維度表)和platform_dimension(平臺維度表)并插入數據。
//游戲銷售表 db.game_sales.insert( [ {sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500}, ] ); //游戲維度表 db.game_dimension.insert( [ {game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"}, {game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"}, {game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"}, {game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"}, ] ); //平臺維度表 db.platform_dimension.insert( [ {platform_id:1,"platform_name":"PCGaming","type":"PC"}, {platform_id:2,"platform_name":"PlayStation","type":"Console"}, {platform_id:3,"platform_name":"Mobile","type":"Mobile"} ] );
查詢創建的表信息。
db.game_sales.find(); db.game_dimension.find(); db.platform_dimension.find();
Hologres創建數據分析寬表。
登錄Hologres控制臺,在實例列表頁面,單擊目標實例,單擊右上角的登錄實例。
單擊上方導航欄的新建庫,新建數據庫名稱為test,本示例權限策略選擇SPM,詳情請參見使用Hologres管理控制臺創建數據庫。
選擇上方導航欄中的SQL編輯器,單擊左側導航欄上方的SQL圖標,新建SQL查詢,選擇對應實例名和數據庫,填入下方代碼創建銷售明細表。
CREATE TABLE game_sales_details( sale_id INT not null primary key, game_id INT, platform_id INT, sale_date VARCHAR(50), units_sold INT, sale_amt INT, game_name VARCHAR(50), release_date VARCHAR(50), developer VARCHAR(50), publisher VARCHAR(50), platform_name VARCHAR(50), type VARCHAR(50) );
消息隊列Kafka創建Topic。
登錄Kafka控制臺,在實例列表頁面,單擊目標實例名稱。
單擊左側導航欄的白名單管理,添加或修改白名單分組,將Flink工作空間網段設置為白名單。
左側導航欄選擇Topic管理,單擊創建Topic,名稱為
game_sales_fact
,其余選擇默認設置,單擊確認。
步驟二:創建流作業
作業一:將銷售表主鍵寫入消息隊列
游戲銷售表將表主鍵(sale_id)存入消息隊列Kafka,流程如下圖:
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊
。單擊新建后,在新建作業草稿對話框,選擇空白的流作業草稿,單擊下一步。
修改文件名稱為MongoDB_to_Kafka,單擊創建。
下列代碼中使用MongoDB連接器創建了源表game_sales,使用Upsert Kafka連接器創建了Kafka的Topic為game_sales_fact。為避免作業中出現明文密碼,造成安全隱患,可以參見變量管理來配置密碼和地址等變量。
//創建MongoDB游戲銷售表 CREATE TEMPORARY TABLE game_sales ( `_id` STRING, --MongoDB自生成Id sale_id INT, --銷售Id game_id INT, --游戲Id platform_id INT, --平臺Id sale_date STRING, --銷售日期 units_sold INT, --銷售量 sale_amt INT, --銷售金額 PRIMARY KEY (_id) NOT ENFORCED ) WITH ( 'connector' = 'mongodb', --使用的連接器 'hosts' = '${secret_values.MongoDB-hosts}', --MongoDB連接地址 'username' = '${secret_values.MongoDB-username}', --MongoDB用戶名 'password' = '${secret_values.MongoDB-password}', --MongoDB密碼 'database' = 'mongo_test', --數據庫名稱 'collection' = 'game_sales' --數據庫表名 ); //創建Kafka Topic存儲主鍵信息的事實表 CREATE TEMPORARY TABLE game_sales_fact ( sale_id INT, PRIMARY KEY (sale_id) NOT ENFORCED ) WITH ( 'connector' = 'upsert-kafka', --使用的連接器 'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}', --Kafka連接地址 'topic' = 'game_sales_fact', --Topic名稱 'key.format' = 'json', --key部分數據格式 'value.format' = 'json' --value部分數據格式 ); BEGIN STATEMENT SET; --事務塊起始,下列操作全部成功,或者全部回滾 // 將主鍵信息插入Kafka的Topic INSERT INTO game_sales_fact ( sale_id ) SELECT sale_id FROM game_sales; END;
說明本示例使用的是Upsert Kafka連接器,以Upsert方式向Kafka寫入數據,其與Kafka連接器的區別請參見Kafka、Upsert Kafka或Kafka JSON catalog的選擇。
單擊右上方的部署,單擊確認。
更多部署參數詳情請參見部署作業。
作業二:同步更新主鍵信息到消息隊列
維度表與銷售表相互進行連接(維表JOIN)。若維度表或銷售表發生變更,將會把銷售表中受影響數據的主鍵(sale_id)更新至消息隊列Kafka。作業流程如下圖:
維表JOIN:通常用于使用從外部系統查詢的數據來豐富表。join 要求一個表具有處理時間屬性,還需要一個強制的相等連接條件,詳情請參見維表JOIN語句。在下列作業中,帶有process time屬性的FOR SYSTEM_TIME AS OF 子句在確保聯接處理game_sales表每一行時,都能與join條件匹配的維表行連接。它還防止連接的維表在未來發生更新時變更連接的結果。強制的連接條件則是gd.game_id = gsf.game_id和pd.platform_id = gsf.platform_id。
參考作業一新建并部署作業(MongoDB_joinTo_Kafka)。
//創建游戲維度表
CREATE TEMPORARY TABLE game_dimension
(
`_id` STRING,
game_id INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_dimension'
);
//創建平臺維度表
CREATE TEMPORARY TABLE platform_dimension
(
`_id` STRING,
platform_id INT,
platform_name STRING,
type STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'platform_dimension'
);
//創建游戲銷售表
CREATE TEMPORARY TABLE game_sales
(
`_id` STRING,
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_sales'
);
//創建Kafka Topic存儲主鍵信息的事實表
CREATE TEMPORARY TABLE game_sales_fact (
sale_id INT,
PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
'topic' = 'game_sales_fact',
'key.format' = 'json',
'value.format' = 'json'
);
BEGIN STATEMENT SET;
//游戲維度表與游戲銷售表關聯,有數據更新則將所影響的銷售表主鍵插入Kafka Topic
INSERT INTO game_sales_fact (
sale_id
)
select
gs.sale_id
from game_dimension as gd
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gd.game_id = gs.game_id;
//平臺維度表與銷售表關聯,有數據更新則將所影響的銷售主鍵插入Kafka事實表
INSERT INTO game_sales_fact (
sale_id
)
select
gs.sale_id
from platform_dimension as pd
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on pd.platform_id = gs.platform_id;
END;
作業三:同步更新Hologres明細表
消費Kafka Topic當中的主鍵信息(sale_id),通過三個維表Join操作得到銷售明細信息,寫入最終的明細表。作業流程如下圖:
參考作業一新建并部署作業(MongoDB_joinKafkaTo_Holo)。下列代碼中使用Hologres連接器創建了結果表game_sales_details。
//創建Kafka Topic存儲主鍵的事實表,消費主鍵信息
CREATE TEMPORARY TABLE game_sales_fact (
sale_id INT,
PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
'topic' = 'game_sales_fact', --消費的topic
'format' = 'json', --數據格式
'properties.group.id' = 'game_sales_fact', --消費者組
'properties.auto.offset.reset' = 'earliest' --如果消費者組首次使用,則從最早位點開始消費
);
//游戲銷售表
CREATE TEMPORARY TABLE game_sales
(
`_id` STRING,
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_sales'
);
//游戲維度表
CREATE TEMPORARY TABLE game_dimension
(
`_id` STRING,
game_id INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'game_dimension'
);
//平臺維度表
CREATE TEMPORARY TABLE platform_dimension
(
`_id` STRING
,platform_id INT
,platform_name STRING
,type STRING
,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
'connector' = 'mongodb',
'hosts' = '${secret_values.MongoDB-hosts}',
'username' = '${secret_values.MongoDB-username}',
'password' = '${secret_values.MongoDB-password}',
'database' = 'mongo_test',
'collection' = 'platform_dimension'
);
//游戲銷售明細表
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_details
(
sale_id INT,
game_id INT,
platform_id INT,
sale_date STRING,
units_sold INT,
sale_amt INT,
game_name STRING,
release_date STRING,
developer STRING,
publisher STRING,
platform_name STRING,
type STRING,
PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
'connector' = 'hologres',
'dbname' = 'test', --Hologres的數據庫名稱
'tablename' = 'public.game_sales_details', --Hologres用于接收數據的表名稱
'username' = '${secret_values.AccessKeyID}', --當前阿里云賬號的AccessKey ID
'password' = '${secret_values.AccessKeySecret}', --當前阿里云賬號的AccessKey Secret
'endpoint' = '${secret_values.Hologres-endpoint}', --當前Hologres實例VPC網絡的Endpoint
'ignoredelete' = 'false', -- 是否忽略撤回消息。寬表merge需要關閉。
'mutatetype' = 'insertorupdate', -- 更新模式。寬表merge需要開啟此參數,實現部分列更新。
'partial-insert.enabled' = 'true', -- 是否只插入INSERT語句中定義的字段。寬表merge需要開啟此參數,實現部分列更新。
'ignoreNullWhenUpdate' = 'true' --是否忽略更新寫入數據中的Null值。
);
INSERT INTO game_sales_details (
sale_id,
game_id,
platform_id,
sale_date,
units_sold,
sale_amt,
game_name,
release_date,
developer,
publisher,
platform_name,
type
)
select
gsf.sale_id,
gs.game_id,
gs.platform_id,
gs.sale_date,
gs.units_sold,
gs.sale_amt,
gd.game_name,
gd.release_date,
gd.developer,
gd.publisher,
pd.platform_name,
pd.type
from game_sales_fact as gsf
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gsf.sale_id = gs.sale_id
join game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id
join platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;
步驟三:啟動作業
完成三個作業開發部署后,在左側導航選擇
,啟動三個作業。作業運行后,前往Hologres控制臺,對明細表game_sales_details進行查詢。
SELECT * FROM game_sales_details;
此時game_sales_details表中插入了一條數據。
步驟四:數據更新和查詢
對于銷售表和維度表的數據變更,能直接更新反饋到明細表上進行查詢分析。下面將列舉幾種常見的數據變更操作,以展示數據實時更新的情況。
銷售表數據變更
對MongoDB的game_sales表插入五條數據,并觀察Hologres的game_sale_details表的結果。
db.game_sales.insert( [ {sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500}, {sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000}, {sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500}, {sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000}, {sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000} ] );
查詢Hologres的game_sale_details表,可以看到,該表增加了五條相應的數據。
將MongoDB表game_sales中sale_date=2024-01-01的數據均修改為2024-08-01。
db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});
而后查詢Hologres的game_sale_details表??梢钥吹剑摫碇衧ale_date=2024-01-01的數據都被修改成2024-08-01了。
對MongoDB的game_sales表刪除數據,將日期為2024-08-01的數據刪除。
db.game_sales.remove({"sale_date": "2024-08-01"});
而后查詢Hologres的game_sale_details表??梢钥吹?,該表中刪除了兩條日期為2024-08-01的數據。
維度表數據變更
對MongoDB的game_dimension表和platform_dimension表插入數據,插入新的游戲和新的平臺數據。
//游戲維度表 db.game_dimension.insert( [ {game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"}, {game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"} ] ); //平臺維度表 db.platform_dimension.insert( [ {platform_id:4,"platform_name":"Steam","type":"PC"}, {platform_id:5,"platform_name":"Epic","type":"PC"} ] );
新增游戲數據和新增平臺數據,并不會增加明細表數據。用戶需要產生購買或者下載行為才可以,所以繼續添加game_sales表的數據。
// 游戲銷售表 db.game_sales.insert( [ {sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000}, {sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500} ] );
而后查詢Hologres的game_sale_details表,可以看到,明細表新增了兩條關于新游戲和新平臺的數據記錄。
對MongoDB的game_dimension表和platform_dimension表更新數據,更新游戲信息和平臺信息。
//更新開發日期 db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}}); //更新平臺類型 db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});
可以看到相關的字段信息已經被修改了。