日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Hologres+Flink通過預聚合實現實時UV統計

實時UV計算主要依賴Hologres與Flink結合完成,本文將為您介紹Hologres如何進行實時UV精確去重。

前提條件

背景信息

Hologres與Flink有著強大的融合優化,支持Flink數據高通量實時寫入,寫入即可見,支持Flink SQL維表關聯,以及作為CDC Source事件驅動開發。因此實時UV去重主要通過Flink和Hologres來實現,場景架構圖如下所示。流程

  1. Flink實時訂閱實時采集的數據,數據源可以來源于日志數據,如Kafka等。

  2. Flink對數據做進一步加工處理,將流式數據轉化為表與Hologres維表進行JOIN操作,實時寫入Hologres。

  3. Hologres對Flink實時寫入的數據實時處理。

  4. 最終查詢的數據對接上層數據應用,如數據服務、Quick BI等。

實時UV計算方案流程

Flink與Hologres有著非常強的融合性,再結合Hologres天然支持的RoaringBitmap,完成實時UV計算,實時對用戶標簽去重,詳細方案流程如下圖所示。流程圖

  1. Flink實時訂閱用戶數據,這些數據可以來源于Kafka、Redis等,并通過DataStream轉化為數據源表。

  2. 在Hologres中創建用戶映射表,存放歷史用戶的uid以及對應的32位自增uid。

    說明

    常見的業務系統或者埋點中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續)。映射表利用Hologres的SERIAL類型(自增的32位int)來實現用戶映射的自動管理和穩定映射。

  3. 在Flink中,將Hologres中的用戶映射表作為Flink維表,利用Hologres維表的insertIfNotExists特性結合自增字段實現高效的uid映射。維表與數據源表進行Join關聯,并將Join得到的結果轉化為流式數據DataStream。

  4. Hologres中創建聚合結果表,Flink把維表關聯的結果數據按照時間窗口進行處理,根據查詢維度使用RoaringBitmap函數

  5. 查詢時,與離線方式相似,直接按照查詢條件查詢聚合結果表,并對其中關鍵的RoaringBitmap字段做or運算后并統計基數,即可得出對應用戶數。

這樣的方式,可以較細粒度的實時得到用戶UV、PV數據,同時便于根據需求調整最小統計窗口(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較于以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,并且通過簡單的聚合,也可以得到較大時間單位的統計結果。如果加工聚合的粒度較細,但查詢時缺少相應的過濾條件或聚合維度,則也會在查詢時引起較多的二次聚合操作,對性能有不利影響。

該方案數據鏈路簡單,可以任意維度靈活計算,只需要一份Bitmap存儲,也沒有存儲爆炸問題,還能保證實時更新,從而實現更實時、開發更靈活、功能更完善的多維分析數倉。

操作步驟

  1. 在Hologres中創建相關基礎表

    1. 創建用戶映射表

      在Hologres創建表uid_mapping為用戶映射表,命令語句如下所示。用于映射uid到32位int類型。如果原始uid已經是int32類型,此步驟可忽略。

      • 常見的業務系統或者埋點中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續)。映射表利用Hologres的SERIAL類型(自增的32位int)來實現用戶映射的自動管理和穩定映射。

      • 由于是實時數據,在Hologres中該表為行存表,以提高Flink維表實時JOIN的QPS。

      • 需要開啟相應GUC使用優化的執行引擎對包含serial字段的表進行寫入,詳情請參見Fixed Plan加速SQL執行

      --開啟GUC,支持含有Serial類型列的Fixed Plan寫入
      alter database <dbname> set hg_experimental_enable_fixed_dispatcher_autofill_series=on;
      alter database <dbname> set hg_experimental_enable_fixed_dispatcher_for_multi_values=on;
      
      BEGIN;
      CREATE TABLE public.uid_mapping (
      uid text NOT NULL,
      uid_int32 serial,
      PRIMARY KEY (uid)
      );
      --將uid設為clustering_key和distribution_key便于快速查找其對應的int32值
      CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid');
      CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid');
      CALL set_table_property('public.uid_mapping', 'orientation', 'row');
      COMMIT;
    2. 創建聚合結果表

      創建表dws_app為聚合結果表,用于存放在基礎維度上聚合后的結果。

      使用RoaringBitmap函數前需要創建RoaringBitmap extension,同時也需要Hologres實例為 V0.10及以上版本。

      CREATE EXTENSION IF NOT EXISTS roaringbitmap;

      相比離線結果表,此結果表增加了時間戳字段,用于實現以Flink窗口周期為單位的統計,結果表DDL如下。

      BEGIN;
      CREATE TABLE dws_app(
        country text,
        prov text,
        city text,
        ymd text NOT NULL,  --日期字段
        timetz TIMESTAMPTZ,  --統計時間戳,可以實現以Flink窗口周期為單位的統計
        uid32_bitmap roaringbitmap, -- 使用roaringbitmap記錄uv
       PRIMARY KEY (country, prov, city, ymd, timetz)--查詢維度和時間作為主鍵,防止重復插入數據
      );
      CALL set_table_property('public.dws_app', 'orientation', 'column');
      --日期字段設為clustering_key和event_time_column,便于過濾
      CALL set_table_property('public.dws_app', 'clustering_key', 'ymd');
      CALL set_table_property('public.dws_app', 'event_time_column', 'ymd');
      --group by字段設為distribution_key
      CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city');
      COMMIT;
  2. Flink實時讀取數據并更新聚合結果表

    在Flink中的完整示例源碼請參見alibabacloud-hologres-connectors examples,下面是在Flink中的具體操作步驟。

    1. Flink流式讀取數據源(DataStream)并轉化為源表(Table)

      在Flink中使用流式讀取數據源,數據源可以是CSV文件,也可以來源于Kafka、Redis等,可以根據業務場景準備。在Flink中轉化為源表的代碼示例如下。

      // 此處使用csv文件作為數據源,也可以是kafka/redis等
      DataStreamSource odsStream = env.createInput(csvInput, typeInfo);
      // 與維表join需要添加proctime字段
      Table odsTable =
          tableEnv.fromDataStream(
          odsStream,
          $("uid"),
          $("country"),
          $("prov"),
          $("city"),
          $("ymd"),
          $("proctime").proctime());
      // 注冊到catalog環境
      tableEnv.createTemporaryView("odsTable", odsTable);
    2. 將源表與Hologres維表(uid_mapping)進行關聯

      在Flink中創建Hologres維表,需要使用insertIfNotExists參數,即查詢不到數據時自行插入,uid_int32字段便可以利用Hologres的Serial類型自增創建。將Flink源表與Hologres維表進行關聯(JOIN),代碼示例如下。

      -- 創建Hologres維表,其中insertIfNotExists表示查詢不到則自行插入
      String createUidMappingTable =
          String.format(
          "create table uid_mapping_dim("
          + "  uid string,"
          + "  uid_int32 INT"
          + ") with ("
          + "  'connector'='hologres',"
          + "  'dbname' = '%s'," //Hologres DB名
          + "  'tablename' = '%s',"http://Hologres 表名
          + "  'username' = '%s'," //當前賬號access id
          + "  'password' = '%s'," //當前賬號access key
          + "  'endpoint' = '%s'," //Hologres endpoint
          + "  'insertifnotexists'='true'"
          + ")",
          database, dimTableName, username, password, endpoint);
      tableEnv.executeSql(createUidMappingTable);
      
      -- 源表與維表join
      String odsJoinDim =
          "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32"
          + "  FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim"
          + "  ON ods.uid = dim.uid";
      Table joinRes = tableEnv.sqlQuery(odsJoinDim);
    3. 將關聯結果轉化為DataStream

      通過Flink時間窗口處理,結合RoaringBitmap進行對指標進行去重處理,代碼示例如下所示。

      DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource =
          source
          -- 篩選需要統計的維度(country, prov, city, ymd)
          .keyBy(0, 1, 2, 3)
          -- 滾動時間窗口;此處由于使用讀取csv模擬輸入流,采用ProcessingTime,實際使用中可使用EventTime
          .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
          -- 觸發器,可以在窗口未結束時獲取聚合結果
          .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1)))
          .aggregate(
          -- 聚合函數,根據key By篩選的維度,進行聚合
          new AggregateFunction<
              Tuple5<String, String, String, String, Integer>,
              RoaringBitmap,
              RoaringBitmap>() {
                  @Override
                  public RoaringBitmap createAccumulator() {
                      return new RoaringBitmap();
                  }
      
                  @Override
                  public RoaringBitmap add(
                      Tuple5<String, String, String, String, Integer> in,
                      RoaringBitmap acc) {
                      -- 將32位的uid添加到RoaringBitmap進行去重
                      acc.add(in.f4);
                      return acc;
                  }
      
                  @Override
                  public RoaringBitmap getResult(RoaringBitmap acc) {
                      return acc;
                  }
      
                  @Override
                  public RoaringBitmap merge(
                      RoaringBitmap acc1, RoaringBitmap acc2) {
                      return RoaringBitmap.or(acc1, acc2);
                  }
              },
          -- 窗口函數,輸出聚合結果
          new WindowFunction<
              RoaringBitmap,
              Tuple6<String, String, String, String, Timestamp, byte[]>,
              Tuple,
              TimeWindow>() {
                  @Override
                  public void apply(
                      Tuple keys,
                      TimeWindow timeWindow,
                      Iterable<RoaringBitmap> iterable,
                      Collector<
                      Tuple6<String, String, String, String, Timestamp, byte[]>> out)
                      throws Exception {
                      RoaringBitmap result = iterable.iterator().next();
      
                      // 優化RoaringBitmap
                      result.runOptimize();
                      // 將RoaringBitmap轉化為字節數組以存入Holo中
                      byte[] byteArray = new byte[result.serializedSizeInBytes()];
                      result.serialize(ByteBuffer.wrap(byteArray));
      
                      // 其中 Tuple6.f4(Timestamp) 字段表示以窗口長度為周期進行統計,以秒為單位
                      out.collect(
                          new Tuple6<>(
                              keys.getField(0),
                              keys.getField(1),
                              keys.getField(2),
                              keys.getField(3),
                              new Timestamp(
                                  timeWindow.getEnd() / 1000 * 1000),
                              byteArray));
              }
          });
    4. 寫入Hologres聚合結果表

      經過Flink去重處理的數據寫入至Hologres結果表dws_app,但需要注意的是Hologres中RoaringBitmap類型在Flink中對應Byte數組類型,Flink中代碼如下。

       -- 計算結果轉換為表
      Table resTable =
          tableEnv.fromDataStream(
              processedSource,
              $("country"),
              $("prov"),
              $("city"),
              $("ymd"),
              $("timest"),
              $("uid32_bitmap"));
      
      -- 創建Hologres結果表, 其中Hologres的RoaringBitmap類型通過Byte數組存入
      String createHologresTable =
          String.format(
              "create table sink("
              + "  country string,"
              + "  prov string,"
              + "  city string,"
              + "  ymd string,"
              + "  timetz timestamp,"
              + "  uid32_bitmap BYTES"
              + ") with ("
              + "  'connector'='hologres',"
              + "  'dbname' = '%s',"
              + "  'tablename' = '%s',"
              + "  'username' = '%s',"
              + "  'password' = '%s',"
              + "  'endpoint' = '%s',"
              + "  'connectionSize' = '%s',"
              + "  'mutatetype' = 'insertOrReplace'"
              + ")",
          database, dwsTableName, username, password, endpoint, connectionSize);
      tableEnv.executeSql(createHologresTable);
      
      -- 寫入計算結果到dws_app表
      tableEnv.executeSql("insert into sink select * from " + resTable);
  3. 數據查詢

    在Hologres中對聚合結果表(dws_app)進行UV計算。按照查詢維度做聚合計算,查詢Bitmap基數,得出Group By條件下的用戶數。

    • 示例一:查詢某天內各個城市的uv

      -- 運行下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(默認關閉),性能更好,此步驟可選
      set hg_experimental_enable_force_three_stage_agg=off;
      
      SELECT  country
              ,prov
              ,city
              ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
      FROM    dws_app
      WHERE   ymd = '20210329'
      GROUP BY country
               ,prov
               ,city
      ;
    • 示例二:查詢某段時間內各個省份的UV、PV

      -- 運行下面RB_AGG運算查詢,可執行參數先關閉三階段聚合開關(默認關閉),性能更好,此步驟可選
      set hg_experimental_enable_force_three_stage_agg=off;
      
      SELECT  country
              ,prov
              ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv
              ,SUM(pv) AS pv
      FROM    dws_app
      WHERE   time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08'
      GROUP BY country
               ,prov
      ;
  4. 可視化展示

    計算出UV、PV后,大多數情況需要使用BI工具以更直觀的方式可視化展示,由于需要使用RB_CARDINALITYRB_OR_AGG進行聚合計算,需要使用BI的自定義聚合函數的能力,常見的具備該能力的BI包括Apache Superset和Tableau。

    • Apache Superset

      1. Apache Superset連接Hologres,詳情請參見Apache Superset

      2. 設置dws_app表作為數據集。添加Dataset

      3. 在數據集中創建一個名稱為UV的單獨Metrics,表達式如下。創建UV

        RB_CARDINALITY(RB_OR_AGG(uid32_bitmap))

        完成后您就可以開始探索數據了。

      4. (可選)創建Dashboard。

        創建儀表板請參見Create Dashboard

    • Tableau

      1. Tableau連接Hologres,詳情請參見Tableau

        可以使用Tableau的直通函數直接實現自定義函數的能力,詳細介紹請參見直通函數

      2. 創建一個計算字段,表達式如下。創建UV

        RAWSQLAGG_INT("RB_CARDINALITY(RB_OR_AGG(%1))", [Uid32 Bitmap])

        完成后您就可以開始探索數據了。

      3. (可選)創建Dashboard。

        創建儀表板請參見Create a Dashboard