本文為您介紹如何使用Flink會話窗口函數(shù)。
定義
會話窗口(SESSION)通過SESSION活動來對元素進行分組。會話窗口與滾動窗口和滑動窗口相比,沒有窗口重疊,沒有固定窗口大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該窗口就會關閉。
函數(shù)語法
SESSION函數(shù)用于在GROUP BY子句中定義會話窗口。
SESSION(<time-attr>, <gap-interval>)
入?yún)?/h2>
參數(shù) | 說明 | 示例 |
time-attr | 參數(shù)必須是流中的一個合法的時間屬性字段,指定為Processing Time或Event Time。詳情請參見時間屬性。 | - |
gap-interval | 會話的超時時間或不活動間隔。如果在一個會話的最后一個元素到達后的 |
|
標識函數(shù)
使用標識函數(shù)選出窗口的起始時間或者結束時間,窗口的時間屬性用于下級Window的聚合。
窗口標識函數(shù) | 返回類型 | 描述 |
| Timestamp | 返回窗口的起始時間(包含邊界)。例如 |
| Timestamp | 返回窗口的結束時間(包含邊界)。例如 |
| Timestamp(rowtime-attr) | 返回窗口的結束時間(不包含邊界)。例如 |
| Timestamp(rowtime-attr) | 返回窗口的結束時間(不包含邊界)。例如 |
示例
統(tǒng)計每個用戶在每個活躍會話期間的點擊次數(shù),會話超時時長為30秒。
測試表user_clicks數(shù)據(jù)
username (VARCHAR)
click_url (VARCHAR)
eventtime (VARCHAR)
Jark
http://taobao.com/xxx
2024-10-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-10-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-10-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-10-10 10:02:10.0
測試語句
CREATE TEMPORARY TABLE user_clicks( username varchar, click_url varchar, eventtime varchar, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --為Rowtime定義Watermark。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE session_output( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO session_output SELECT SESSION_START(ts, INTERVAL '30' SECOND), SESSION_END(ts, INTERVAL '30' SECOND), username, COUNT(click_url) FROM user_clicks GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
測試結果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 10:00:00.0
2024-10-10 10:00:40.0
Jark
2
2024-10-10 10:00:49.0
2024-10-10 10:01:35.0
Jark
2
2024-10-10 10:01:58.0
2024-10-10 10:02:28.0
Jark
1
2024-10-10 10:02:10.0
2024-10-10 10:02:40.0
Timo
1