日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

模擬實時統計特征的配置和原理

本文介紹實時統計特征如何構建,有哪些注意點。

背景信息

推薦系統的召回和排序,越能實時、準確地根據用戶的操作感知到用戶的偏好行為,根據操作行為來召回候選集和排序候選的物品,推薦的效果就越好。但是我們在優化一個推薦場景的召回或者排序模型的時候,往往還沒有準備好用戶和物品的實時特征,無法在模型打分前1秒獲取到用戶實時特征和物品實時特征。

如果我們先建設一個實時統計特征、寫入到系統中、再根據曝光、點擊等事件來關聯正負樣本,這個流程非常長,還可能開發了一些沒有用的實時特征從而浪費開發資源和時間。我們根據長期的推薦系統實踐,提出利用離線歷史行為數據、用戶和物品特征數據來模擬計算出實時特征,得到特征和訓練樣本,在線上服務的時候用Flink統計真實的實時特征供模型打分使用,充分使用歷史數據跳過實時特征積累的過程,能夠離線先驗證模型效果,同時也大大縮短了項目時間(先用離線模擬的實時特征來訓練模型,用真實實時特征來做線上服務,最終再用真實實時特征來訓練模型)。

準備數據

1.離線和實時的用戶行為日志表

除了離線的行為日志表之外,還需要一張實時行為日志表。實時行為日志表首先存在DataHub中,其余消息中間件也可以,只需要能被Flink SQL消費即可。注意:實時行為日志表的字段是離線行為表字段的子集(即DataHub和MaxCompute中的表一些字段名稱是一致的),但至少需要以下字段。

字段

類型

是否必須非空

描述

user_id

string

用戶id

item_id

string

商品id

request_id

string

請求id

event

string

行為類型,該字段的值一般是英文字符串:expr,click,buy等

event_time

bigint

行為發生的時間,必須是10位的時間戳,單位是秒

event_value

double

行為對應的值,如觀看時長,購買金額等;如果沒有則設置為0

scene

string

用來區分日志場景,如首頁猜你喜歡推薦、搜索場景、詳情頁相關推薦、排行榜等等

字段名稱不一定和以上描述完全一樣,可以增加冗余字段。

我們準備如下用戶行為日志表。MaxCompute離線行為日志表結構如下(其中playtime對應上面表格中的event_value字段,page對應上面的scene字段):

image

離線數據表參考《推薦算法定制最佳實踐文檔》。

DataHub實時行為日志表結構如下:

image

可以看到實時行為表字段和離線行為表完全一樣,且字段是離線行為表的子集。

PAI-Rec推薦方案定制->數據注冊->DataHub表中進行實時行為表的注冊,同時在MaxCompute表中對離線行為表注冊

image

2.用戶屬性表

MaxCompute準備如下表結構:

image并在PAI-Rec中的推薦算法定制中進行MaxCompute進行數據注冊

image

3.物品表

在MaxCompute中準備如下物品表,同上面一樣在PAI-Rec的推薦算法定制中進行數據注冊

imageimage

推薦算法定制

我們在PAI-Rec控制臺的推薦算法定制的特征工程功能模塊中集成了如何配置實時統計特征的功能,配置實時統計特征之后,會產出MaxCompute SQL離線模擬產出推薦請求那一刻需要的實時統計特征。

1.數據表配置

行為日志表配置

image

image

用戶表配置

image

物品表配置

image

實時行為日志表配置

image

image

2.離線模擬實時統計特征

在算法定制的特征配置中,對用戶和物品實時特征如下配置。盡可能實時特征不要配置曝光行為,否則會導致Flink的資源消耗過大,統計點擊以上的行為基本上能取得同樣的收益,特殊場景除外。

用戶實時特征

注意:防穿越時間設置5秒,表示只統計在某次推薦請求5秒之前的行為。防止離線可以統計到,但是在線服務中因為系統延遲、轉發等邏輯并不能統計到這個特征。

image

物品實時特征

image

離線模擬實時特征SQL如下:

image

當把推薦算法定制生成的代碼部署到DataWorks上面之后,可以在DataWorks上的“數據開發-業務流程中”看到如上節點,分別構造了用戶的實時特征和物品的實時特征。

首先構造了ali_rec_wide_table_for_rt的表,該表為模擬實時特征準備。在模擬一天數據的實時特征過程中至少需要前一天和當天的數據,因為當天凌晨的一些用戶和物品的實時特征需要利用前一天的數據來模擬統計出實時特征,如上圖所示ali_rec_user_id_rt_feature,ali_rec_item_id_rt_feature節點是根據離線日志模擬實時特征的核心

以ali_rec_user_id_rt_feature為例,做以下講解

REATE TABLE IF NOT EXISTS ali_rec_user_id_rt_feature 
(
    user_id string
    ,request_id string
    ,event_unix_time bigint
    ,user__cnt_click_all_scene_rt5m bigint
    ,user__kv_author_click_all_scene_rt5m string
    ,user__kv_category_click_all_scene_rt5m string
    ,user__cnt_praise_all_scene_rt5m bigint
    ,user__kv_author_praise_all_scene_rt5m string
    ,user__kv_category_praise_all_scene_rt5m string
    ,user__cnt_click_all_scene_rt1h bigint
    ,user__kv_author_click_all_scene_rt1h string
    ,user__kv_category_click_all_scene_rt1h string
    ,user__cnt_praise_all_scene_rt1h bigint
    ,user__kv_author_praise_all_scene_rt1h string
    ,user__kv_category_praise_all_scene_rt1h string
    ,user__cnt_click_all_scene_rt12h bigint
    ,user__kv_author_click_all_scene_rt12h string
    ,user__kv_category_click_all_scene_rt12h string
    ,user__cnt_praise_all_scene_rt12h bigint
    ,user__kv_author_praise_all_scene_rt12h string
    ,user__kv_category_praise_all_scene_rt12h string
)
PARTITIONED BY 
(
    ds string
)
LIFECYCLE 90
;
set odps.stage.mapper.split.size=8;
set odps.stage.reducer.num=200;
set odps.namespace.schema=false;
--@resource_reference{"feature-generate-mr-v1.16.jar"}
jar -resources feature-generate-mr-v1.16.jar
    -classpath feature-generate-mr-v1.16.jar
    com.aliyun.pai.feature_generate_mr.RealTimeFeature
    -pid "user_id"
    -pid_type "string"
    -time_stamp "event_unix_time"
    -time_stamp_type "bigint"
    -event "event"
    -valid_events "click,praise"
    -valid_event_selections "click,praise"
    -category_fields "author,category"
    -default_v ""
    -lag_time_values "300,3600,43200"
    -pre_seconds 5
    -topk 100
    -output_date "${bdp.system.bizdate}"
    -input_table "ali_rec_wide_table_for_rt/ds='${bdp.system.bizdate}'"
    -output_table "ali_rec_user_id_rt_feature/ds='${bdp.system.bizdate}'"
    -requestid "request_id"
;

該jar包是根據離線日志模擬實時特征,參數介紹如下:

-pid                        一般是userid,表示統計誰的實時特征,也可以是itemid
-pid_type                   pid的數據類型,string或者bigint
-requestid                  行為表的請求id字段,可以不配置,則每秒輸出一次結果,如果有,則每個用戶,每個requestid計算輸出一次實時特征
-time_stamp                 行為表的行為發生時間,10位時間戳,單位是秒
-time_stamp_type            行為表的時間戳的數據類型,類型是bigint
-event                      行為表的事件類型字段
-valid_events               行為表對事件類型的別名
-valid_event_selections     行為表需要被統計的事件類型,一個事件類型可以包含多個具體行為名,例如 click 可以包含 discover_click 和 popular_click, 輸入時以 "|" 分隔
-category_fields            需要被統計的category或者tags的字段
-default_v                  被統計的category或者tags的字段空值填充
-lag_time_values            窗口值,單位秒,例如"1800,3600,10800"
-pre_seconds                多少秒之前發生的行為才可以統計,防止穿越用的:因為線上日志有延遲,離線模擬不能過于及時。建議至少設置5秒。
-topk                       一般表示kv里面k的數量
-output_date                "${bdp.system.bizdate}",表示輸出的開始時間,一般輸出當前每個時刻的實時特征
-input_table                "dwd_rt_wide_table/dt='${bdp.system.bizdate}'"
-output_table               "dwd_user_rt_feature/dt='${bdp.system.bizdate}'"
-scene                      場景字段,配合valid_scenes使用
-valid_scenes               需要單獨統計場景字段的值全部場景統計,單獨場景統計

生成的特征規律如下,除了userid,requestid,以及event time字段,以下輸出是生成特征的規律

for sc in [all]+valid_scenes: # 遍歷場景,默認是全場景,all標識全場景
  for win in windows: # 遍歷窗口
    for event in valid_events: # 遍歷行為類型
      print(cnt_event) # 行為次數統計
      for c in enumurate(category_fields): #遍歷類目字段
        print(kv_c_event) # 類目c在行為event下統計的kv字段

實時特征范圍

1.行為計數統計,周期內可以統計不同行為發生的不同次數,如曝光多少次,點擊多少次,都是數值特征

2.行為轉換率統計,如點擊次數/曝光次數,此種是在第一種結果上做了兩兩相除

3.偏好屬性行為計數,如周期內統計不同行為,對應的屬性偏好次數,如周期內,用戶對不同類目的點擊次數,統計得到kv特征

4.偏好屬性行為轉換率,這也是在得到偏好屬性行為計數后做兩兩相除得到

樣本根據userid和requestid join到實時特征便可以進行訓練。物品側同理

實時統計特征在線統計,供給推理使用

實時特征統計實現:

實時特征實現主要依據2個關鍵函數得實現.

MessageDelay:日志立即輸出一次,當水位到達日志在對應的窗口后再各輸出一次;如以下講解中有3個窗口,會直接輸出一次,當水位達到當前日志發生時間的300s之后再輸出一次,3600s之后再輸出一次,43200s之后再輸出一次。

SWCountCatesKVS:根據以上輸出日志統計實時特征。

以此實現實時特征秒級變動。

配置依賴

如果在該算法定制的環境配置中配置有flink數據源,則會生成flink統計當前實時特征的sql

image

打開flink對應空間,在作業運維中可以看到實時特征統計的sql腳本

image

腳本sql如下

-- please register flink function from https://easyrec.oss-cn-beijing.aliyuncs.com/deploy/real_time_seq_feature/feature-generate-flink-udf-v1.3.jar

CREATE TEMPORARY TABLE rec_sln_demo_behavior_table_dh
(
  user_id     string
  ,item_id    string
  ,event_time bigint
  ,event      string
  ,playtime   double
  ,page       string
)
WITH (
  'connector' = 'datahub'
  ,'subId' = '{topic_subid}'
  ,'endPoint' = '{datahub_endpoint}'
  ,'project' = '{datahub_project}'
  ,'topic' = 'rec_sln_demo_behavior_table_dh'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
)
;

CREATE TEMPORARY TABLE ali_rec_rec_sln_demo_user_table_preprocess
(
  user_id                 string
  ,gender                 string
  ,age                    bigint
  ,city                   string
  ,item_cnt               bigint
  ,follow_cnt             bigint
  ,follower_cnt           bigint
  ,tags                   string
  ,user_register_time     bigint
  ,user_register_time_bin string
  ,PRIMARY KEY (user_id)
)
WITH (
  'connector' = 'odps'
  ,'endPoint' = '{odps_endpoint}'
  ,'project' = 'pairec_demo'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
  ,'tableName' = 'ali_rec_rec_sln_demo_user_table_preprocess'
  ,'partition' = 'max_pt_with_done()'
  ,'cache' = 'ALL'
  ,'cacheSize' = '10000000'
  ,'cacheTTLMs' = '3600000'
)
;

CREATE TEMPORARY TABLE ali_rec_rec_sln_demo_item_table_preprocess
(
  item_id            string
  ,duration          double
  ,category          string
  ,author            bigint
  ,click_count       bigint
  ,praise_count      bigint
  ,item_pub_time     bigint
  ,item_pub_time_bin string
  ,PRIMARY KEY (item_id)
)
WITH (
  'connector' = 'odps'
  ,'endPoint' = '{odps_endpoint}'
  ,'project' = 'pairec_demo'
  ,'accessId' = '{accessId}'
  ,'accessKey' = '{accessKey}'
  ,'tableName' = 'ali_rec_rec_sln_demo_item_table_preprocess'
  ,'partition' = 'max_pt_with_done()'
  ,'cache' = 'ALL'
  ,'cacheSize' = '10000000'
  ,'cacheTTLMs' = '3600000'
)
;

CREATE TEMPORARY VIEW rec_sln_demo_behavior_table_dh_wide
AS SELECT
  user_id
  ,item_id
  ,event_time
  ,event
  ,playtime
  ,page
  ,gender
  ,age
  ,city
  ,item_cnt
  ,follow_cnt
  ,follower_cnt
  ,tags
  ,user_register_time
  ,user_register_time_bin
  ,duration
  ,category
  ,author
  ,click_count
  ,praise_count
  ,item_pub_time
  ,item_pub_time_bin
  ,HashBucket(user_id, 2) group_key
FROM rec_sln_demo_behavior_table_dh t1
  LEFT JOIN ali_rec_rec_sln_demo_user_table_preprocess FOR SYSTEM_TIME AS OF PROCTIME() t2
    ON t1.user_id = t2.user_id
  LEFT JOIN ali_rec_rec_sln_demo_item_table_preprocess FOR SYSTEM_TIME AS OF PROCTIME() t3
    ON t1.item_id = t3.item_id
;

CREATE TEMPORARY VIEW rec_sln_demo_behavior_table_dh_wide_delay
AS SELECT
  delay_info[1] user_id
  ,delay_info[2] item_id
  ,delay_info[3] event
  ,delay_info[4] page
  ,delay_info[5] gender
  ,delay_info[6] city
  ,delay_info[7] tags
  ,delay_info[8] author
  ,delay_info[9] category
  ,CAST(delay_info[10] AS BIGINT) event_time
  ,CAST(delay_info[13] AS BIGINT) sys_time
  ,CAST(delay_info[14] AS INT) delay_flag
FROM (
  SELECT
    MessageDelay(
      ARRAY[user_id,item_id,event,page,gender,city,tags,CAST(author AS STRING),category]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,group_key
      ,IF(event IN ('click','praise'), true, false)
      ,ARRAY['']
    ) delay_info
  FROM rec_sln_demo_behavior_table_dh_wide
  GROUP BY
    group_key
) sq0
;

CREATE TEMPORARY VIEW dwd_item_id_rt_feature
AS SELECT
  item_id
  ,sys_time
  ,CAST(features[1] AS BIGINT) item__cnt_click_rt5m
  ,features[2] item__kv_gender_click_rt5m
  ,features[3] item__kv_city_click_rt5m
  ,features[4] item__kv_tags_click_rt5m
  ,CAST(features[5] AS BIGINT) item__cnt_praise_rt5m
  ,features[6] item__kv_gender_praise_rt5m
  ,features[7] item__kv_city_praise_rt5m
  ,features[8] item__kv_tags_praise_rt5m
  ,CAST(features[9] AS BIGINT) item__cnt_click_rt1h
  ,features[10] item__kv_gender_click_rt1h
  ,features[11] item__kv_city_click_rt1h
  ,features[12] item__kv_tags_click_rt1h
  ,CAST(features[13] AS BIGINT) item__cnt_praise_rt1h
  ,features[14] item__kv_gender_praise_rt1h
  ,features[15] item__kv_city_praise_rt1h
  ,features[16] item__kv_tags_praise_rt1h
  ,CAST(features[17] AS BIGINT) item__cnt_click_rt12h
  ,features[18] item__kv_gender_click_rt12h
  ,features[19] item__kv_city_click_rt12h
  ,features[20] item__kv_tags_click_rt12h
  ,CAST(features[21] AS BIGINT) item__cnt_praise_rt12h
  ,features[22] item__kv_gender_praise_rt12h
  ,features[23] item__kv_city_praise_rt12h
  ,features[24] item__kv_tags_praise_rt12h
FROM (
  SELECT
    item_id
    ,UNIX_TIMESTAMP() sys_time
    ,SWCountCatesKVS(
      ARRAY[gender,city,tags]
      ,1
      ,event
      ,ARRAY['click','praise']
      ,delay_flag
      ,ARRAY[300,3600,43200]
      ,page
      ,ARRAY['']
      ,100
      ,item_id
      ,ARRAY['']
    ) features
  FROM rec_sln_demo_behavior_table_dh_wide_delay
  GROUP BY
    item_id
) sq0
;

CREATE TEMPORARY TABLE ali_rec_item_id_rt_statistic_feat
(
  item_id                       string
  ,sys_time                     bigint
  ,item__cnt_click_rt5m         bigint
  ,item__cnt_praise_rt5m        bigint
  ,item__cnt_click_rt1h         bigint
  ,item__cnt_praise_rt1h        bigint
  ,item__cnt_click_rt12h        bigint
  ,item__cnt_praise_rt12h       bigint
  ,item__kv_gender_click_rt5m   string
  ,item__kv_city_click_rt5m     string
  ,item__kv_tags_click_rt5m     string
  ,item__kv_gender_praise_rt5m  string
  ,item__kv_city_praise_rt5m    string
  ,item__kv_tags_praise_rt5m    string
  ,item__kv_gender_click_rt1h   string
  ,item__kv_city_click_rt1h     string
  ,item__kv_tags_click_rt1h     string
  ,item__kv_gender_praise_rt1h  string
  ,item__kv_city_praise_rt1h    string
  ,item__kv_tags_praise_rt1h    string
  ,item__kv_gender_click_rt12h  string
  ,item__kv_city_click_rt12h    string
  ,item__kv_tags_click_rt12h    string
  ,item__kv_gender_praise_rt12h string
  ,item__kv_city_praise_rt12h   string
  ,item__kv_tags_praise_rt12h   string
)
WITH (
  'connector' = 'hologres'
  ,'endPoint' = '{hologres_endpoint}'
  ,'dbname' = 'mydb'
  ,'tablename' = 'ali_rec_item_id_rt_statistic_feat'
  ,'username' = '{accessId}'
  ,'password' = '{accessKey}'
  ,'jdbcWriteFlushInterval' = '500'
  ,'mutatetype' = 'insertorreplace'
)
;

CREATE TEMPORARY VIEW dwd_user_id_rt_feature
AS SELECT
  user_id
  ,sys_time
  ,CAST(features[1] AS BIGINT) user__cnt_click_rt5m
  ,features[2] user__kv_author_click_rt5m
  ,features[3] user__kv_category_click_rt5m
  ,CAST(features[4] AS BIGINT) user__cnt_praise_rt5m
  ,features[5] user__kv_author_praise_rt5m
  ,features[6] user__kv_category_praise_rt5m
  ,CAST(features[7] AS BIGINT) user__cnt_click_rt1h
  ,features[8] user__kv_author_click_rt1h
  ,features[9] user__kv_category_click_rt1h
  ,CAST(features[10] AS BIGINT) user__cnt_praise_rt1h
  ,features[11] user__kv_author_praise_rt1h
  ,features[12] user__kv_category_praise_rt1h
  ,CAST(features[13] AS BIGINT) user__cnt_click_rt12h
  ,features[14] user__kv_author_click_rt12h
  ,features[15] user__kv_category_click_rt12h
  ,CAST(features[16] AS BIGINT) user__cnt_praise_rt12h
  ,features[17] user__kv_author_praise_rt12h
  ,features[18] user__kv_category_praise_rt12h
FROM (
  SELECT
    user_id
    ,UNIX_TIMESTAMP() sys_time
    ,SWCountCatesKVS(
      ARRAY[author,category]
      ,1
      ,event
      ,ARRAY['click','praise']
      ,delay_flag
      ,ARRAY[300,3600,43200]
      ,page
      ,ARRAY['']
      ,100
      ,user_id
      ,ARRAY['']
    ) features
  FROM rec_sln_demo_behavior_table_dh_wide_delay
  GROUP BY
    user_id
) sq0
;

CREATE TEMPORARY TABLE ali_rec_user_id_rt_statistic_feat
(
  user_id                         string
  ,sys_time                       bigint
  ,user__cnt_click_rt5m           bigint
  ,user__cnt_praise_rt5m          bigint
  ,user__cnt_click_rt1h           bigint
  ,user__cnt_praise_rt1h          bigint
  ,user__cnt_click_rt12h          bigint
  ,user__cnt_praise_rt12h         bigint
  ,user__kv_author_click_rt5m     string
  ,user__kv_category_click_rt5m   string
  ,user__kv_author_praise_rt5m    string
  ,user__kv_category_praise_rt5m  string
  ,user__kv_author_click_rt1h     string
  ,user__kv_category_click_rt1h   string
  ,user__kv_author_praise_rt1h    string
  ,user__kv_category_praise_rt1h  string
  ,user__kv_author_click_rt12h    string
  ,user__kv_category_click_rt12h  string
  ,user__kv_author_praise_rt12h   string
  ,user__kv_category_praise_rt12h string
)
WITH (
  'connector' = 'hologres'
  ,'endPoint' = '{hologres_endpoint}'
  ,'dbname' = 'mydb'
  ,'tablename' = 'ali_rec_user_id_rt_statistic_feat'
  ,'username' = '{accessId}'
  ,'password' = '{accessKey}'
  ,'jdbcWriteFlushInterval' = '500'
  ,'mutatetype' = 'insertorreplace'
)
;

BEGIN STATEMENT SET
;

INSERT INTO ali_rec_item_id_rt_statistic_feat
SELECT
  item_id
  ,sys_time
  ,item__cnt_click_rt5m
  ,item__cnt_praise_rt5m
  ,item__cnt_click_rt1h
  ,item__cnt_praise_rt1h
  ,item__cnt_click_rt12h
  ,item__cnt_praise_rt12h
  ,item__kv_gender_click_rt5m
  ,item__kv_city_click_rt5m
  ,item__kv_tags_click_rt5m
  ,item__kv_gender_praise_rt5m
  ,item__kv_city_praise_rt5m
  ,item__kv_tags_praise_rt5m
  ,item__kv_gender_click_rt1h
  ,item__kv_city_click_rt1h
  ,item__kv_tags_click_rt1h
  ,item__kv_gender_praise_rt1h
  ,item__kv_city_praise_rt1h
  ,item__kv_tags_praise_rt1h
  ,item__kv_gender_click_rt12h
  ,item__kv_city_click_rt12h
  ,item__kv_tags_click_rt12h
  ,item__kv_gender_praise_rt12h
  ,item__kv_city_praise_rt12h
  ,item__kv_tags_praise_rt12h
FROM dwd_item_id_rt_feature
;

INSERT INTO ali_rec_user_id_rt_statistic_feat
SELECT
  user_id
  ,sys_time
  ,user__cnt_click_rt5m
  ,user__cnt_praise_rt5m
  ,user__cnt_click_rt1h
  ,user__cnt_praise_rt1h
  ,user__cnt_click_rt12h
  ,user__cnt_praise_rt12h
  ,user__kv_author_click_rt5m
  ,user__kv_category_click_rt5m
  ,user__kv_author_praise_rt5m
  ,user__kv_category_praise_rt5m
  ,user__kv_author_click_rt1h
  ,user__kv_category_click_rt1h
  ,user__kv_author_praise_rt1h
  ,user__kv_category_praise_rt1h
  ,user__kv_author_click_rt12h
  ,user__kv_category_click_rt12h
  ,user__kv_author_praise_rt12h
  ,user__kv_category_praise_rt12h
FROM dwd_user_id_rt_feature
;

end
;
  1. 首先根據第一行下載對應的jar包,在Flink中注冊自定義函數。

  2. 第1段SQL是注冊DataHub的行為數據流。

  3. 第2,3段SQL是加載mc的user和item的靜態特征維表。

  4. 第4段SQL是行為數據流join用戶和物品維表信息,形成寬表。同時對userid進行了hash,該hash size需要和下游運行的MessageDelay函數搭配起來用。如果該size過大,會影響數據稀疏:比如一天內,在一些特定的hash后的key中,存在一些秒時間上是不存在數據,會導致數據延遲下發;如果size過小,則會出現數據在worker上傾斜,造成一部分worker計算量和內存消耗過大。

  5. 第5段SQL,是對數據進行滯后發送。當前數據進來后會立即下發一次,標delay_flag記0; 當數據流水位到達該數據下一個窗口,則該數據會再次下發一次,delay_flag為窗口。

MessageDelay函數是一個非常重要函數,具體介紹如下:

MessageDelay(
      ARRAY[user_id,item_id,event,page,gender,city,tags,CAST(author AS STRING),category]
      ,event_time
      ,ARRAY[300,3600,43200]
      ,group_key
      ,IF(event IN ('click','praise'), true, false)
      ,ARRAY['']
    ) delay_info

# 參數介紹
第一個參數是array<string>表示要滯后發送哪些字段
第二個event_time是10位的時間戳,表示行為時間,也是水位時間
第三個array<bigint>,滯后窗口, 是該消息滯后多久會再次被下發,單位是秒
第四個group_key是當前數據hash后的key
第五個是boolen的參數,如果是true,則該消息會根據滯后窗口被再次下發;如果是false,則觸發別的數據下發
第六個參數是debug使用,用于統計計算效率,會追加在聚合特征的后面輸出

# 輸出數據介紹
SELECT
  delay_info[1] user_id
  ,delay_info[2] item_id
  ,delay_info[3] event
  ,delay_info[4] page
  ,delay_info[5] gender
  ,delay_info[6] city
  ,delay_info[7] tags
  ,delay_info[8] author
  ,delay_info[9] category    # 1-9都是messagdelay函數的第一個入參,表示要滯后的字段
  ,CAST(delay_info[10] AS BIGINT) event_time  # 當前日志真實的發生時間,10位時間戳
  ,CAST(delay_info[11] AS BIGINT) water_time  # 被觸發輸出水位時間
  ,CAST(delay_info[12] AS BIGINT) reader_water_time  # 觸發日志開始處理時間
  ,CAST(delay_info[13] AS BIGINT) sys_time    # 當前系統時間,13位時間戳
  ,CAST(delay_info[14] AS INT) delay_flag     # 該日志被滯后的窗口
  ,delay_info[....]  # 如果MessageDelay最后一個入參不為空則會追加在后面,比如可以追加讀取到數據時間,則可以計算MessageDelay的效率如何
  1. 第6、8段SQL是統計實時特征的SQL

依賴的主要函數是SWCountCatesKVS,在下面參數中介紹了相關參數,并且展示了生成特征的規律。

"""
SWCountCatesKVS(
ARRAY[gender,city,tags]    # category_fields,要被統計的category_fields
,1                         # cnt_event,每出現一次日志的累加/減量
,event                     # event,當前日志的行為類型
,ARRAY['click','praise']   # valid_events,要統計特征的行為類型
,delay_flag                # messagedelay出入的日志延遲標識,根據次標識進行累加或者累減
,ARRAY[300,3600,43200]     # windows,要統計的日志窗口,單位是秒
,page                      # valid_scenes,場景信息字段
,ARRAY['']                 # 要單獨統計的日志場景,如果沒有單獨統計的場景,給個空字符串,和場景字段配合使用
,100                       # kv字段保留的最多的類目值得數量
,item_id                   # 當前聚合key,item_id或者user_id
,ARRAY['']                 # debug使用的信息,會追加在特征后面,不用可以給個空字符串
)
"""

# 生成的特征規律如下
for sc in ['all']+list(valid_scenes):  # 遍歷場景,默認用all來標識全場景
  for win in windows:          # 遍歷窗口
    for event in valid_events:  # 遍歷行為類型
      print(cnt_event)   # 行為次數統計
      for c in enumurate(category_fields): #遍歷類目字段
        print(kv_c_event)  # 類目c在行為event下統計的kv字段

數據檢查

以上2種方式,我們可以得到離線訓練使用的實時特征和推理使用的實時特征。但是,日志數據在數據鏈路中可能存在延遲、丟失、重復等問題,這會導致模擬的實時特征和真實統計的實時特征之間存在差異。我們將每一時刻存儲到Hologres的特征同步存到MaxCompute,可以和離線模擬的實時特征進行一致性的對比,需要注意模擬實時特征該參數pre_seconds必須設置為0。

select ,avg(if(COMP_RT_FEA(user__kv_brand_id_click_rt_15m_offline,user__kv_brand_id_click_rt_15m_online)>0,0,1))  acc_user__kv_brand_id_click_rt_15m
from(
    select t1.user_id
      ,t1.request_id
      ,FROM_UNIXTIME(event_unix_time) tt
      ,t1.event_unix_time
      ,t2.request_time
      ,t2.sys_time
      ,t1.user__kv_brand_id_click_rt_15m as user__kv_brand_id_click_rt_15m_offline
      ,t2.user__kv_brand_id_click_rt_15m as user__kv_brand_id_click_rt_15m_online
    from(
        select *
        from(
            SELECT  *,ROW_NUMBER() OVER(PARTITION by user_id,request_id order by event_unix_time asc) rnk
            from dwd_user_rt_feature_offline
            where dt='${bdp.system.bizdate}'
            and LENGTH(request_id)>0
        )t where rnk=1
    )t1 join (
        select *
        from(
            SELECT *,ROW_NUMBER() OVER(PARTITION by user_id,request_id order by request_time asc) rnk
            from user_rt_feature_online
            where dt='${bdp.system.bizdate}'
        ) t where rnk=1
    )t2 on t1.user_id=t2.user_id and t1.request_id=t2.request_id and t1.event_unix_time=t2.request_time
)

檢查函數如下:

# Copyright (c) Alibaba, Inc. and its affiliates.
from odps.udf import annotate


@annotate('string, string->bigint')
class CompRTFea():
    """Compare 2 rt feature."""

    def evaluate(self, fea1, fea2):
        def _parse_fea(fea):
            if fea is None:
                return {}
            toks = [x for x in fea.split(chr(29)) if ':' in x]
            # toks = [ x for x in fea.split() if ':' in x ]
            kvs = {}
            for kv_s in toks:
                k, v = kv_s.split(':')
                kvs[k] = float(v)
            return kvs
        fea1_kvs = _parse_fea(fea1)
        fea2_kvs = _parse_fea(fea2)
        all_keys = set(fea1_kvs.keys())
        all_keys.update(fea2_kvs.keys())
        err_cnt = 0
        for k in all_keys:
            if k in fea1_kvs and k in fea2_kvs:
                if fea1_kvs[k] != fea2_kvs[k]:
                    err_cnt += 1
            else:
                err_cnt += 1
        return err_cnt