日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

IntervalJoin語句

本文為您介紹如何使用IntervalJoin語句。

背景信息

IntervalJoin語句可以讓兩個流進行JOIN時,左流和右流中每條記錄只關(guān)聯(lián)另外一條流上滿足定義的時間范圍內(nèi)的數(shù)據(jù),且進行完JOIN后,仍然保留輸入流上的時間列,讓您繼續(xù)進行基于Event Time的操作。

語法格式

SELECT column-names
FROM table1  [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN table2 
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION
說明
  • 支持INNER JOIN、LEFT JOIN、RIGHT JOIN和FULL JOIN,如果直接使用JOIN,默認為INNER JOIN。

  • 暫不支持SEMI JOIN和ANTI JOIN。

  • TIMEBOUND_EXPRESSION為左右兩個流時間屬性列上的區(qū)間條件表達式,支持以下三種條件表達式:

    • ltime = rtime

    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

示例1(基于Event Time)

統(tǒng)計下單后4個小時內(nèi)的物流信息。

  • 測試數(shù)據(jù)

    • 訂單表(Orders)

      id

      productName

      orderTime

      1

      iphone

      2020-04-01 10:00:00.0

      2

      mac

      2020-04-01 10:02:00.0

      3

      huawei

      2020-04-01 10:03:00.0

      4

      pad

      2020-04-01 10:05:00.0

    • 物流表(Shipments)

      shipId

      orderId

      status

      shiptime

      0

      1

      shipped

      2020-04-01 11:00:00.0

      1

      2

      delivered

      2020-04-01 17:00:00.0

      2

      3

      shipped

      2020-04-01 12:00:00.0

      3

      4

      shipped

      2020-04-01 11:30:00.0

  • 測試語句

    CREATE TEMPORARY TABLE Orders(
      id BIGINT,
      productName VARCHAR,
      orderTime TIMESTAMP,
      WATERMARK wk FOR orderTime as withOffset(orderTime, 2000)  --為rowtime定義Watermark。
    ) WITH (
      type='datahub',
      endpoint='<yourEndpoint>',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>',
      projectName='<yourProjectName>',
      topic='<yourTopic>',
      project='<yourProjectName>'
    );
    
    CREATE TEMPORARY TABLE Shipments(
      shipId BIGINT,
      orderId BIGINT,
      status VARCHAR,
      shiptime TIMESTAMP,
      WATERMARK wk FOR shiptime as withOffset(shiptime, 2000)  --為rowtime定義Watermark。
    ) WITH (
      type='datahub',
      endpoint='<yourEndpoint>',
      accessId='<yourAccessID>',
      accessKey='<yourAccessSecret>',
      projectName='<yourProjectName>',
      topic='<yourTopic>',
      project='<yourProjectName>'
    );
    
    --使用RDS作為結(jié)果表。
    CREATE TEMPORARY TABLE rds_output(
      id BIGINT,
      productName VARCHAR,
      status VARCHAR
    ) WITH (
      type='rds',
      url='<yourDatabaseURL>',
      tableName='<yourDatabaseTablename>',
      userName='<yourDatabaseUserName>',
      password='<yourDatabasePassword>'
    );
    
    INSERT INTO rds_output
    SELECT id, productName, status
    FROM Orders AS o
    JOIN Shipments AS s on o.id = s.orderId AND
         o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
  • 測試結(jié)果

    id(bigint)

    productName(varchar)

    status(varchar)

    1

    iphone

    shipped

    3

    huawei

    shipped

    4

    pad

    shipped

示例2(基于Processing Time)

  • 測試數(shù)據(jù)

    • datahub_stream1

      k1

      v1

      1

      val1

      2

      val2

      3

      val3

    • datahub_stream2

      k1

      v1

      1

      val1

      2

      val2

      3

      val3

  • 測試語句

    CREATE TEMPORARY TABLE datahub_stream1 (
      k1 BIGINT,
      v1 VARCHAR,
      d AS PROCTIME()
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessKey>'
    );
    
    CREATE TEMPORARY TABLE datahub_stream2 (
      k2 BIGINT,
      v2 VARCHAR,
      e AS PROCTIME()
    ) WITH (
      'connector' = 'datahub',
      'subId' = '<yourSubId>',
      'endPoint' = '<yourEndPoint>',
      'project' = '<yourProjectName>',
      'topic' = '<yourTopicName>',
      'accessId' = '<yourAccessId>',
      'accessKey' = '<yourAccessKey>'
    );
    
    --使用RDS作為結(jié)果表。
    CREATE TEMPORARY TABLE rds_output(
      k1 BIGINT,
      v1 VARCHAR,
      v2 VARCHAR
    ) WITH (
      'connector'='rds',
      'tableName'='your-table-name',
      'userName'='your-user-name',
      'password'='your-password',
      'url'='your-url'
    );
    
    INSERT INTO rds_output
    SELECT k1, v1, v2
    FROM datahub_stream1 AS o
    JOIN datahub_stream2 AS s on o.k1 = s.k2 AND
         o.d BETWEEN s.e - INTERVAL '4' MINUTE AND s.e;
說明

由于結(jié)果取決于兩個流里每條數(shù)據(jù)進入系統(tǒng)的時間,具有不確定性,因此該示例暫不提供預期結(jié)果。