日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

基于Flink+MongoDB+Hologres的游戲行業用戶行為分析

用戶行為數據通常具有龐大的數據量,存儲成本較高,且缺乏統一的格式,導致處理難度較大。常用的寬表模型雖查詢效率高,但冗余度高、存儲空間大、維護復雜,更新慢。本文基于Flink+MongoDB+Hologres更好地實現寬表的數據分析,以游戲行業的用戶行為數據分析為示例,構建用戶行為數據寬表進行數據分析的方案。

方案架構和優勢

架構

實時計算Flink版是強大的流式計算引擎,支持對海量實時數據高效處理。云數據庫MongoDB版是一種文檔型的NoSQL數據庫,具有數據存儲結構靈活、讀寫性能高、支持復雜的查詢條件等特點。實時數倉Hologres是一站式實時數倉,支持數據實時寫入與更新,實時數據寫入即可查。三者結合相輔相成,能夠將復雜多變的數據進行聯合計算處理,實現對大型數據即時更新、查詢與分析。本文的示例方案架構如下:

  1. MongoDB的業務數據更新后,將數據主鍵通過消息隊列Kafka采用Upsert的方式更新寫入。

  2. 無論是事實表或維度表的更新,都會將所影響的數據主鍵通過Flink寫入消息隊列Kafka。

  3. Flink通過消費消息隊列Kafka中的主鍵信息,根據主鍵獲取MongoDB的完整的業務數據后更新寫入到Hologres。

image

優勢

該方案有如下優勢:

  • 云數據庫MongoDB版適用于高并發讀寫的場景的分片集群架構,針對高頻寫入的海量數據,可無限擴展性能及存儲空間,解決寫入效率低和存儲空間不足的問題。

  • 無論是事實表還是維度表的更新,都會讓結果表的最新數據進行更新。這一過程確保了數據更新的及時性,并且在處理海量數據時,僅對發生變更的數據進行更新,從而有效解決了更新效率低下的問題。

  • 實時數倉Hologres支持高效更新與修正,寫入即可查,由其對外提供寬表模型的數據查詢,有效提升數據查詢與分析的效率。

實踐場景

本文以某游戲廠商為例,實現了將用戶在平臺上購買游戲的行為數據進行實時預處理并寫入Hologres即時查詢的業務場景。

image
game_sales(游戲銷售表)

sale_id(銷售ID)

game_id(游戲ID)

platform_id(平臺ID)

sale_date(銷售日期)

units_sold(銷售量)

sale_amt(銷售金額)

game_dimension(游戲維度表)

game_id(游戲ID)

game_name(游戲名稱)

release_date(發售日期)

developer(開發者)

publisher(發行商)

gameplatform_dimension(平臺維度表)

platform_id(平臺ID)

platform_name(平臺名稱)

type(終端類型)

game_sales_details(游戲銷售明細表)

sale_id(銷售ID)

game_id(游戲ID)

platform_id(平臺ID)

sale_date(銷售日期)

units_sold(銷售量)

sale_amt(銷售金額)

game_name(游戲名稱)

release_date(發售日期)

developer(開發者)

publisher(發行商)

platform_name(平臺名稱)

type(終端類型)

  1. MongoDB業務數據更新:

    業務數據更新,無論是游戲銷售表或維度表更新,都將同步更新游戲銷售明細表數據。

  2. Kafka數據更新記錄:

    游戲銷售表或維度表更新,其對應所影響數據的主鍵將更新到消息隊列。

  3. Hologres更新:

    通過消費消息隊列的主鍵信息,將其與三張表相互關聯,從而得到完整的用戶行為數據以更新游戲銷售明細表。

前提條件

步驟一:準備數據

  1. 在MongoDB創建數據庫和三張業務表并插入數據。

    1. 登錄MongoDB控制臺,選擇單擊創建好的分片集群實例。

    2. 將Flink工作空間網段設置為白名單,詳情請參見設置白名單如何設置白名單。

    3. 單擊右上角登錄數據庫,選擇單擊目標實例。

    4. 創建mongo_test數據庫。

      use mongo_test;
    5. 在mongo_test創建game_sales(游戲銷售表),game_dimension(游戲維度表)和platform_dimension(平臺維度表)并插入數據。

      //游戲銷售表
      db.game_sales.insert(
        [
      	{sale_id:0,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500},
        ]
      );
      
      //游戲維度表
      db.game_dimension.insert(
        [
      	{game_id:101,"game_name":"SpaceInvaders","release_date":"2023-06-15","developer":"DevCorp","publisher":"PubInc"},
      	{game_id:102,"game_name":"PuzzleQuest","release_date":"2023-07-20","developer":"PuzzleDev","publisher":"QuestPub"},
      	{game_id:103,"game_name":"RacingFever","release_date":"2023-08-10","developer":"SpeedCo","publisher":"RaceLtd"},
      	{game_id:104,"game_name":"AdventureLand","release_date":"2023-09-05","developer":"Adventure","publisher":"LandCo"},
        ]
      );
      
      //平臺維度表
      db.platform_dimension.insert(
        [
      	{platform_id:1,"platform_name":"PCGaming","type":"PC"},
      	{platform_id:2,"platform_name":"PlayStation","type":"Console"},
      	{platform_id:3,"platform_name":"Mobile","type":"Mobile"}
        ]
      );
    6. 查詢創建的表信息。

      db.game_sales.find();
      db.game_dimension.find();
      db.platform_dimension.find();

      image

  2. Hologres創建數據分析寬表。

    1. 登錄Hologres控制臺,在實例列表頁面,單擊目標實例,單擊右上角的登錄實例。

    2. 單擊上方導航欄的新建庫,新建數據庫名稱為test,本示例權限策略選擇SPM,詳情請參見使用Hologres管理控制臺創建數據庫。

      image

    3. 選擇上方導航欄中的SQL編輯器,單擊左側導航欄上方的SQL圖標,新建SQL查詢,選擇對應實例名和數據庫,填入下方代碼創建銷售明細表。

      CREATE TABLE game_sales_details(
        sale_id INT not null primary key,
        game_id INT,
        platform_id INT,
        sale_date VARCHAR(50),
        units_sold INT,
        sale_amt INT,
        game_name VARCHAR(50),
        release_date VARCHAR(50),
        developer VARCHAR(50),
        publisher VARCHAR(50),
        platform_name VARCHAR(50),
        type VARCHAR(50)
      );
  3. 消息隊列Kafka創建Topic。

    1. 登錄Kafka控制臺,在實例列表頁面,單擊目標實例名稱。

    2. 單擊左側導航欄的白名單管理,添加或修改白名單分組,將Flink工作空間網段設置為白名單。

    3. 左側導航欄選擇Topic管理,單擊創建Topic,名稱為game_sales_fact,其余選擇默認設置,單擊確認

步驟二:創建流作業

作業一:將銷售表主鍵寫入消息隊列

游戲銷售表將表主鍵(sale_id)存入消息隊列Kafka,流程如下圖:

image
  1. 登錄實時計算控制臺。

  2. 單擊目標工作空間操作列下的控制臺。

  3. 在左側導航欄,單擊數據開發 > ETL

  4. 單擊新建后,在新建作業草稿對話框,選擇空白的流作業草稿,單擊下一步。

  5. 修改文件名稱為MongoDB_to_Kafka,單擊創建。

    下列代碼中使用MongoDB連接器創建了源表game_sales,使用Upsert Kafka連接器創建了Kafka的Topic為game_sales_fact。為避免作業中出現明文密碼,造成安全隱患,可以參見變量管理來配置密碼和地址等變量。

    //創建MongoDB游戲銷售表
    CREATE TEMPORARY TABLE game_sales
    (
      `_id`       STRING,    --MongoDB自生成Id
      sale_id     INT,       --銷售Id
      game_id     INT,       --游戲Id
      platform_id INT,       --平臺Id
      sale_date   STRING,    --銷售日期
      units_sold  INT,       --銷售量
      sale_amt    INT,       --銷售金額
      PRIMARY KEY (_id) NOT ENFORCED
    )
    WITH (
      'connector' = 'mongodb',      --使用的連接器
      'hosts' = '${secret_values.MongoDB-hosts}',        --MongoDB連接地址
      'username' = '${secret_values.MongoDB-username}',  --MongoDB用戶名
      'password' = '${secret_values.MongoDB-password}',  --MongoDB密碼
      'database' = 'mongo_test',    --數據庫名稱
      'collection' = 'game_sales'   --數據庫表名
    );
    
    //創建Kafka Topic存儲主鍵信息的事實表
    CREATE TEMPORARY TABLE game_sales_fact (
      sale_id     INT,
      PRIMARY KEY (sale_id) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',    --使用的連接器
      'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',   --Kafka連接地址
      'topic' = 'game_sales_fact',    --Topic名稱
      'key.format' = 'json',          --key部分數據格式
      'value.format' = 'json'         --value部分數據格式
    );
    
    BEGIN STATEMENT SET;   --事務塊起始,下列操作全部成功,或者全部回滾
    
    // 將主鍵信息插入Kafka的Topic
    INSERT INTO game_sales_fact (
      sale_id
    )
    SELECT
      sale_id
    FROM game_sales;
    
    END;
    說明

    本示例使用的是Upsert Kafka連接器,以Upsert方式向Kafka寫入數據,其與Kafka連接器的區別請參見Kafka、Upsert Kafka或Kafka JSON catalog的選擇。

  6. 單擊右上方的部署,單擊確認。

    更多部署參數詳情請參見部署作業。

作業二:同步更新主鍵信息到消息隊列

維度表與銷售表相互進行連接(維表JOIN)。若維度表或銷售表發生變更,將會把銷售表中受影響數據的主鍵(sale_id)更新至消息隊列Kafka。作業流程如下圖:

image

維表JOIN:通常用于使用從外部系統查詢的數據來豐富表。join 要求一個表具有處理時間屬性,還需要一個強制的相等連接條件,詳情請參見維表JOIN語句。在下列作業中,帶有process time屬性的FOR SYSTEM_TIME AS OF 子句在確保聯接處理game_sales表每一行時,都能與join條件匹配的維表行連接。它還防止連接的維表在未來發生更新時變更連接的結果。強制的連接條件則是gd.game_id = gsf.game_id和pd.platform_id = gsf.platform_id。

參考作業一新建并部署作業(MongoDB_joinTo_Kafka)。

//創建游戲維度表
CREATE TEMPORARY TABLE game_dimension
(
  `_id`        STRING,
  game_id      INT,
  game_name    STRING,
  release_date STRING,
  developer    STRING,
  publisher    STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_dimension'
);

//創建平臺維度表
CREATE TEMPORARY TABLE platform_dimension
(
  `_id`         STRING,
  platform_id   INT,
  platform_name STRING,
  type          STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'platform_dimension'
);

//創建游戲銷售表
CREATE TEMPORARY TABLE game_sales
(
  `_id`       STRING,
  sale_id     INT,
  game_id     INT,
  platform_id INT,
  sale_date   STRING,
  units_sold  INT,
  sale_amt    INT,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_sales'
);

//創建Kafka Topic存儲主鍵信息的事實表
CREATE TEMPORARY TABLE game_sales_fact (
  sale_id      INT,
  PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
  'topic' = 'game_sales_fact',
  'key.format' = 'json',
  'value.format' = 'json'
);

BEGIN STATEMENT SET;

//游戲維度表與游戲銷售表關聯,有數據更新則將所影響的銷售表主鍵插入Kafka Topic
INSERT INTO game_sales_fact (
   sale_id
)
select
  gs.sale_id
from game_dimension as gd
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gd.game_id = gs.game_id;

//平臺維度表與銷售表關聯,有數據更新則將所影響的銷售主鍵插入Kafka事實表
INSERT INTO game_sales_fact (
   sale_id
)
select
  gs.sale_id
from platform_dimension as pd
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on pd.platform_id = gs.platform_id;

END;

作業三:同步更新Hologres明細表

消費Kafka Topic當中的主鍵信息(sale_id),通過三個維表Join操作得到銷售明細信息,寫入最終的明細表。作業流程如下圖:

image

參考作業一新建并部署作業(MongoDB_joinKafkaTo_Holo)。下列代碼中使用Hologres連接器創建了結果表game_sales_details。

//創建Kafka Topic存儲主鍵的事實表,消費主鍵信息
CREATE TEMPORARY TABLE game_sales_fact (
  sale_id      INT,
  PRIMARY KEY (sale_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '${secret_values.Kafka-hosts}',
  'topic' = 'game_sales_fact',    --消費的topic
  'format' = 'json',              --數據格式
  'properties.group.id' = 'game_sales_fact',  --消費者組
  'properties.auto.offset.reset' = 'earliest' --如果消費者組首次使用,則從最早位點開始消費
);

//游戲銷售表
CREATE TEMPORARY TABLE game_sales
(
  `_id`       STRING,
  sale_id     INT,
  game_id     INT,
  platform_id INT,
  sale_date   STRING,
  units_sold  INT,
  sale_amt    INT,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_sales'
);

//游戲維度表
CREATE TEMPORARY TABLE game_dimension
(
  `_id`        STRING,
  game_id      INT,
  game_name    STRING,
  release_date STRING,
  developer    STRING,
  publisher    STRING,
  PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'game_dimension'
);

//平臺維度表
CREATE TEMPORARY TABLE platform_dimension
(
  `_id`          STRING
  ,platform_id   INT
  ,platform_name STRING
  ,type          STRING
  ,PRIMARY KEY (_id) NOT ENFORCED
)
WITH (
  'connector' = 'mongodb',
  'hosts' = '${secret_values.MongoDB-hosts}',
  'username' = '${secret_values.MongoDB-username}',
  'password' = '${secret_values.MongoDB-password}',
  'database' = 'mongo_test',
  'collection' = 'platform_dimension'
);

//游戲銷售明細表
CREATE TEMPORARY TABLE IF NOT EXISTS game_sales_details
(
  sale_id       INT,
  game_id       INT,
  platform_id   INT,
  sale_date     STRING,
  units_sold    INT,
  sale_amt      INT,
  game_name     STRING,
  release_date  STRING,
  developer     STRING,
  publisher     STRING,
  platform_name STRING,
  type          STRING,
  PRIMARY KEY (sale_id) NOT ENFORCED
)
WITH (
  'connector' = 'hologres',
  'dbname' = 'test', --Hologres的數據庫名稱
  'tablename' = 'public.game_sales_details', --Hologres用于接收數據的表名稱
  'username' = '${secret_values.AccessKeyID}', --當前阿里云賬號的AccessKey ID
  'password' = '${secret_values.AccessKeySecret}', --當前阿里云賬號的AccessKey Secret
  'endpoint' = '${secret_values.Hologres-endpoint}', --當前Hologres實例VPC網絡的Endpoint
  'ignoredelete' = 'false', -- 是否忽略撤回消息。寬表merge需要關閉。
  'mutatetype' = 'insertorupdate', -- 更新模式。寬表merge需要開啟此參數,實現部分列更新。
  'partial-insert.enabled' = 'true', -- 是否只插入INSERT語句中定義的字段。寬表merge需要開啟此參數,實現部分列更新。
  'ignoreNullWhenUpdate' = 'true' --是否忽略更新寫入數據中的Null值。
);

INSERT INTO game_sales_details (
  sale_id,
  game_id,
  platform_id,
  sale_date,
  units_sold,
  sale_amt,
  game_name,
  release_date,
  developer,
  publisher,
  platform_name,
  type
)
select
  gsf.sale_id,
  gs.game_id,
  gs.platform_id,
  gs.sale_date,
  gs.units_sold,
  gs.sale_amt,
  gd.game_name,
  gd.release_date,
  gd.developer,
  gd.publisher,
  pd.platform_name,
  pd.type
from game_sales_fact as gsf
join game_sales FOR SYSTEM_TIME AS OF PROCTIME() as gs
on gsf.sale_id = gs.sale_id

join game_dimension FOR SYSTEM_TIME AS OF PROCTIME() as gd
on gs.game_id = gd.game_id

join platform_dimension FOR SYSTEM_TIME AS OF PROCTIME() as pd
on gs.platform_id = pd.platform_id;

步驟三:啟動作業

  1. 完成三個作業開發部署后,在左側導航選擇運維中心 > 作業運維,啟動三個作業。

  2. 作業運行后,前往Hologres控制臺,對明細表game_sales_details進行查詢。

    SELECT * FROM game_sales_details;

    此時game_sales_details表中插入了一條數據。

    image

步驟四:數據更新和查詢

對于銷售表和維度表的數據變更,能直接更新反饋到明細表上進行查詢分析。下面將列舉幾種常見的數據變更操作,以展示數據實時更新的情況。

銷售表數據變更

  1. 對MongoDB的game_sales表插入五條數據,并觀察Hologres的game_sale_details表的結果。

    db.game_sales.insert(
      [
    	{sale_id:1,game_id:101,platform_id:1,"sale_date":"2024-01-01",units_sold:500,sale_amt:2500},
    	{sale_id:2,game_id:102,platform_id:2,"sale_date":"2024-08-02",units_sold:400,sale_amt:2000},
    	{sale_id:3,game_id:103,platform_id:1,"sale_date":"2024-08-03",units_sold:300,sale_amt:1500},
    	{sale_id:4,game_id:101,platform_id:3,"sale_date":"2024-08-04",units_sold:200,sale_amt:1000},
    	{sale_id:5,game_id:104,platform_id:2,"sale_date":"2024-08-05",units_sold:100,sale_amt:3000}
      ]
    );

    查詢Hologres的game_sale_details表,可以看到,該表增加了五條相應的數據。

    image

  2. 將MongoDB表game_sales中sale_date=2024-01-01的數據均修改為2024-08-01。

    db.game_sales.updateMany({"sale_date": "2024-01-01"}, {$set: {"sale_date": "2024-08-01"}});

    而后查詢Hologres的game_sale_details表??梢钥吹剑摫碇衧ale_date=2024-01-01的數據都被修改成2024-08-01了。

    image

  3. 對MongoDB的game_sales表刪除數據,將日期為2024-08-01的數據刪除。

    db.game_sales.remove({"sale_date": "2024-08-01"});

    而后查詢Hologres的game_sale_details表??梢钥吹?,該表中刪除了兩條日期為2024-08-01的數據。

    image

維度表數據變更

  1. 對MongoDB的game_dimension表和platform_dimension表插入數據,插入新的游戲和新的平臺數據。

    //游戲維度表
    db.game_dimension.insert(
      [
    	{game_id:105,"game_name":"HSHWK","release_date":"2024-08-20","developer":"GameSC","publisher":"GameSC"},
    	{game_id:106,"game_name":"HPBUBG","release_date":"2018-01-01","developer":"BLUE","publisher":"KK"}
      ]
    );
    
    //平臺維度表
    db.platform_dimension.insert(
      [
    	{platform_id:4,"platform_name":"Steam","type":"PC"},
    	{platform_id:5,"platform_name":"Epic","type":"PC"}
      ]
    );

    新增游戲數據和新增平臺數據,并不會增加明細表數據。用戶需要產生購買或者下載行為才可以,所以繼續添加game_sales表的數據。

    // 游戲銷售表
    db.game_sales.insert(
      [
    	{sale_id:6,game_id:105,platform_id:4,"sale_date":"2024-09-01",units_sold:400,sale_amt:2000},
    	{sale_id:7,game_id:106,platform_id:1,"sale_date":"2024-09-01",units_sold:300,sale_amt:1500}
      ]
    );

    而后查詢Hologres的game_sale_details表,可以看到,明細表新增了兩條關于新游戲和新平臺的數據記錄。

    image

  2. 對MongoDB的game_dimension表和platform_dimension表更新數據,更新游戲信息和平臺信息。

    //更新開發日期
    db.game_dimension.updateMany({"release_date": "2018-01-01"}, {$set: {"release_date": "2024-01-01"}});
    
    //更新平臺類型
    db.platform_dimension.updateMany({"type": "PC"}, {$set: {"type": "Swich"}});

    可以看到相關的字段信息已經被修改了。

    image

相關文檔