滑動窗口
本文為您介紹如何使用Flink滑動窗口函數(shù)。
定義
滑動窗口(HOP),也被稱作Sliding Window。不同于滾動窗口,滑動窗口的窗口可以重疊。
函數(shù)語法
HOP函數(shù)用在GROUP BY子句中,用來定義滑動窗口。
HOP(<time-attr>, <slide-interval>,<size-interval>)
入?yún)?/h2>
參數(shù) | 說明 | 示例 |
time-attr | 參數(shù)必須是流中的一個合法的時間屬性字段,指定為Processing Time或Event Time。詳情請參見時間屬性。 | - |
slide-interval | 滑動窗口移動的間隔,定義了連續(xù)窗口之間的時間差。格式為 |
|
size-interval | 滑動窗口的大小或持續(xù)時間,定義了每個窗口覆蓋的時間范圍。格式為 |
|
滑動窗口根據(jù)slide-interval和size-interval配置大小不同,分為以下三種情況:
slide-interval < size-interval,則窗口會重疊,每個元素會被分配到多個窗口。
slide-interval = size-interval,則等同于滾動窗口(TUMBLE)。
slide-interval > size-interval,則為跳躍窗口,窗口之間不重疊且有間隙。
通常,大部分元素符合多個窗口情景,窗口是重疊的。因此,滑動窗口在計算移動平均數(shù)(moving averages)時很適用。例如,計算過去5分鐘數(shù)據(jù)的平均值,每10秒鐘更新一次,可以設(shè)置slide-interval為10秒,size-interval為5分鐘。
標(biāo)識函數(shù)
使用滑動窗口標(biāo)識函數(shù)選出窗口的起始時間或者結(jié)束時間,窗口的時間屬性用于下級Window的聚合。
窗口標(biāo)識函數(shù) | 返回類型 | 描述 |
| TIMESTAMP | 返回窗口的起始時間(包含邊界)。例如 |
| TIMESTAMP | 返回窗口的結(jié)束時間(包含邊界)。例如 |
| TIMESTAMP(rowtime-attr) | 返回窗口的結(jié)束時間(不包含邊界)。例如 |
| TIMESTAMP(rowtime-attr) | 返回窗口的結(jié)束時間(不包含邊界)。例如 |
示例
統(tǒng)計每個用戶過去1分鐘的單擊次數(shù),每30秒更新1次,即1分鐘的窗口,30秒滑動1次。
測試表user_clicks數(shù)據(jù)
username(VARCHAR)
click_url(VARCHAR)
eventtime(VARCHAR)
Jark
http://taobao.com/xxx
2024-10-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-10-10 10:02:10.0
測試語句
CREATE TEMPORARY TABLE user_clicks ( username VARCHAR, click_url VARCHAR, eventtime VARCHAR, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --為Rowtime定義Watermark。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE hop_output ( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO hop_output SELECT HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username, COUNT (click_url) FROM user_clicks GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;
測試結(jié)果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 09:59:30.0
2024-10-10 10:00:30.0
Jark
2
2024-10-10 10:00:00.0
2024-10-10 10:01:00.0
Jark
3
2024-10-10 10:00:30.0
2024-10-10 10:01:30.0
Jark
2
2024-10-10 10:01:00.0
2024-10-10 10:02:00.0
Jark
2
2024-10-10 10:01:30.0
2024-10-10 10:02:30.0
Jark
1
2024-10-10 10:01:30.0
2024-10-10 10:02:30.0
Timo
1
2017-10-10 10:02:30.0
2017-10-10 10:03:30.0
Timo
1
HOP窗口無法讀取數(shù)據(jù)進入的時間,第一個窗口的開啟時間會前移。前移時長=窗口時長-滑動步長,示例如下表。
窗口時長(秒)
滑動步長(秒)
Event Time
第一個窗口StartTime
第一個窗口EndTime
120
30
2024-07-31 10:00:00.0
2024-07-31 09:58:30.0
2024-07-31 10:00:30.0
60
10
2024-07-31 10:00:00.0
2024-07-31 09:59:10.0
2024-07-31 10:00:10.0