日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

基于Flink+Hologres搭建實(shí)時(shí)數(shù)倉

更新時(shí)間:

使用Flink+Hologres搭建實(shí)時(shí)數(shù)倉可以充分利用Flink強(qiáng)大的實(shí)時(shí)處理能力和Hologres提供的Binlog、行列共存和資源強(qiáng)隔離等能力,實(shí)現(xiàn)高效、可擴(kuò)展的實(shí)時(shí)數(shù)據(jù)處理和分析,幫助您更好地應(yīng)對(duì)不斷增長的數(shù)據(jù)量和實(shí)時(shí)業(yè)務(wù)需求。本文介紹如何通過實(shí)時(shí)計(jì)算Flink版和實(shí)時(shí)數(shù)倉Hologres搭建實(shí)時(shí)數(shù)倉。

背景信息

隨著社會(huì)數(shù)字化發(fā)展,企業(yè)對(duì)數(shù)據(jù)時(shí)效性的需求越來越強(qiáng)烈。除傳統(tǒng)的面向海量數(shù)據(jù)加工場(chǎng)景設(shè)計(jì)的離線場(chǎng)景外,大量業(yè)務(wù)需要解決面向?qū)崟r(shí)加工、實(shí)時(shí)存儲(chǔ)、實(shí)時(shí)分析的實(shí)時(shí)場(chǎng)景問題。傳統(tǒng)離線數(shù)倉搭建的方法論比較明確,通過定時(shí)調(diào)度實(shí)現(xiàn)數(shù)倉分層(ODS->DWD->DWS->ADS);但對(duì)于實(shí)時(shí)數(shù)倉的搭建,目前缺乏明確的方法體系。基于Streaming Warehouse理念,實(shí)現(xiàn)數(shù)倉分層之間實(shí)時(shí)數(shù)據(jù)的高效流動(dòng),可以解決實(shí)時(shí)數(shù)倉分層問題。

方案架構(gòu)

實(shí)時(shí)計(jì)算Flink版是強(qiáng)大的流式計(jì)算引擎,支持對(duì)海量實(shí)時(shí)數(shù)據(jù)高效處理。Hologres是一站式實(shí)時(shí)數(shù)倉,支持?jǐn)?shù)據(jù)實(shí)時(shí)寫入與更新,實(shí)時(shí)數(shù)據(jù)寫入即可查。Hologres與Flink深度集成,能夠提供一體化的實(shí)時(shí)數(shù)倉聯(lián)合解決方案。本文基于Flink+Hologres搭建實(shí)時(shí)數(shù)倉的方案架構(gòu)如下:

  1. Flink將數(shù)據(jù)源寫入Hologres,形成ODS層。

  2. Flink訂閱ODS層的Binlog進(jìn)行加工,形成DWD層再次寫入Hologres。

  3. Flink訂閱DWD層的Binlog,通過計(jì)算形成DWS層,再次寫入Hologres。

  4. 最后由Hologres對(duì)外提供應(yīng)用查詢。

image.png

該方案有如下優(yōu)勢(shì):

  • Hologres的每一層數(shù)據(jù)都支持高效更新與修正、寫入即可查,解決了傳統(tǒng)實(shí)時(shí)數(shù)倉解決方案的中間層數(shù)據(jù)不易查、不易更新、不易修正的問題。

  • Hologres的每一層數(shù)據(jù)都可單獨(dú)對(duì)外提供服務(wù),數(shù)據(jù)的高效復(fù)用,真正實(shí)現(xiàn)數(shù)倉分層復(fù)用的目標(biāo)。

  • 模型統(tǒng)一,架構(gòu)簡化。實(shí)時(shí)ETL鏈路的邏輯是基于Flink SQL實(shí)現(xiàn)的;ODS層、DWD層和DWS層的數(shù)據(jù)統(tǒng)一存儲(chǔ)在Hologres中,可以降低架構(gòu)復(fù)雜度,提高數(shù)據(jù)處理效率。

該方案依賴于Hologres的3個(gè)核心能力,詳情如下表所示。

Hologres核心能力

詳情

Binlog

Hologres提供Binlog能力,用于驅(qū)動(dòng)Flink進(jìn)行實(shí)時(shí)計(jì)算,以此作為流式計(jì)算的上游。Hologres的Binlog能力詳情請(qǐng)參見訂閱Hologres Binlog

行列共存

Hologres支持行列共存的存儲(chǔ)格式。一張表同時(shí)存儲(chǔ)行存數(shù)據(jù)和列存數(shù)據(jù),并且兩份數(shù)據(jù)強(qiáng)一致。該特性保證中間層表不僅可以作為Flink的源表,也可以作為Flink的維表進(jìn)行主鍵點(diǎn)查與維表Join,還可以供其他應(yīng)用(OLAP、線上服務(wù)等)查詢。Hologres的行列共存能力詳情請(qǐng)參見表存儲(chǔ)格式:列存、行存、行列共存

資源強(qiáng)隔離

Hologres實(shí)例的負(fù)載較高時(shí),可能影響中間層的點(diǎn)查性能。Hologres支持通過主從實(shí)例讀寫分離部署(共享存儲(chǔ))計(jì)算組實(shí)例架構(gòu)實(shí)現(xiàn)資源強(qiáng)隔離,從而保證Flink對(duì)Hologres Binlog的數(shù)據(jù)拉取不影響線上服務(wù)。

實(shí)踐場(chǎng)景

本文以某個(gè)電商平臺(tái)為例,通過搭建一套實(shí)時(shí)數(shù)倉,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)加工清洗和對(duì)接上層應(yīng)用數(shù)據(jù)查詢,形成實(shí)時(shí)數(shù)據(jù)的分層和復(fù)用,支撐各個(gè)業(yè)務(wù)方的報(bào)表查詢(交易大屏、行為數(shù)據(jù)分析、用戶畫像標(biāo)簽)以及個(gè)性化推薦等多個(gè)業(yè)務(wù)場(chǎng)景。

image.png

  1. 構(gòu)建ODS層:業(yè)務(wù)數(shù)據(jù)庫實(shí)時(shí)入倉

    MySQL有orders(訂單表),orders_pay(訂單支付表),product_catalog(商品類別字典表)3張業(yè)務(wù)表,這3張表通過Flink實(shí)時(shí)同步到Hologres中作為ODS層。

  2. 構(gòu)建DWD層:實(shí)時(shí)主題寬表

    將訂單表、商品類別字典表、訂單支付表進(jìn)行實(shí)時(shí)打?qū)挘蒁WD層寬表。

  3. 構(gòu)建DWS層:實(shí)時(shí)指標(biāo)計(jì)算

    實(shí)時(shí)消費(fèi)寬表的binlog,事件驅(qū)動(dòng)地聚合出相應(yīng)的DWS層指標(biāo)表。

注意事項(xiàng)

  • 僅實(shí)時(shí)計(jì)算引擎VVR 6.0.7及以上版本支持該實(shí)時(shí)數(shù)倉方案。

  • 僅1.3及以上版本的獨(dú)享Hologre實(shí)例支持該實(shí)時(shí)數(shù)倉方案。

  • 實(shí)時(shí)計(jì)算Flink版、RDS MySQL和Hologres需要在同一VPC。如果不在同一VPC,需要先打通跨VPC的網(wǎng)絡(luò)或者使用公網(wǎng)的形式訪問,詳情請(qǐng)參見如何訪問跨VPC的其他服務(wù)?Flink全托管如何訪問公網(wǎng)?

  • 通過RAM用戶或RAM角色等身份訪問實(shí)時(shí)計(jì)算Flink、Hologres和RDS MySQL資源時(shí),需要其具備對(duì)應(yīng)資源的權(quán)限。

準(zhǔn)備工作

創(chuàng)建RDS MySQL實(shí)例并準(zhǔn)備數(shù)據(jù)源

  1. 創(chuàng)建RDS MySQL實(shí)例,詳情請(qǐng)參見創(chuàng)建RDS MySQL實(shí)例

  2. 創(chuàng)建數(shù)據(jù)庫和賬號(hào)。

    為目標(biāo)實(shí)例創(chuàng)建名稱為order_dw的數(shù)據(jù)庫和具有對(duì)應(yīng)數(shù)據(jù)庫讀寫權(quán)限的普通賬號(hào)。具體操作請(qǐng)參見創(chuàng)建數(shù)據(jù)庫和賬號(hào)管理數(shù)據(jù)庫

  3. 準(zhǔn)備MySQL CDC數(shù)據(jù)源。

    1. 在目標(biāo)實(shí)例詳情頁面,單擊上方的登錄數(shù)據(jù)庫

    2. 在彈出的DMS頁面中,填寫創(chuàng)建的數(shù)據(jù)庫賬號(hào)名和密碼,然后單擊登錄

    3. 登錄成功后,在左側(cè)雙擊order_dw數(shù)據(jù)庫,切換數(shù)據(jù)庫。

    4. 在SQL Console區(qū)域編寫三張業(yè)務(wù)表的建表DDL以及插入的數(shù)據(jù)語句。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee numeric(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      
      CREATE TABLE `orders_pay` (
        pay_id bigint not null primary key,
        order_id bigint not null,
        pay_platform int not null,
        create_time timestamp not null
      );
      
      
      CREATE TABLE `product_catalog` (
        product_id bigint not null primary key,
        catalog_name varchar(50) not null
      );
      
      -- 準(zhǔn)備數(shù)據(jù)
      INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2001, 100001, 1, '2023-02-15 17:40:56'),
      (2002, 100002, 1, '2023-02-15 17:40:56'),
      (2003, 100003, 0, '2023-02-15 17:40:56'),
      (2004, 100004, 0, '2023-02-15 17:40:56'),
      (2005, 100005, 0, '2023-02-15 18:40:56'),
      (2006, 100006, 0, '2023-02-15 18:40:56'),
      (2007, 100007, 0, '2023-02-15 18:40:56');
  4. 單擊執(zhí)行,單擊直接執(zhí)行

創(chuàng)建Hologres實(shí)例和計(jì)算組

  1. 創(chuàng)建獨(dú)享Hologres實(shí)例,詳情請(qǐng)參見購買Hologres

    為了體驗(yàn)Hologres通過讀寫分離實(shí)現(xiàn)資源強(qiáng)隔離的核心能力,本文以計(jì)算組型實(shí)例為例為您進(jìn)行介紹。

  2. HoloWeb頁面連接目標(biāo)實(shí)例后,創(chuàng)建數(shù)據(jù)庫并授權(quán)。

    創(chuàng)建名為order_dw的數(shù)據(jù)庫(需要開啟簡單權(quán)限模型),并授予用戶admin權(quán)限。數(shù)據(jù)庫創(chuàng)建和授權(quán)操作,請(qǐng)參見DB管理

    說明
    • 如果在被授權(quán)賬號(hào)的下拉列表找不到對(duì)應(yīng)的賬號(hào),則說明該賬號(hào)并未添加至當(dāng)前實(shí)例,您需要前往用戶管理頁面添加用戶為SuperUser。

    • Hologres2.0之后版本默認(rèn)開啟binlog擴(kuò)展,無需手動(dòng)執(zhí)行。Hologres1.3版本在創(chuàng)建完數(shù)據(jù)庫后,需要執(zhí)行create extension hg_binlog命令才能開啟binlog擴(kuò)展。

  3. 新增計(jì)算組。

    您可以通過不同的計(jì)算組實(shí)現(xiàn)資源隔離,使用初始計(jì)算組init_warehouse用于寫入數(shù)據(jù),使用read_warehouse_1計(jì)算組用于服務(wù)查詢。

    預(yù)留計(jì)算資源會(huì)全部分配給初始計(jì)算組init_warehouse,需先減少計(jì)算組資源,再新增計(jì)算組。詳情請(qǐng)參見場(chǎng)景1:創(chuàng)建全新的計(jì)算組實(shí)例

    1. 單擊安全中心 > 計(jì)算組管理,確認(rèn)實(shí)例名為目標(biāo)實(shí)例名稱。

    2. 單擊已有資源組init_warehouse操作列下的調(diào)整配置,調(diào)小資源后單擊確認(rèn)

    3. 單擊新增計(jì)算組,新增名稱為read_warehouse_1的計(jì)算組,單擊確認(rèn)

創(chuàng)建Flink工作空間和Catalog

  1. 創(chuàng)建Flink工作空間,詳情請(qǐng)參見開通實(shí)時(shí)計(jì)算Flink版

  2. 登錄實(shí)時(shí)計(jì)算控制臺(tái),單擊目標(biāo)工作空間操作列下的控制臺(tái)。

  3. 創(chuàng)建Session集群,為后續(xù)創(chuàng)建Catalog和查詢腳本提供執(zhí)行環(huán)境,詳情請(qǐng)參見步驟一:創(chuàng)建Session集群

  4. 創(chuàng)建Hologres Catalog。

    數(shù)據(jù)開發(fā) > 數(shù)據(jù)查詢頁面的查詢腳本頁簽,將如下代碼拷貝到查詢腳本,并修改目標(biāo)參數(shù)取值,選中目標(biāo)片段后單擊左側(cè)代碼行上的運(yùn)行

    CREATE CATALOG dw WITH (
      'type' = 'hologres',
      'endpoint' = '<ENDPOINT>', 
      'username' = '<USERNAME>',
      'password' = '<PASSWORD>',
      'dbname' = 'order_dw@init_warehouse', --數(shù)據(jù)庫名稱,并指定連接init_warehouse計(jì)算組。
      'binlog' = 'true', -- 創(chuàng)建catalog時(shí)可以設(shè)置源表、維表和結(jié)果表支持的with參數(shù),之后在使用此catalog下的表時(shí)會(huì)默認(rèn)添加這些默認(rèn)參數(shù)。
      'sdkMode' = 'jdbc', -- 推薦使用jdbc模式。
      'cdcmode' = 'true',
      'connectionpoolname' = 'the_conn_pool',
      'ignoredelete' = 'true',  -- 寬表merge需要開啟,防止回撤。
      'partial-insert.enabled' = 'true', -- 寬表merge需要開啟此參數(shù),實(shí)現(xiàn)部分列更新。
      'mutateType' = 'insertOrUpdate', -- 寬表merge需要開啟此參數(shù),實(shí)現(xiàn)部分列更新。
      'table_property.binlog.level' = 'replica', --也可以在創(chuàng)建catalog時(shí)傳入持久化的hologres表屬性,之后創(chuàng)建表時(shí),默認(rèn)都開啟binlog。
      'table_property.binlog.ttl' = '259200'
    );

    您需要修改以下參數(shù)取值為您實(shí)際Hologres服務(wù)信息。

    參數(shù)

    說明

    備注

    endpoint

    Hologres的Endpoint地址。

    詳情請(qǐng)參見實(shí)例配置

    username

    阿里云賬號(hào)的AccessKey ID。

    當(dāng)前配置的AccessKey對(duì)應(yīng)的用戶需要能夠訪問所有的Hologres數(shù)據(jù)庫,Hologres數(shù)據(jù)庫權(quán)限請(qǐng)參見Hologres權(quán)限模型概述

    password

    阿里云賬號(hào)的AccessKey Secret。

    說明

    創(chuàng)建Catalog時(shí)可以設(shè)置默認(rèn)的源表、維表和結(jié)果表的WITH參數(shù),也可以設(shè)置創(chuàng)建Hologres物理表的默認(rèn)屬性,例如上方table_property開頭的參數(shù)。詳情請(qǐng)參見管理Hologres Catalog實(shí)時(shí)數(shù)倉Hologres WITH參數(shù)

  5. 創(chuàng)建MySQL Catalog。

    將如下代碼拷貝到查詢腳本,并修改目標(biāo)參數(shù)取值,選中目標(biāo)片段后單擊左側(cè)代碼行上的運(yùn)行

    CREATE CATALOG mysqlcatalog WITH(
      'type' = 'mysql',
      'hostname' = '<hostname>',
      'port' = '<port>',
      'username' = '<username>',
      'password' = '<password>',
      'default-database' = 'order_dw'
    );

    您需要修改以下參數(shù)取值為您實(shí)際的MySQL服務(wù)信息。

    參數(shù)

    說明

    hostname

    MySQL數(shù)據(jù)庫的IP地址或者Hostname。

    port

    MySQL數(shù)據(jù)庫服務(wù)的端口號(hào),默認(rèn)值為3306。

    username

    MySQL數(shù)據(jù)庫服務(wù)的用戶名。

    password

    MySQL數(shù)據(jù)庫服務(wù)的密碼。

搭建實(shí)時(shí)數(shù)倉

構(gòu)建ODS層:業(yè)務(wù)數(shù)據(jù)庫實(shí)時(shí)入倉

基于Catalog的CREATE DATABASE AS(CDAS)語句功能,可以一次性把ODS層建出來。ODS層一般不直接做OLAP或SERVING(KV點(diǎn)查),主要作為流式作業(yè)的事件驅(qū)動(dòng),開啟binlog即可滿足需求。Binlog是Hologres的核心能力之一,Hologres連接器也支持先全量讀取再增量消費(fèi)Binlog的全增量模式。

  1. 創(chuàng)建CDAS同步作業(yè)ODS。

    1. 數(shù)據(jù)開發(fā) > ETL頁面,新建名為ODS的SQL流作業(yè),并將如下代碼拷貝到SQL編輯器。

      CREATE DATABASE IF NOT EXISTS dw.order_dw   -- 創(chuàng)建catalog時(shí)設(shè)置了table_property.binlog.level參數(shù),因此通過CDAS創(chuàng)建的所有表都開啟了binlog。
      AS DATABASE mysqlcatalog.order_dw INCLUDING all tables -- 可以根據(jù)需要選擇上游數(shù)據(jù)庫需要入倉的表。
      /*+ OPTIONS('server-id'='8001-8004') */ ;   -- 指定mysql-cdc實(shí)例server-id范圍。
      說明
      • 本示例默認(rèn)將數(shù)據(jù)同步到數(shù)據(jù)庫order_dw的Public Schema下。您也可以將數(shù)據(jù)同步到Hologres目標(biāo)庫的指定Schema中,詳情請(qǐng)參見作為CDAS的目標(biāo)端Catalog,指定后使用Catalog時(shí)的表名格式也會(huì)發(fā)生變化,詳情請(qǐng)參見使用Hologres Catalog

      • 如果源表的數(shù)據(jù)結(jié)構(gòu)發(fā)生變化,則需要等待源表的數(shù)據(jù)出現(xiàn)變更(刪除、插入、更新),結(jié)果表的數(shù)據(jù)結(jié)構(gòu)才會(huì)看到變化。

    2. 單擊右上方的部署,進(jìn)行作業(yè)部署。

    3. 單擊左側(cè)導(dǎo)航欄的運(yùn)維中心 > 作業(yè)運(yùn)維,單擊剛剛部署的ODS作業(yè)操作列的啟動(dòng),選擇無狀態(tài)啟動(dòng)后單擊啟動(dòng)

  2. 向計(jì)算組加載數(shù)據(jù)。

    Table Group是Hologres中數(shù)據(jù)的載體。使用read_warehouse_1查詢order_dw數(shù)據(jù)庫中Table Group(本示例為order_dw_tg_default)的數(shù)據(jù)時(shí),為計(jì)算組read_warehouse_1加載order_dw_tg_default,以實(shí)現(xiàn)使用init_warehouse計(jì)算組寫入數(shù)據(jù),使用read_warehouse_1計(jì)算組進(jìn)行服務(wù)查詢。

    HoloWeb開發(fā)頁單擊SQL編輯器,確認(rèn)實(shí)例名和數(shù)據(jù)庫名稱后,執(zhí)行如下命令。更多詳情請(qǐng)參見場(chǎng)景1:創(chuàng)建全新的計(jì)算組實(shí)例。加載后,可以查看到read_warehouse_1已經(jīng)加載了order_dw_tg_default Table Group的數(shù)據(jù)。

    --查看當(dāng)前數(shù)據(jù)庫有哪些Table Group
    SELECT tablegroup_name FROM hologres.hg_table_group_properties GROUP BY tablegroup_name;
    
    --為計(jì)算組加載Table Group
    CALL hg_table_group_load_to_warehouse ('order_dw.order_dw_tg_default', 'read_warehouse_1', 1);
    
    --查看計(jì)算組加載Table Group的情況
    select * from hologres.hg_warehouse_table_groups;
  3. 在右上角切換計(jì)算組為read_warehouse_1,后續(xù)使用read_warehouse_1進(jìn)行查詢分析。

    image

  4. HoloWeb中執(zhí)行如下命令,查看MySQL同步到Hologres的3張表數(shù)據(jù)。

    ---查orders中的數(shù)據(jù)。
    SELECT * FROM orders;
    
    ---查orders_pay中的數(shù)據(jù)。
    SELECT * FROM orders_pay;
    
    ---查product_catalog中的數(shù)據(jù)。
    SELECT * FROM product_catalog;

    image.png

構(gòu)建DWD層:實(shí)時(shí)主題寬表

構(gòu)建DWD層用到了Hologres連接器特有的部分列更新能力,可以使用INSERT DML方便地表達(dá)部分列更新的語義。作業(yè)中需要對(duì)不同的維表進(jìn)行查詢,是基于Hologres行存以及行列共存表提供的高性能的點(diǎn)查能力。同時(shí),Hologres資源強(qiáng)隔離的架構(gòu),可以保證寫入、讀取、分析等作業(yè)之間互不干擾。

  1. 通過Flink Catalog功能在Hologres中建DWD層的寬表dwd_orders。

    數(shù)據(jù)開發(fā) > 數(shù)據(jù)查詢頁面的查詢腳本頁簽,將如下代碼拷貝到查詢腳本后,選中目標(biāo)片段后單擊左側(cè)代碼行上的運(yùn)行

    -- 寬表字段要nullable,因?yàn)椴煌牧鲗懭氲酵粡埥Y(jié)果表,每一列都可能出現(xiàn)null的情況。
    CREATE TABLE dw.order_dw.dwd_orders (
      order_id bigint not null,
      order_user_id string,
      order_shop_id bigint,
      order_product_id bigint,
      order_product_catalog_name string,
      order_fee numeric(20,2),
      order_create_time timestamp,
      order_update_time timestamp,
      order_state int,
      pay_id bigint,
      pay_platform int comment 'platform 0: phone, 1: pc', 
      pay_create_time timestamp,
      PRIMARY KEY(order_id) NOT ENFORCED
    );
    
    -- 支持通過catalog修改Hologres物理表屬性。
    ALTER TABLE dw.order_dw.dwd_orders SET (
      'table_property.binlog.ttl' = '604800' --修改binlog的超時(shí)時(shí)間為一周。
    );
  2. 實(shí)現(xiàn)實(shí)時(shí)消費(fèi)ODS層orders、orders_pay表的binlog。

    數(shù)據(jù)開發(fā) > ETL頁面,新建名為DWD的SQL流作業(yè),并將如下代碼拷貝到SQL編輯器后,部署啟動(dòng)作業(yè)。通過如下SQL作業(yè),orders表會(huì)與product_catalog表進(jìn)行維表關(guān)聯(lián),將最終結(jié)果寫入dwd_orders表中,實(shí)現(xiàn)數(shù)據(jù)的實(shí)時(shí)打?qū)挕?/p>

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dwd_orders 
     (
       order_id,
       order_user_id,
       order_shop_id,
       order_product_id,
       order_fee,
       order_create_time,
       order_update_time,
       order_state,
       order_product_catalog_name
     ) SELECT o.*, dim.catalog_name 
       FROM dw.order_dw.orders as o
       LEFT JOIN dw.order_dw.product_catalog FOR SYSTEM_TIME AS OF proctime() AS dim
       ON o.product_id = dim.product_id;
    
    INSERT INTO dw.order_dw.dwd_orders 
      (pay_id, order_id, pay_platform, pay_create_time)
       SELECT * FROM dw.order_dw.orders_pay;
    
    END;
  3. 查看寬表dwd_orders數(shù)據(jù)。

    HoloWeb開發(fā)頁面連接Hologres實(shí)例并登錄目標(biāo)數(shù)據(jù)庫后,在SQL編輯器上執(zhí)行如下命令。

    SELECT * FROM dwd_orders;

    image

構(gòu)建DWS層:實(shí)時(shí)指標(biāo)計(jì)算

  1. 通過Flink Catalog功能,在Hologres中創(chuàng)建dws層的聚合dws_users以及dws_shops。

    數(shù)據(jù)開發(fā) > 數(shù)據(jù)查詢頁面的查詢腳本頁簽,將如下代碼拷貝到查詢腳本后,選中目標(biāo)片段后單擊左側(cè)代碼行上的運(yùn)行

    -- 用戶維度聚合指標(biāo)表。
    CREATE TABLE dw.order_dw.dws_users (
      user_id string not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment '當(dāng)日完成支付的總金額',
      primary key(user_id,ds) NOT ENFORCED
    );
    
    -- 商戶維度聚合指標(biāo)表。
    CREATE TABLE dw.order_dw.dws_shops (
      shop_id bigint not null,
      ds string not null,
      paied_buy_fee_sum numeric(20,2) not null comment '當(dāng)日完成支付總金額',
      primary key(shop_id,ds) NOT ENFORCED
    );
  2. 實(shí)時(shí)消費(fèi)DWD層的寬表dw.order_dw.dwd_orders,在Flink中做聚合計(jì)算,最終寫入Hologres中的DWS表。

    數(shù)據(jù)開發(fā) > ETL頁面,新建名為DWS的SQL流作業(yè),并將如下代碼拷貝到SQL編輯器后,部署啟動(dòng)作業(yè)。

    BEGIN STATEMENT SET;
    
    INSERT INTO dw.order_dw.dws_users
      SELECT 
        order_user_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
        FROM dw.order_dw.dwd_orders c
        WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流數(shù)據(jù)都已寫入寬表。
        GROUP BY order_user_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    
    INSERT INTO dw.order_dw.dws_shops
      SELECT 
        order_shop_id,
        DATE_FORMAT (pay_create_time, 'yyyyMMdd') as ds,
        SUM (order_fee)
       FROM dw.order_dw.dwd_orders c
       WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL -- 訂單流和支付流數(shù)據(jù)都已寫入寬表。
       GROUP BY order_shop_id, DATE_FORMAT (pay_create_time, 'yyyyMMdd');
    END;
  3. 查看DWS層的聚合結(jié)果,其結(jié)果會(huì)根據(jù)上游數(shù)據(jù)的變更實(shí)時(shí)更新。

    1. 在Hologres控制臺(tái)查看變更前數(shù)據(jù)

      • 查詢dws_users表結(jié)果。

        SELECT * FROM dws_users;

        image

      • 查詢dws_shops表結(jié)果。

        SELECT * FROM dws_shops;

        image

    2. 在RDS控制臺(tái)向order_dw數(shù)據(jù)庫orders和orders_pay表中分別插入1條新數(shù)據(jù)。

      INSERT INTO orders VALUES
      (100008, 'user_003', 12345, 5, 6000.02, '2023-02-15 09:40:56', '2023-02-15 18:42:56', 1);
      
      INSERT INTO orders_pay VALUES
      (2008, 100008, 1, '2023-02-15 19:40:56');
    3. 在Hologres控制臺(tái)查看變更后的數(shù)據(jù)。

      • dwd_orders表

        image

      • dws_users表

        image

      • dws_shops表

        image

數(shù)據(jù)探查

因?yàn)殚_啟了Binlog,所以可直接探查到數(shù)據(jù)的變化情況。如果對(duì)中間結(jié)果需要即席(Ad-hoc)性質(zhì)的業(yè)務(wù)數(shù)據(jù)探查,或者對(duì)最終計(jì)算結(jié)果進(jìn)行數(shù)據(jù)正確性排查,此方案的每一層數(shù)據(jù)都實(shí)現(xiàn)了持久化,可以便捷地探查中間過程。

  • 流模式探查

    1. 新建并啟動(dòng)數(shù)據(jù)探查流作業(yè)。

      數(shù)據(jù)開發(fā) > ETL頁面,新建名為Data-exploration的SQL流作業(yè),并將如下代碼拷貝到SQL編輯器后,部署啟動(dòng)作業(yè)。

      -- 流模式探查,打印到print可以看到數(shù)據(jù)的變化情況。
      CREATE TEMPORARY TABLE print_sink(
        order_id bigint not null,
        order_user_id string,
        order_shop_id bigint,
        order_product_id bigint,
        order_product_catalog_name string,
        order_fee numeric(20,2),
        order_create_time timestamp,
        order_update_time timestamp,
        order_state int,
        pay_id bigint,
        pay_platform int,
        pay_create_time timestamp,
        PRIMARY KEY(order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO print_sink SELECT *
      FROM dw.order_dw.dwd_orders /*+ OPTIONS('startTime'='2023-02-15 12:00:00') */ --這里的startTime是binlog生成的時(shí)間
      WHERE order_user_id = 'user_001';
    2. 查看數(shù)據(jù)探查結(jié)果。

      運(yùn)維中心 > 作業(yè)運(yùn)維詳情頁面,單擊目標(biāo)作業(yè)名稱,在作業(yè)日志頁簽下左側(cè)運(yùn)行日志頁簽,單擊運(yùn)行Task Managers頁簽下的Path, ID。在Stdout頁面搜索user_001相關(guān)的日志信息。

      image.png

  • 批模式探查

    數(shù)據(jù)開發(fā) > ETL頁面,創(chuàng)建SQL流作業(yè),并將如下代碼拷貝到SQL編輯器后,單擊調(diào)試。詳情請(qǐng)參見作業(yè)調(diào)試

    批模式探查是獲取當(dāng)前時(shí)刻的終態(tài)數(shù)據(jù),在Flink作業(yè)開發(fā)界面調(diào)試結(jié)果如下圖所示。

    SELECT *
    FROM dw.order_dw.dwd_orders /*+ OPTIONS('binlog'='false') */ 
    WHERE order_user_id = 'user_001' and order_create_time > '2023-02-15 12:00:00'; --批量模式支持filter下推,提升批作業(yè)執(zhí)行效率。

    image.png

使用實(shí)時(shí)數(shù)倉

上一小節(jié)展示了通過Flink Catalog,可以僅在Flink側(cè)搭建一個(gè)基于Flink和Hologres的Streaming Warehouse實(shí)時(shí)分層數(shù)倉。本節(jié)則展示數(shù)倉搭建完成之后的一些簡單應(yīng)用場(chǎng)景。

Key-Value服務(wù)

根據(jù)主鍵查詢DWS層的聚合指標(biāo)表,支持百萬級(jí)RPS。

HoloWeb開發(fā)頁面查詢指定用戶指定日期的消費(fèi)額的代碼示例如下。

-- holo sql
SELECT * FROM dws_users WHERE user_id ='user_001' AND ds = '20230215';

image.png

明細(xì)查詢

對(duì)DWD層寬表進(jìn)行OLAP分析。

HoloWeb開發(fā)頁面查詢某個(gè)客戶23年2月特定支付平臺(tái)支付的訂單明細(xì)的代碼示例如下。

-- holo sql
SELECT * FROM dwd_orders
WHERE order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
AND order_user_id = 'user_001'
AND pay_platform = 0
ORDER BY order_create_time LIMIT 100;

image.png

實(shí)時(shí)報(bào)表

基于DWD層寬表數(shù)據(jù)展示實(shí)時(shí)報(bào)表,Hologres的行列共存以及列存表有非常優(yōu)秀的OLAP分析能力,支持秒級(jí)響應(yīng)。

HoloWeb開發(fā)頁面查詢23年2月內(nèi)每個(gè)品類的訂單總量和訂單總金額的代碼示例如下。

-- holo sql
SELECT
  TO_CHAR(order_create_time, 'YYYYMMDD') AS order_create_date,
  order_product_catalog_name,
  COUNT(*),
  SUM(order_fee)
FROM
  dwd_orders
WHERE
  order_create_time >= '2023-02-01 00:00:00'  and order_create_time < '2023-03-01 00:00:00'
GROUP BY
  order_create_date, order_product_catalog_name
ORDER BY
  order_create_date, order_product_catalog_name;

image.png

相關(guān)文檔