日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

會話窗口

本文為您介紹如何使用Flink會話窗口函數(shù)。

定義

會話窗口(SESSION)通過SESSION活動來對元素進行分組。會話窗口與滾動窗口和滑動窗口相比,沒有窗口重疊,沒有固定窗口大小。相反,當它在一個固定的時間周期內不再收到元素,即會話斷開時,該窗口就會關閉。

函數(shù)語法

SESSION函數(shù)用于在GROUP BY子句中定義會話窗口。

SESSION(<time-attr>, <gap-interval>)

入?yún)?/h2>

參數(shù)

說明

示例

time-attr

參數(shù)必須是流中的一個合法的時間屬性字段,指定為Processing Time或Event Time。詳情請參見時間屬性

-

gap-interval

會話的超時時間或不活動間隔。如果在一個會話的最后一個元素到達后的 <gap-interval> 時間內沒有新的元素到達,則該會話將關閉。任何后續(xù)到達的元素都將被分配到一個新的會話中。格式為INTERVAL 'num' timeUnit

INTERVAL '10' SECOND設置會話的超時時間為10秒。

標識函數(shù)

使用標識函數(shù)選出窗口的起始時間或者結束時間,窗口的時間屬性用于下級Window的聚合。

窗口標識函數(shù)

返回類型

描述

SESSION_START(<time-attr>, <gap-interval>)

Timestamp

返回窗口的起始時間(包含邊界)。例如[00:10,00:15]的窗口,返回00:10,即為此會話窗口內第一條記錄的時間。

SESSION_END(<time-attr>, <gap-interval>)

Timestamp

返回窗口的結束時間(包含邊界)。例如[00:00,00:15]的窗口,返回 00:15,即為此會話窗口內最后一條記錄的時間+<gap-interval>

SESSION_ROWTIME(<time-attr>, <gap-interval>)

Timestamp(rowtime-attr)

返回窗口的結束時間(不包含邊界)。例如(00:00,00:15)的窗口,返回00:14:59.999 。返回值是一個rowtime attribute,也就是可以基于該字段進行時間類型的操作,例如級聯(lián)窗口。該參數(shù)只能用于基于Event Time的Window。

SESSION_PROCTIME(<time-attr>, <gap-interval>)

Timestamp(rowtime-attr)

返回窗口的結束時間(不包含邊界)。例如(00:00,00:15)的窗口,返回 00:14:59.999 。返回值是一個Proctime Attribute,也就是可以基于該字段進行時間類型的操作,例如級聯(lián)窗口。該參數(shù)只能用于基于Processing Time的Window。

示例

統(tǒng)計每個用戶在每個活躍會話期間的點擊次數(shù),會話超時時長為30秒。

  • 測試表user_clicks數(shù)據(jù)

    username (VARCHAR)

    click_url (VARCHAR)

    eventtime (VARCHAR)

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:00.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:10.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:00:49.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:05.0

    Jark

    http://taobao.com/xxx

    2024-10-10 10:01:58.0

    Timo

    http://taobao.com/xxx

    2024-10-10 10:02:10.0

  • 測試語句

    CREATE TEMPORARY TABLE user_clicks(
      username varchar,
      click_url varchar,
      eventtime varchar,                            
      ts AS TO_TIMESTAMP(eventtime),
      WATERMARK FOR ts AS ts - INTERVAL '2' SECOND  --為Rowtime定義Watermark。
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<brokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE session_output(
      window_start TIMESTAMP,
      window_end TIMESTAMP,
      username VARCHAR,
      clicks BIGINT
    ) WITH (
      'connector'='print',
      'logger'='true'
    );
    
    INSERT INTO session_output
    SELECT
    SESSION_START(ts, INTERVAL '30' SECOND),
    SESSION_END(ts, INTERVAL '30' SECOND),
    username,
    COUNT(click_url)
    FROM user_clicks
    GROUP BY SESSION(ts, INTERVAL '30' SECOND), username;
  • 測試結果

    window_start (TIMESTAMP)

    window_end (TIMESTAMP)

    username (VARCHAR)

    clicks (BIGINT)

    2024-10-10 10:00:00.0

    2024-10-10 10:00:40.0

    Jark

    2

    2024-10-10 10:00:49.0

    2024-10-10 10:01:35.0

    Jark

    2

    2024-10-10 10:01:58.0

    2024-10-10 10:02:28.0

    Jark

    1

    2024-10-10 10:02:10.0

    2024-10-10 10:02:40.0

    Timo

    1