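IntervalJoin statement
This topic describes how to use the IntervalJoin statement.
Background information
When two streams are joined with an IntervalJoin statement, each record in the left stream and in the right stream is joined only with records of the other stream whose time falls within a defined time range. After the join, the time attribute columns of the input streams are retained, so you can continue to perform Event Time-based operations on the result.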
Syntax
SELECT column-names
FROM table1 [AS <alias1>]
[INNER | LEFT | RIGHT | FULL] JOIN table2 [AS <alias2>]
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION
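INNER JOIN, LEFT JOIN, RIGHT JOIN, and FULL JOIN are supported. A bare JOIN is treated as INNER JOIN.
SEMI JOIN and ANTI JOIN are not supported yet.
TIMEBOUND_EXPRESSION is an interval condition on the time attribute columns of the left and right streams (ltime and rtime below). The following three forms are supported: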
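The following is a minimal batch-style sketch of how INNER and LEFT interval joins differ. It assumes in-memory lists of (key, time, value) tuples and a BETWEEN-style time bound. The helper `interval_join` and the sample data are hypothetical and not a Flink API.

In a LEFT join, a left row with no partner inside the bound is still emitted, with the right side padded as NULL (None here). RIGHT and FULL joins are symmetric and are omitted. In the streaming job, a NULL-padded row can only be emitted after its time bound has expired (for Event Time, once the watermark has passed it). This sketch ignores that timing.

```python
from datetime import datetime, timedelta


def interval_join(left, right, lower, upper, how="inner"):
    """Batch sketch of an interval join (hypothetical helper, not a Flink API).

    left, right: lists of (key, time, value) tuples.
    A pair matches when the keys are equal and
    rtime + lower <= ltime <= rtime + upper (inclusive, like BETWEEN).
    how: "inner" keeps only matched pairs; "left" also keeps unmatched
    left rows, padding the right value with None (NULL).
    """
    out = []
    for lkey, ltime, lval in left:
        matched = False
        for rkey, rtime, rval in right:
            if lkey == rkey and rtime + lower <= ltime <= rtime + upper:
                out.append((lkey, lval, rval))
                matched = True
        if how == "left" and not matched:
            out.append((lkey, lval, None))  # NULL-padded right side
    return out


t0 = datetime(2020, 4, 1, 10, 0)
left = [(1, t0, "a"), (2, t0, "b")]
right = [(1, t0 + timedelta(minutes=1), "x"), (2, t0 + timedelta(minutes=10), "y")]

# ltime BETWEEN rtime - INTERVAL '5' MINUTE AND rtime
print(interval_join(left, right, timedelta(minutes=-5), timedelta(0)))
print(interval_join(left, right, timedelta(minutes=-5), timedelta(0), how="left"))
```

Key 2 has no match because its right row arrives 10 minutes after the left row, outside the 5-minute bound. The INNER join drops it, while the LEFT join keeps it as (2, "b", None).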
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
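Each of the three forms limits ltime to a finite window around rtime. The sketch below writes each form as a plain Python predicate so the inclusive and exclusive endpoints are explicit. The function names are illustrative only. BETWEEN is inclusive on both ends, while the second form excludes its upper endpoint.

```python
from datetime import datetime, timedelta


def equal_bound(ltime, rtime):
    # ltime = rtime
    return ltime == rtime


def half_open_bound(ltime, rtime):
    # ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
    return rtime <= ltime < rtime + timedelta(minutes=10)


def between_bound(ltime, rtime):
    # ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
    # BETWEEN includes both endpoints.
    return rtime - timedelta(seconds=10) <= ltime <= rtime + timedelta(seconds=5)


rtime = datetime(2020, 4, 1, 10, 0, 0)
print(half_open_bound(rtime + timedelta(minutes=10), rtime))  # upper end excluded
print(between_bound(rtime + timedelta(seconds=5), rtime))     # upper end included
```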
Example 1 (Event Time)
Retrieve the shipment records that occur within 4 hours after each order is placed.
Test data
Orders table (Orders)
id | productName | orderTime
1 | iphone | 2020-04-01 10:00:00.0
2 | mac | 2020-04-01 10:02:00.0
3 | huawei | 2020-04-01 10:03:00.0
4 | pad | 2020-04-01 10:05:00.0
Shipments table (Shipments)
shipId | orderId | status | shiptime
0 | 1 | shipped | 2020-04-01 11:00:00.0
1 | 2 | delivered | 2020-04-01 17:00:00.0
2 | 3 | shipped | 2020-04-01 12:00:00.0
3 | 4 | shipped | 2020-04-01 11:30:00.0
Test statement
CREATE TEMPORARY TABLE Orders (
  id BIGINT,
  productName VARCHAR,
  orderTime TIMESTAMP,
  WATERMARK wk FOR orderTime AS withOffset(orderTime, 2000) --Define a watermark on the rowtime column.
) WITH (
  type='datahub',
  endpoint='<yourEndpoint>',
  accessId='<yourAccessID>',
  accessKey='<yourAccessSecret>',
  projectName='<yourProjectName>',
  topic='<yourTopic>',
  project='<yourProjectName>'
);

CREATE TEMPORARY TABLE Shipments (
  shipId BIGINT,
  orderId BIGINT,
  status VARCHAR,
  shiptime TIMESTAMP,
  WATERMARK wk FOR shiptime AS withOffset(shiptime, 2000) --Define a watermark on the rowtime column.
) WITH (
  type='datahub',
  endpoint='<yourEndpoint>',
  accessId='<yourAccessID>',
  accessKey='<yourAccessSecret>',
  projectName='<yourProjectName>',
  topic='<yourTopic>',
  project='<yourProjectName>'
);

--Use RDS as the result table.
CREATE TEMPORARY TABLE rds_output (
  id BIGINT,
  productName VARCHAR,
  status VARCHAR
) WITH (
  type='rds',
  url='<yourDatabaseURL>',
  tableName='<yourDatabaseTablename>',
  userName='<yourDatabaseUserName>',
  password='<yourDatabasePassword>'
);

INSERT INTO rds_output
SELECT id, productName, status
FROM Orders AS o
JOIN Shipments AS s
ON o.id = s.orderId
  AND o.orderTime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
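The sketch below shows how the watermark on Orders advances over the sample data. It assumes that withOffset(orderTime, 2000) yields a watermark equal to the largest orderTime seen so far minus 2000 milliseconds, which is the usual bounded-out-of-orderness rule; check the connector documentation for the exact semantics. The helper `watermarks` is hypothetical. It ignores the fact that a real job emits watermarks periodically rather than after every record.

```python
from datetime import datetime, timedelta


def watermarks(event_times, offset_ms=2000):
    """Watermark after each event, ASSUMING: max event time so far minus offset_ms."""
    result = []
    max_ts = None
    for ts in event_times:
        # The watermark never moves backward, even if an older record arrives late.
        max_ts = ts if max_ts is None else max(max_ts, ts)
        result.append(max_ts - timedelta(milliseconds=offset_ms))
    return result


order_times = [
    datetime(2020, 4, 1, 10, 0),
    datetime(2020, 4, 1, 10, 2),
    datetime(2020, 4, 1, 10, 3),
    datetime(2020, 4, 1, 10, 5),
]
for wm in watermarks(order_times):
    print(wm)
```

Under this assumption, each watermark trails the newest order by 2 seconds. A record whose orderTime is up to 2 seconds older than the newest one seen is therefore still treated as on time.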
測試結(jié)果
id(bigint)
productName(varchar)
status(varchar)
1
iphone
shipped
3
huawei
shipped
4
pad
shipped
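As a cross-check, the sketch below applies the join condition of the test statement to the sample Orders and Shipments rows in plain Python. It is a batch simulation of the condition, not how the engine executes the query. It reproduces the test result above.

```python
from datetime import datetime, timedelta

# Sample data copied from the Orders and Shipments tables above.
orders = [
    (1, "iphone", datetime(2020, 4, 1, 10, 0)),
    (2, "mac", datetime(2020, 4, 1, 10, 2)),
    (3, "huawei", datetime(2020, 4, 1, 10, 3)),
    (4, "pad", datetime(2020, 4, 1, 10, 5)),
]
shipments = [
    (0, 1, "shipped", datetime(2020, 4, 1, 11, 0)),
    (1, 2, "delivered", datetime(2020, 4, 1, 17, 0)),
    (2, 3, "shipped", datetime(2020, 4, 1, 12, 0)),
    (3, 4, "shipped", datetime(2020, 4, 1, 11, 30)),
]


def example1():
    rows = []
    for oid, name, order_time in orders:
        for _ship_id, order_id, status, ship_time in shipments:
            # o.id = s.orderId
            # AND o.orderTime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
            if oid == order_id and ship_time - timedelta(hours=4) <= order_time <= ship_time:
                rows.append((oid, name, status))
    return rows


print(example1())
# [(1, 'iphone', 'shipped'), (3, 'huawei', 'shipped'), (4, 'pad', 'shipped')]
```

Order 2 is missing from the result because its shipment record (17:00) comes 6 hours 58 minutes after the order (10:02), outside the 4-hour bound.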
Example 2 (Processing Time)
Test data
datahub_stream1
k1 | v1
1 | val1
2 | val2
3 | val3
datahub_stream2
k2 | v2
1 | val1
2 | val2
3 | val3
Test statement
CREATE TEMPORARY TABLE datahub_stream1 (
  k1 BIGINT,
  v1 VARCHAR,
  d AS PROCTIME()
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>'
);

CREATE TEMPORARY TABLE datahub_stream2 (
  k2 BIGINT,
  v2 VARCHAR,
  e AS PROCTIME()
) WITH (
  'connector' = 'datahub',
  'subId' = '<yourSubId>',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'topic' = '<yourTopicName>',
  'accessId' = '<yourAccessId>',
  'accessKey' = '<yourAccessKey>'
);

--Use RDS as the result table.
CREATE TEMPORARY TABLE rds_output (
  k1 BIGINT,
  v1 VARCHAR,
  v2 VARCHAR
) WITH (
  'connector' = 'rds',
  'tableName' = '<yourTableName>',
  'userName' = '<yourUserName>',
  'password' = '<yourPassword>',
  'url' = '<yourUrl>'
);

INSERT INTO rds_output
SELECT k1, v1, v2
FROM datahub_stream1 AS o
JOIN datahub_stream2 AS s
ON o.k1 = s.k2
  AND o.d BETWEEN s.e - INTERVAL '4' MINUTE AND s.e;
由于結(jié)果取決于兩個流里每條數(shù)據(jù)進入系統(tǒng)的時間,具有不確定性,因此該示例暫不提供預期結(jié)果。