日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

GitHub近實時數據同步以及增全量數據一體化分析

本文介紹如何通過MaxCompute實現GitHub近實時數據同步以及增全量數據一體化分析。

方案概述

基于GitHub Archive公開數據集,通過DataWorks數據集成、FlinkCDC和Flink等多種實時數據寫入方式,將GitHub中的項目、行為等超過十種事件類型的數據實時采集至MaxCompute進行增全量更新。借助MCQA 2.0的資源隔離能力,同時構建批處理資源組(QuotaA組)與交互式資源組(QuotaB組),從而實現MaxCompute中增全量數據的寫入與更新,并同時進行交互式查詢分析。此外,通過使用TopConsole和DataWorks Notebook,從開發者、項目和編程語言等多個維度,對GitHub實時數據的變化情況進行深入分析與挖掘。

方案架構與優勢

image.png

以上圖示基于典型數據分析場景設計,可以滿足當日近實時數據的寫入以及歷史離線數據的更新與查詢分析場景。

  • 實現增全量數據的統一校正(包括數據聚合、去重和反作弊等),定期將全量數據回寫至DWD,并對應地更新DWS和ADS的增量MV。通過FlinkCDC和DataWorks實時數據集成,將數據寫入Delta Table的增量數據表,實現增量查詢與更新。

  • 實現交互式數據查詢(DWS/ADS層),支持增量物化視圖(MV)在DWS和ADS中自動刷新,以確保數據的時效性。同時,與TopConsole和DataWorks對接,以便進行數據查詢和展示。

  • 借助MCQA 2.0查詢加速引擎,在資源隔離架構下配置不同的Quota組,以分別支持增量數據計算和交互式查詢分析場景。

近實時數倉-Delta Table增量表格式

針對分鐘級或者小時級的近實時數據處理疊加海量數據批處理的場景,MaxCompute基于Delta Table的統一表格式特性,提供近實時的增全量一體的數據存儲和計算解決方案,支持分鐘級數據實時Upsert寫入和TimeTravel數據回溯等能力。其核心特性包括:

  • 支持近實時寫入,并且能夠實現Checkpoint間隔達到分鐘級別以內。

  • 支持SQL近實時查詢(Incremental Query),且在完成寫入后,可在分鐘級別內進行查詢。

  • 通過StorageService、AutoCompaction和AutoSorting功能,實現對數據文件的自動管理。

近實時數倉-增量計算&增量物化視圖(Incremental MV)

MaxCompute的增量計算結合了CDC和Stream增量查詢能力,使用戶可以通過自定義SQL來構建自己的增量數據處理鏈路。增量物化視圖(MV)可有效地構建增量計算模型,用戶只需使用聲明式SQL表達預期的數據結果,便可通過配置不同的刷新參數來指定刷新頻率或數據新鮮度,后臺引擎將自動進行增量刷新和內部優化,從而實現近實時數據分析Pipeline。其主要核心特性包括:

  • 聲明式SQL。

  • 增量與全量數據一體化,支持統一的SQL、存儲和計算。

  • 增量物化視圖(MV)支持智能Pipeline編排 。

  • 增量CDC應用,支持周期性任務及流處理特性。

  • 數據新鮮度,提供實時或自定義的增量數據刷新。公式為:MV(T1) = delta(T0, T1) + MV(T0)

近實時數倉-MCQA2.0查詢加速

MaxCompute的MCQA2.0查詢加速引擎旨在滿足對性能、隔離性以及穩定性有更高要求的業務需求。它構建類似Virtual Warehouse的資源隔離管理引擎,從而顯著提升了交互式查詢的性能。支持租戶級獨占計算資源,使用多線程Pipeline執行,以充分利用精準獨享的計算資源管理。此外,還支持全鏈路的Cache能力,全類型的SQL作業、屏顯作業以及DDL、DML作業等特性。

  • 支持單租戶環境下構建多Quota組進行資源隔離,并支持對Quota組進行交互式查詢。

  • 支持分時資源的分組管理。

  • 交互式查詢性能加速,與上一版本相比,性能提升1倍。

操作視頻

操作步驟

步驟一:MaxCompute項目準備

步驟

操作

預期結果

Step 0

注冊MaxCompute新功能邀測申請表單

  1. 打開MaxCompute新功能邀測申請表單

  2. 選中Delta Table增量表格式基于增量物化視圖的增量計算MCQA2.0 SQL引擎查詢加速image

  3. 單擊注冊

開通MaxCompute新功能。

Step 1

初始化MaxCompute新項目

  1. 打開MaxCompute控制臺,單擊新建項目

  2. 新建項目對話框中配置如下參數。

    項目名稱設置為delta_compute_yunqi項目類型選擇云棲新功能邀測項目,其他參數按需設置或保持默認。image (2)

    地域可選擇北京、上海或杭州。

  3. 并單擊確定

  1. 在控制臺左側導航欄中,選擇工作區 > SQL分析,打開臨時SQL分析頁面。

  2. SQL分析頁面左側,可以看到新建項目delta_compute_yunqi2024-09-18_11-31-45.png

Step 2

配額(Quota)管理上新購Quota

  1. 在MaxCompute控制臺左側導航欄中,選擇工作區 > 配額(Quota)管理,單擊新購Quota

  2. 創建一個包年包月的一級Quota,并根據頁面提示完成支付。

  3. 在已創建的一級Quota組下,創建2個不同二級Quota組:

    • 批處理Quota組:batch_demo_yunqi

    • 交互式Quota組:mcqa2_demo_yunqi

    截屏2024-09-13 17

  4. 分別設置批處理Quota組和交互式Quota組資源。image (5)

在配額(Quota)管理頁面,確認批處理Quota組和交互式Quota組已成功創建。截屏2024-09-03 21.50.19.png

Step 3

新建MaxCompute Delta Table增量表

SQL分析頁面,執行如下示例命令,創建2個MaxCompute內部表:

  • 創建yunqi_github_events_odps_cdc表,后續會將數據實時通過FlinkCDC寫入至該表中。

    --設置ODPS SQL支持upsertable
    SET odps.sql.upsertable.table.enable=true;
    SET odps.storage.orc.enable.memcmp.sort.key=true; 
    
    --FlinkCDC
    CREATE TABLE IF NOT EXISTS yunqi_github_events_odps_cdc
    (
        id                     BIGINT NOT NULL COMMENT '事件ID'
        ,actor_id              BIGINT COMMENT '事件發起人ID'
        ,actor_login           STRING COMMENT '事件發起人登錄名'
        ,repo_id               BIGINT COMMENT 'repoID'
        ,repo_name             STRING COMMENT 'repo全名:owner/Repository_name'
        ,org_id                BIGINT COMMENT 'repo所屬組織ID'
        ,org_login             STRING COMMENT 'repo所屬組織名稱'
        ,`type`                STRING COMMENT '事件類型'
        ,created_at            TIMESTAMP NOT NULL COMMENT '事件發生時間'
        ,action                STRING COMMENT '事件行為'
        ,commit_id             STRING COMMENT '提交記錄ID'
        ,member_id             BIGINT COMMENT '成員ID'
        ,language              STRING COMMENT '編程語言'
        ,PRIMARY KEY(id, created_at))
    STORED AS ALIORC  
    TBLPROPERTIES ('acid.data.retain.hours'='24',
         'acid.incremental.query.out.of.time.range.enabled'='true',
         'columnar.nested.type'='true',
         'transactional'='true',
         'write.bucket.num'='64',
         'acid.ingest.commit.num.check.limit'='10',
         'cdc.insert.into.passthrough.enable'='true', 
         'acid.cdc.mode.enable' = 'true',--打開異步的話,就要額外設置
         'acid.cdc.build.async'='true', 
         'acid.cdc.build.interval'='60') 
    LIFECYCLE 36500;
    
  • 創建yunqi_github_events_odps_dw表,后續會將數據實時通過DataWorks數據集成寫入至該表中。

    --設置ODPS SQL支持upsertable
    SET odps.sql.upsertable.table.enable=true;
    SET odps.storage.orc.enable.memcmp.sort.key=true; 
    
    --DataWorks實時數據同步
    CREATE TABLE IF NOT EXISTS yunqi_github_events_odps_dw
    (
        id                     BIGINT NOT NULL COMMENT '事件ID'
        ,actor_id              BIGINT COMMENT '事件發起人ID'
        ,actor_login           STRING COMMENT '事件發起人登錄名'
        ,repo_id               BIGINT COMMENT 'repoID'
        ,repo_name             STRING COMMENT 'repo全名:owner/Repository_name'
        ,org_id                BIGINT COMMENT 'repo所屬組織ID'
        ,org_login             STRING COMMENT 'repo所屬組織名稱'
        ,`type`                STRING COMMENT '事件類型'
        ,created_at            TIMESTAMP COMMENT '事件發生時間'
        ,action                STRING COMMENT '事件行為'
        ,commit_id             STRING COMMENT '提交記錄ID'
        ,member_id             BIGINT COMMENT '成員ID'
        ,language              STRING COMMENT '編程語言'
        ,PRIMARY KEY(id))
    STORED AS ALIORC  
    TBLPROPERTIES ('acid.data.retain.hours'='24',
         'acid.incremental.query.out.of.time.range.enabled'='true',
         'columnar.nested.type'='true',
         'transactional'='true',
         'write.bucket.num'='64',
         'acid.ingest.commit.num.check.limit'='10',
         'cdc.insert.into.passthrough.enable'='true', 
         'acid.cdc.mode.enable' = 'true',--默認同步CDC
         'acid.cdc.build.async'='true', --打開異步的話,就要額外設置
         'acid.cdc.build.interval'='60') 
    LIFECYCLE 36500;

SQL分析頁面,選擇項目delta_compute_yunqi,確認以下兩個Delta Table表已存在:

  • yunqi_github_events_odps_cdc

  • yunqi_github_events_odps_dw

步驟二:實時增全量數據寫入

通過FlinkCDC或DataWorks數據集成能力獲取基于GitHub Archive公開實時數據集。

您可以按需選擇以下任意一種方式寫入數據。

Flink CDC

步驟

操作

預期結果

Step 1

實時MySQL數據源對接與驗證

MySQL數據源連接信息格式如下。

配置模式: 選擇連接串模式。
JDBC連接地址: 單擊新增地址,配置信息如下:
主機地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
端口號:3306

輸入數據庫名稱后,完整的JDBC URL為
jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share
數據庫名稱: github_events_share
用戶名: workshop
密碼: workshop#2017
此密碼僅為本教程示例,請勿在實際業務中使用。
說明

請確保MySQL數據源可以通過公網訪問,如無法通過公網訪問,可以配置公網NAT網關。

確認公開Github數據源可用。

--MySQL PublicData

id  '事件ID';
actor_id  '事件發起人ID';
actor_login  '事件發起人登錄名';
repo_id  'repoID';
repo_name  'repo名稱';
org_id  'repo所屬組織ID';
org_login  'repo所屬組織名稱';
type  '事件類型';
created_at  '事件發生時間';
action  '事件行為';
commit_id  '提交記錄ID';
member_id  '成員ID';
language  '編程語言';

Step 2

通過Flink

CDC寫入數據

實時GitHub Event數據通過FlinkCDC實時數據寫入MaxCompute Delta Table增量表,更多信息,請參見利用Flink CDC實現數據同步至Delta Table

  1. 選擇數據鏈接器。

  2. 編寫任務配置YAML文件。整庫同步的示例文件rds-to-maxcompute.yaml如下。

    # Author:         openlake@test.aliyunid.com
    # Created Time:   2024-08-27 12:46:11
    # Description:    Write your description here
    
    source:
      type: mysql
      hostname: rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
      port: 3306
      username: workshop
      password: workshop#2017
      tables: github_events_share.\.*
      server-id: 5400-5404
    
    sink:
       type: maxcompute
       name: MaxComputeSink
       accessId: xxxxxxxxxxxxxxxxxxxxxx
       accessKey: xxxxxxxxxxxxxxxxxxxxxxx
       endpoint: http://xxx.xxx.xxx.xxx:8008
       tunnelEndpoint: http://xxx.xxx.xxx.xxx:8009
       project: delta_compute_yunqi
       bucketSize: 8
    
    pipeline:
       parallelism: 4
  3. 執行flink-cdc命令,啟動數據實時同步。

    ./bin/flink-cdc.sh rds-to-maxcompute.yaml

查看MaxCompute的yunqi_github_events_odps_cdc數據表的數據變化。

SELECT COUNT(1) FROM yunqi_github_events_odps_cdc; 

DataWorks數據集成

步驟

操作

預期結果

Step 1

實時MySQL數據源對接與驗證

MySQL數據源連接信息格式如下。

配置模式: 選擇連接串模式。
JDBC連接地址: 單擊新增地址,配置信息如下:
主機地址IP:
rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com
端口號:3306

輸入數據庫名稱后,完整的JDBC URL為
jdbc:mysql://rm-bp1z69dodhh85z9qa.mysql.rds.aliyuncs.com:3306/github_events_share
數據庫名稱: github_events_share
用戶名: workshop
密碼: workshop#2017
此密碼僅為本教程示例,請勿在實際業務中使用。
說明

請確保MySQL數據源可以通過公網訪問,如無法通過公網訪問,可以配置公網NAT網關。

確認公開Github數據源可用。

--MySQL PublicData

id  '事件ID';
actor_id  '事件發起人ID';
actor_login  '事件發起人登錄名';
repo_id  'repoID';
repo_name  'repo名稱';
org_id  'repo所屬組織ID';
org_login  'repo所屬組織名稱';
type  '事件類型';
created_at  '事件發生時間';
action  '事件行為';
commit_id  '提交記錄ID';
member_id  '成員ID';
language  '編程語言';

Step 2

配置DataWorks實時數據源

實時GitHub Event數據通過DataWorks數據集成實時數據寫入MaxCompute Delta Table增量表(實時數據源)。

  1. 創建同步任務所需的數據源。

    登錄DataWorks控制臺,切換至北京或上海地域,在左側導航欄選擇數據開發與治理 > 數據集成

    2024-09-18_14-29-21.png

    您可以選擇已有工作空間或新建工作空間,然后單擊進入數據集成

  2. DataWorks數據集成頁面,單擊左側導航欄中的數據源,單擊新增數據源

  3. 新增數據源對話框中,根據界面引導分配創建MySQL和MaxCompute數據源。

    • MySQL配置頁面示例如下,填入Step 1中提供的MySQL數據源信息。更多說明,請參見MySQL數據源截屏2024-09-13 20.29.12.png

    • MaxCompute配置頁面示例如下,更多說明,請參見創建MaxCompute數據源1111111.png

確認來源和去向數據源聯通正常。

Step 3

DataWorks實時任務同步

實時GitHub Event數據通過DataWorks數據集成實時數據寫入MaxCompute Delta Table增量表。

  1. 在DataWorks數據集成頁面,單擊左側導航欄中的同步任務,然后單擊新建同步任務

  2. 新建同步任務頁面,配置以下信息。

    • 數據來源選擇MySQL。

    • 數據去向選擇MaxCompute。

    • 示例任務名為sync_mysql_to_odps。

    • 同步類型為整庫實時

    • 資源組選擇開通DataWorks時創建的資源組,占用的CU量配置為5CU

      說明
      • 為保持公共數據源連接穩定,資源組與公共MySQL數據源創建連接后7天將進行釋放,不影響資源組與您的MySQL創建的連接。

      • 使用新版資源組運行數據集成整庫任務,最低要求配置2CU,詳情請參見Serverless資源組計費

    • 來源數據源選擇mysql_github_source。

    • 去向數據源選擇delta_compute_demo。

    2222.png

  3. 單擊測試連通性,保障數據源與資源組網絡連通。更多網絡連通介紹,請參見網絡連通方案

  4. 測試通過后,單擊下一步

  5. 選擇要同步的庫表頁面,選中MySQL中的github表,移動至已選庫表5eecdaf48460cde5216439d7342f912e6fbae11900bdcd4058e70b814913bc360a414d3de9277d871abf3af1cbd752494530dd9cb3fcc75c56ea1a4ad9c1ccc252b12b1c048286d7ebcf1c5741ba735290cf61cee1c45abefc653b69905bac42.png

  6. 目標表映射區域,勾選對應表,單擊批量刷新映射。基于上述已創建的MaxCompute Delta表,將目標表名改為 yunqi_github_events_odps_dw,單擊完成配置

    image

  7. 任務列表頁面啟動任務,查看執行詳情。

    公共數據源MySQL中保留近7天數據,離線數據將通過全量進行同步,實時數據將在全量初始化完成后,實時寫入MaxCompute。

    image

  8. 待數據同步成功后,前往MaxCompute TopConsole進行近實時數據分析。

    image

查看MaxCompute的yunqi_github_events_odps_dw數據表的數據變化。

SELECT COUNT(1) FROM yunqi_github_events_odps_dw; 

步驟三:近實時查詢分析和增量計算

  • 近實時數據分析:使用交互式Quota組-mcqa2_demo_yunqi(64CU)在MaxCompute TopConsole SQL數據分析。

  • 增量MV-自動化動態表:使用批處理Quota組-batch_demo_yunqi(128CU)在MaxCompute TopConsole進行動態表增量計算。

  • 增量計算-CDC/Stream/周期性Tasks:使用批處理Quota組-batch_demo_yunqi在MaxCompute TopConsole進行自定義增量計算。

步驟

操作

預期結果

Step 1

使用交互式資源組-近實時數據分析

  1. 在MaxCompute控制臺左側導航欄中,選擇工作區 > SQL分析,打開臨時SQL分析頁面。

  2. 選擇步驟一創建的交互式Quota組,進行近實時查詢分析。555.png

具體SQL分析步驟如下:

  • Q1:過去24小時最活躍項目

    --Q1:過去24小時最活躍項目
    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;
  • Q2:過去24小時最活躍開發者

    --Q2:過去24小時最活躍開發者
    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        actor_login
    ORDER BY
        events 
    LIMIT 5;
  • Q3:今日公開事件總數

    --Q3:今日公開事件總數
    SELECT count(*) FROM yunqi_github_events_odps_dw WHERE TO_DATE(created_at) >= date_add(now(),-1);
  • Q4:實時事件展示

    --Q4:實時事件展示
    SELECT
        cast(created_at as STRING ),
        actor_login,
        type,
        repo_name
    FROM
        yunqi_github_events_odps_dw
    ORDER BY
        created_at DESC
    LIMIT 5;

觀察查詢SQL結果。

Step 2

使用批處理資源組-增量計算-增量物化視圖MV聚合查詢

  1. 在MaxCompute控制臺左側導航欄中,選擇工作區 > SQL分析,打開臨時SQL分析頁面。

  2. 選擇步驟一創建的批處理Quota組,批量MV應用。666.png

具體SQL分析步驟如下:

  1. 確認Delta Table數據表已生成,歷史和增量數據更新中。

    創建跟蹤Delta Table數據表的CDC信息 (可選:默認CDC自動生成)。

    --Q1: 創建跟蹤Delta Table數據表的CDC信息
    ALTER TABLE yunqi_github_events_odps_dw build cdc; --觸發繼續CDC后續流程
  2. 支持實時增量物化視圖MV查詢。基于歷史和增量數據,創建輕度聚合-增量MV:按照repo_name/language/create_date聚合統計事件數量。

    --Q2: 生成增量物化視圖MV1-yunqi_incre_mv1
    SET useTunnel=FALSE;
    CREATE materialized VIEW IF NOT EXISTS yunqi_incre_mv1
    refresh every 2 minutes
    tblproperties("enable_auto_refresh"="true", "refresh_mode"="incremental")
    AS SELECT
        repo_name,
        language,
        COUNT(*) AS events,
        to_date(created_at) AS create_date
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-07-26' AND language IS NOT NULL
    GROUP BY
      repo_name,language,to_date(created_at);
    --Q3: 觀測源數據表狀態 - 驗證show history查看表行為 / CDC Build狀態確認
    SHOW history FOR TABLE yunqi_github_events_odps_dw;
    SELECT * FROM table_changes('yunqi_github_events_odps_dw', 1);
  3. SQL分析:按照repo_name/language信息,查詢在一定時間內的各repo的總共事件數量和語言。

    --Q4:按照repo_name / language 信息,降序查詢總共事件數據與Language
    SELECT repo_name, language, SUM(events) AS events_sum, COUNT(create_date) AS day_count 
    FROM yunqi_incre_mv1 
    WHERE events > 100 
    GROUP BY repo_name,language ORDER BY events_sum DESC;
    --Q5:增量物化視圖MV1歷史操作變更
    SHOW history FOR TABLE yunqi_incre_mv1;
    --數據更新批次5分鐘間隔,如無變更需執行
    ALTER materialized VIEW yunqi_incre_mv1 rebuild;
    SELECT * FROM table_changes('yunqi_github_events_odps_dw', 1);
  4. 構建級聯增量物化視圖MV2。基于已有的增量物化視圖MV1,創建Aggragate重度匯聚層。

    • 生成增量物化視圖MV2-yunqi_incre_mv2。在增量MV1的基礎上,物化SQL分析結果到增量MV2。參考Q4 SQL。

      --Q6:生成級聯增量物化視圖MV2-yunqi_incre_mv2
      SET useTunnel=FALSE;
      CREATE materialized VIEW IF NOT EXISTS yunqi_incre_mv2
      refresh every 2 minutes
      tblproperties("enable_auto_refresh"="true", "refresh_mode"="incremental")
      AS 
      SELECT repo_name, language, SUM(events) AS events_sum, COUNT(create_date) AS day_count 
      FROM yunqi_incre_mv1 
      WHERE events > 100 
      GROUP BY repo_name,language;
    • SQL分析:在一定時間內按照編程language分類,按照Repo數量多少來排序查詢每個語言對應的總共事件數量。

      --Q7: 編程Language熱門排序
      SELECT language, COUNT(repo_name) AS repo_cnt,MAX(events_sum) AS max_events FROM yunqi_incre_mv2 
      GROUP BY language ORDER BY repo_cnt DESC;
      --Q8:觀測增量MV2歷史操作狀態 / 表變化信息
      SHOW history FOR TABLE yunqi_incre_mv2;
      --數據更新批次5分鐘間隔,如無變更需執行
      ALTER materialized VIEW yunqi_incre_mv2 rebuild;
      SELECT * FROM table_changes('yunqi_incre_mv2', 2);

觀察查詢SQL結果。

Step 3

使用批處理資源組-增量計算-Stream&Task應用

  1. 在MaxCompute控制臺左側導航欄中,選擇工作區 > SQL分析,打開臨時SQL分析頁面。

  2. 選擇步驟一創建的批處理Quota組,增量CDC。

具體SQL分析步驟如下:

  1. 創建stream對象,查詢新增的cdc記錄,進行后續的增量計算。

    --Q1: 查詢最新的表數據的版本信息
    SELECT get_latest_version('yunqi_github_events_odps_dw'); 

    創建Stream對象。通過上面get_lastest_version信息,設置開始跟蹤的增量數據的起始版本。選擇小于等于最新版本的一個歷史版本開始跟蹤增量的CDC記錄。

    --Q2:創建Stream對象
    CREATE stream stream_githubs ON TABLE yunqi_github_events_odps_dw version AS OF 10 strmproperties('read_mode'='cdc') ;
    
    DESC stream stream_githubs;

  2. 創建并執行Task,Task自動執行增量更新repo_name。

    • 創建聚合表yunqi_github_events_ads。

      --創建聚合表yunqi_github_events_ads
      SET odps.sql.upsertable.table.enable=true;
      CREATE TABLE IF NOT EXISTS 
      yunqi_github_events_ads
      (
          repo_name             STRING COMMENT 'repo全名:owner/Repository_name'
          ,language              STRING COMMENT '編程語言'
          ,events_cnt            BIGINT COMMENT '事件數量'
          ,create_date           DATE COMMENT '事件發生時間'
      )
      STORED AS ALIORC  
      TBLPROPERTIES (
           'transactional'='true',
           'write.bucket.num'='64') 
      LIFECYCLE 36500;
    • Task自動執行增量更新repo_name。

      --Task自動執行增量更新repo_name
      SET odps.sql.periodic.task.enabled=true;
      CREATE task yunqi_incre_task1
      schedule='45 second' 
      taskproperties('schedule_strategy'='200')
      WHEN stream_has_data('stream_githubs')
      AS INSERT overwrite yunqi_github_events_ads SELECT
          repo_name,
          language,
          COUNT(*) AS events_cnt,
          to_date(created_at) AS create_date
      FROM
          yunqi_github_events_odps_dw
      WHERE
          date_add(created_at,-1)>'2024-08-26' AND language IS NOT NULL
      GROUP BY
      repo_name,language,to_date(created_at);
      

      每隔一分鐘,自動查詢出來DWD的CDC增量數據,進行分析處理后寫入ADS表-yunqi_github_events_ads,自定義Pipeline,寫起來復雜但用戶可以靈活通過SQL自定義增量計算分析處理邏輯。(時長設置:1分鐘內可按照秒級設置schdule,超過1分鐘按照分鐘級設置schdule,最長設置為59分鐘)

  3. 更新Task狀態,Task自動執行增量更新repo_name。

    --更新Task狀態,Task自動執行增量更新repo_name
    ALTER task  yunqi_incre_task1  suspend;  --暫停
    ALTER task  yunqi_incre_task1  resume; --恢復
  4. 查詢CDC build task情況。

    --查詢CDC build task情況
    SHOW history FOR task yunqi_incre_task1;
    SELECT *  FROM stream_githubs

觀察查詢SQL結果。

步驟四:多Quota資源配置和交互式查詢加速

交互式Quota組擴容(64CU->96CU),在新規格的Quota組情況下,在MaxCompute TopConsole進行交互式查詢, 觀測查詢加速性能提升。

步驟

操作

預期結果

Step 1

Quota擴容

  1. 在MaxCompute控制臺左側導航欄中,選擇工作區 > 配額(Quota)管理,單擊目標一級Quota組右側的Quota配置2024-09-18_15-49-38.png

  2. 基礎配置頁面,單擊編輯基礎配置

  3. 修改交互式Quota組mcqa2_demo_yunqi,將其從64CU擴容至128CU。2024-09-18_15-54-23.png

目標Quota組資源擴容成功。

Step 2

查詢對比分析與性能優化

  1. 在MaxCompute控制臺左側導航欄中,選擇工作區 > SQL分析,打開臨時SQL分析頁面。

  2. 選擇步驟一創建的交互式Quota組,進行近實時查詢分析,觀測數據變化。

具體SQL分析步驟如下:

  • Q1(可選): 執行Major Compaction操作,優化查詢。

    --Q1: 執行Major Compaction操作,優化查詢
    SET odps.merge.task.mode=service;
    ALTER TABLE yunqi_github_events_odps_dw compact major;

    MCQA2.0 查詢對比性能-交互式資源組。

    --Q2: 近實時查詢-實時事件展示
    SELECT
        cast(created_at AS STRING ),
        actor_login,
        type,
        repo_name
    FROM
        yunqi_github_events_odps_dw
    ORDER BY
        created_at DESC
    LIMIT 5;
  • 增量MV1查詢:按照repo_name/language信息,查詢在一定時間內的各repo的總共事件數量和語言。

    --Q3:按照repo_name / language 信息,降序查詢總共事件數據與Language
    SELECT repo_name, language, SUM(events) AS events_sum, COUNT(create_date) AS day_count 
    FROM yunqi_incre_mv1 
    WHERE events > 100 
    GROUP BY repo_name,language ORDER BY events_sum DESC;
  • 增量MV2查詢:在一定時間內按照編程language分類,查詢每個語言對應的repo數量與總共事件數量。

    --Q4: 編程Language熱門排序
    SELECT language, COUNT(repo_name) AS repo_cnt,MAX(events_sum) AS max_events FROM yunqi_incre_mv2 
    GROUP BY language;

觀察查詢SQL結果。

步驟五(可選):增全量一體交互式分析

使用DataWorks IDE模塊-實現增全量一體的近實時數據分析。

步驟

操作

預期結果

Step 1

在DataWorks IDE 數據開發模塊配置MaxCompute計算資源

  1. 訪問DataWorks IDE數據開發頁面,在左側導航欄單擊數據目錄.png圖標,打開數據目錄頁面。777.png

  2. 單擊右上角的工作空間管理,選擇計算資源,單擊綁定計算資源888.png

  3. 在頁面右側配置計算資源計算配額資源組999.png

  4. 綁定MaxCompute項目,執行MaxComputeSQL任務。000.png

Step 2

在DataWorks IDE數據開發平臺進行數據分析

具體SQL分析步驟如下:

  • Q1:過去24小時最活躍項目

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;
  • Q2:過去24小時最活躍開發者

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        yunqi_github_events_odps_dw
    WHERE
        date_add(created_at,-1)>'2024-08-26'
    GROUP BY
        actor_login
    ORDER BY
        events 
    LIMIT 5;
  • Q3:今日公開事件總數

    SELECT count(*) FROM yunqi_github_events_odps_dw WHERE TO_DATE(created_at) >= date_add(now(),-1);
  • Q4:實時事件展示

    SELECT
        cast(created_at AS STRING ),
        actor_login,
        type,
        repo_name
    FROM
        yunqi_github_events_odps_dw
    ORDER BY
        created_at DESC
    LIMIT 5;

觀察查詢SQL結果。

總結

本次示例展示了MaxCompute全新構建的基于近實時數倉產品特性的產品方案實踐。MaxCompute提供增全量一體的數據處理和近實時查詢能力,從數據存儲層(Delta Table統一表格式)、計算層(增量計算:CDC/Task/增量MV)、加速層(MCQA2.0查詢加速引擎)等三層架構來實現MaxCompute近線計算能力的全面升級。通過此次典型的Demo創建,您能夠深度了解如何在MaxCompute產品上構建完備的近實時以及增全量計算的計算任務,簡化數據全生命周期的計算優化工作。