窗口函數
本文為您介紹Flink SQL支持的窗口函數以及窗口函數支持的時間屬性和窗口類型。
窗口函數
Flink SQL窗口函數支持基于無限大窗口的聚合(無需在SQL Query中,顯式定義任何窗口)以及對一個特定的窗口的聚合。例如,需要統計在過去的1分鐘內有多少用戶單擊了某個網頁,可以通過定義一個窗口來收集最近1分鐘內的數據,并對這個窗口內的數據進行實時計算。
Flink SQL支持的窗口聚合主要是兩種:Window聚合和Over聚合。本文檔主要為您介紹Window聚合。Window聚合支持Event Time和Processing Time兩種時間屬性定義窗口。每種時間屬性類型支持三種窗口類型:滾動窗口(TUMBLE)、滑動窗口(HOP)和會話窗口(SESSION)。
滾動窗口、滑動窗口和會話窗口不能與last_value、first_value、TopN函數合用,會造成數據亂序被丟棄,導致結果數據異常。
時間屬性
Flink SQL支持Event Time和Processing Time兩種時間屬性,時間屬性詳情請參見時間屬性。Flink可以基于這兩種時間屬性對數據進行窗口聚合。基于這兩種時間屬性開窗的區別如下:
Event Time:您提供的事件時間,通常是數據的最原始的創建時間。
系統會根據數據的Event Time生成的Watermark來進行關窗。只有當Watermark大于關窗時間,才會觸發窗口的結束,窗口結束才會輸出結果。如果一直沒有觸發窗口結束的數據流入Flink,則該窗口就無法輸出數據。單個subtask的Watermark是遞增的,多個subtask或者多個源表的Watermark取最小值。
重要如果源表中存在一條未來的亂序數據或者某個subtask或上游源表的某個分區中沒有數據,則可能會無法觸發窗口結束,從而導致結果數據異常。因此您需要根據數據亂序的程度設置合理的offset大小,并保證所有subtask和上游源表的所有分區中都有數據。如果某個subtask或上游源表的某個分區中沒有數據,導致Watermark無法推進,窗口無法及時結束,則可以在更多Flink配置中添加
table.exec.source.idle-timeout: 10s
來觸發窗口結束。該參數含義詳情請參見Configuration。數據經過GroupBy、雙流JOIN或OVER窗口節點后,會導致Watermark屬性丟失,無法再使用Event Time進行開窗。
Processing Time:對事件進行處理的本地系統時間。
Processing Time是Flink系統產生的,不包含在用戶的原始數據中。因此需要您顯式定義一個Processing Time列。
說明因為Processing Time容易受到事件到達Flink系統的速度及Flink內部處理數據順序的影響,所以每次回溯數據的結果可能不一致。
級聯窗口
Rowtime列在經過窗口操作后,其Event Time屬性將丟失。您可以使用輔助函數TUMBLE_ROWTIME
、HOP_ROWTIME
或SESSION_ROWTIME
,獲取窗口中的Rowtime列的最大值max(rowtime)
作為時間窗口的Rowtime,其類型是具有Rowtime屬性的TIMESTAMP,取值為window_end - 1
。 例如[00:00, 00:15)
的窗口,返回值為00:14:59.999
。
示例邏輯為:基于1分鐘的滾動窗口聚合結果,進行1小時的滾動窗口聚合,可以滿足您的多維度開窗需求。
CREATE TEMPORARY TABLE user_clicks(
username varchar,
click_url varchar,
eventtime varchar,
ts AS TO_TIMESTAMP(eventtime),
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND --為Rowtime定義Watermark。
) with (
'connector'='sls',
...
);
CREATE TEMPORARY TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
'connector'='datahub' --目前SLS只支持輸出VARCHAR類型的DDL,所以使用DataHub存儲。
...
);
CREATE TEMPORARY VIEW one_minute_window_output AS
SELECT
TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) as rowtime, --使用TUMBLE_ROWTIME作為二級Window的聚合時間。
username,
COUNT(click_url) as cnt
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;
BEGIN statement set;
INSERT INTO tumble_output
SELECT
TUMBLE_START(rowtime, INTERVAL '1' HOUR),
TUMBLE_END(rowtime, INTERVAL '1' HOUR),
username,
SUM(cnt)
FROM one_minute_window_output
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), username;
END;
中間數據
窗口的中間數據分為Keyed State和定時器(Timer)兩類,可以分別存儲在不同的存儲介質中,您可以根據作業的特點選擇不同的搭配,目前支持以下四種選擇:
Keyed State存儲 | 定時器存儲介質 |
內存 | |
內存 | |
內存 | |
文件 |
有關狀態存儲后端的選擇請參見企業級狀態后端存儲介紹和HashMapStateBackend。定時器主要用于觸發過期的窗口,在內存充裕的情況下,將定時器存儲在內存中可以得到更好的性能;在定時器較多或內存資源緊張的情況下,您可以選擇RocksDB StateBackend將定時器存儲在RocksDB的文件中。