Dual-stream JOIN statements
Flink SQL supports complex and flexible join operations on dynamic tables. This topic describes how to use dual-stream JOIN statements.
Background information
A JOIN in real-time computing has the same semantics as a JOIN in traditional batch processing: both combine two tables. The difference is that real-time computing joins two dynamic tables, and the join result is also updated dynamically so that the final result is consistent with the batch result.
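To make the last point concrete, the following Python sketch models a regular streaming inner join as a symmetric hash join: each arriving row is stored in the state of its own side and probed against the state of the other side, and every match is emitted immediately. It is a conceptual model under simplifying assumptions (in-memory lists, no TTL, hypothetical tuple rows and helper names), not how Flink implements the operator. Because both sides keep every row, the accumulated output equals the batch join regardless of arrival order, which also shows why the state keeps growing unless it expires.

```python
from collections import Counter, defaultdict

def streaming_inner_join(events, left_key, right_key):
    """Conceptual regular inner join: events are ("L" | "R", row) in arrival order."""
    state = {"L": defaultdict(list), "R": defaultdict(list)}
    output = []
    for side, row in events:
        if side == "L":
            k = left_key(row)
            state["L"][k].append(row)                        # keep the row for future right rows
            output.extend((row, r) for r in state["R"][k])   # match rows already seen on the right
        else:
            k = right_key(row)
            state["R"][k].append(row)
            output.extend((l, row) for l in state["L"][k])
    return output

def batch_inner_join(left, right, left_key, right_key):
    """Reference batch join over the complete inputs."""
    return [(l, r) for l in left for r in right if left_key(l) == right_key(r)]

orders = [(1, 30), (2, 10)]                  # hypothetical (orderid, productid)
products = [(30, "Cheese"), (10, "Beer")]    # hypothetical (productid, name)
# Interleaved arrival: a right row may arrive before or after its left match.
events = [("R", products[0]), ("L", orders[0]), ("L", orders[1]), ("R", products[1])]

stream_result = streaming_inner_join(events, lambda o: o[1], lambda p: p[0])
batch_result = batch_inner_join(orders, products, lambda o: o[1], lambda p: p[0])
print(Counter(stream_result) == Counter(batch_result))  # True
```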
Dual-stream JOIN syntax
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference: the name of a table.
tableExpression: a table expression.
joinCondition: the JOIN condition.
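For the outer variants (LEFT, RIGHT, FULL), a streaming join may have to correct output it has already emitted: a left row with no match yet is emitted padded with NULL, and when a matching right row arrives later, the NULL-padded row is retracted and replaced. The Python sketch below models this for LEFT JOIN under stated assumptions: rows are hypothetical tuples, the changelog uses simple "+"/"-" markers rather than Flink's actual row kinds, and the helper names are illustrative only.

```python
from collections import Counter, defaultdict

def streaming_left_join(events, left_key, right_key):
    """Conceptual LEFT JOIN changelog: ("+", left, right) inserts, ("-", left, None) retracts."""
    left_state = defaultdict(list)
    right_state = defaultdict(list)
    changelog = []
    for side, row in events:
        if side == "L":
            k = left_key(row)
            left_state[k].append(row)
            if right_state[k]:
                changelog.extend(("+", row, r) for r in right_state[k])
            else:
                # No match yet: emit the left row padded with NULL (None).
                changelog.append(("+", row, None))
        else:
            k = right_key(row)
            had_no_match = not right_state[k]
            right_state[k].append(row)
            for l in left_state[k]:
                if had_no_match:
                    # Retract the NULL-padded row emitted earlier for this left row.
                    changelog.append(("-", l, None))
                changelog.append(("+", l, row))
    return changelog

def materialize(changelog):
    """Apply the changelog to a multiset and drop rows whose count reaches zero."""
    table = Counter()
    for op, l, r in changelog:
        table[(l, r)] += 1 if op == "+" else -1
    return +table

# Hypothetical rows: (orderid, productid) on the left, (productid, name) on the right.
events = [("L", (1, 30)), ("R", (30, "Cheese"))]
changelog = streaming_left_join(events, lambda o: o[1], lambda p: p[0])
print(changelog)
print(materialize(changelog))  # Counter({((1, 30), (30, 'Cheese')): 1})
```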
Dual-stream JOIN hints
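Starting from Realtime Compute for Apache Flink engine VVR 8.0.1, you can use hints to set separate state time-to-live (TTL) values for the left and right streams of a dual-stream JOIN, which reduces the size of the state that must be maintained.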
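The effect of separate TTLs can be sketched in Python as follows. The model assumes that a stored row expires a fixed time after it was stored and that expired rows are dropped when the other side probes them; Flink's real expiry is handled by its state backend and TTL settings, so this is only an illustration. The function `ttl_inner_join`, the sample rows, and the hour-based timestamps are hypothetical.

```python
from collections import defaultdict

def ttl_inner_join(events, left_key, right_key, ttl_left, ttl_right):
    """Inner join where each side's stored rows expire after that side's TTL.

    events: (time, side, row) tuples in time order; times and TTLs share one unit.
    A stored row is treated as expired once now - stored_at >= ttl (assumed rule).
    """
    state = {"L": defaultdict(list), "R": defaultdict(list)}
    ttl = {"L": ttl_left, "R": ttl_right}
    key_of = {"L": left_key, "R": right_key}
    output = []
    for now, side, row in events:
        other = "R" if side == "L" else "L"
        k = key_of[side](row)
        # Drop the other side's expired rows for this key, then probe the survivors.
        alive = [(ts, r) for ts, r in state[other][k] if now - ts < ttl[other]]
        state[other][k] = alive
        for _, r in alive:
            output.append((row, r) if side == "L" else (r, row))
        state[side][k].append((now, row))
    return output

# Hypothetical data in hours: Orders state kept 72 h (like 'o' = '3d'),
# Products state kept 24 h (like 'p' = '1d').
events = [
    (0, "R", (10, "Beer")),
    (0, "L", (1, 10)),
    (48, "L", (2, 10)),          # the Beer row stored at hour 0 has expired for this probe
    (60, "R", (10, "Beer v2")),  # both orders are still within their 72 h TTL
]
result = ttl_inner_join(events, lambda o: o[1], lambda p: p[0], ttl_left=72, ttl_right=24)
for pair in result:
    print(pair)
```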
Syntax
-- Supported starting from VVR 8.0.1
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...

-- Starting from VVR 8.0.7, you can also use the Join State TTL hint syntax of the Apache Flink community
SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
Usage notes
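The JOIN STATE TTL hint is supported only for dual-stream JOINs. It is not supported for dimension table JOINs, interval joins, or window joins.
If the hint specifies the state TTL at the JOIN node for only one of the two streams, the state of the other stream uses the job-level state TTL of the Flink SQL job, which is controlled by table.exec.state.ttl (see Basic configurations). The default value is 1.5 days.
tableReference can be a table name, a view name, or an alias. If you specify an alias for a table, you must use the alias in the hint.
This is an experimental feature. The hint syntax may change in the future.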
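The fallback rule for a stream that the hint does not mention can be expressed as a small Python helper. The duration parser here accepts only an integer followed by `ms`, `s`, `min`, `h`, or `d`, which covers values such as `'3d'` in the examples; this is an assumption for illustration and not the full duration syntax Flink accepts. The keys of the hypothetical `hint` dictionary must match the references used in the query, that is, the alias whenever one is declared.

```python
import re

# Assumed unit suffixes and their size in milliseconds.
_UNIT_MS = {"ms": 1, "s": 1000, "min": 60_000, "h": 3_600_000, "d": 86_400_000}

# 1.5 days, the documented default of table.exec.state.ttl.
JOB_DEFAULT_TTL_MS = int(1.5 * 86_400_000)

def parse_duration_ms(text):
    """Parse strings such as '3d' or '12h' into milliseconds (simplified format)."""
    m = re.fullmatch(r"\s*(\d+)\s*(ms|s|min|h|d)\s*", text)
    if not m:
        raise ValueError(f"unsupported duration: {text!r}")
    return int(m.group(1)) * _UNIT_MS[m.group(2)]

def effective_ttls(hint, left_ref, right_ref, job_ttl_ms=JOB_DEFAULT_TTL_MS):
    """Return (left_ttl_ms, right_ttl_ms); a side missing from the hint uses the job TTL."""
    left = parse_duration_ms(hint[left_ref]) if left_ref in hint else job_ttl_ms
    right = parse_duration_ms(hint[right_ref]) if right_ref in hint else job_ttl_ms
    return left, right

# Only 'o' is in the hint, so 'p' falls back to the job-level default.
print(effective_ttls({"o": "3d"}, "o", "p"))  # (259200000, 129600000)
```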
Examples
-- Hint that uses aliases
SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p ON o.productid = p.productid;

-- In VVR 8.0.7 and later, you can also use the new syntax
SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p ON o.productid = p.productid;

-- Hint that uses table names
SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products ON Orders.productid = Products.productid;

-- In VVR 8.0.7 and later, you can also use the new syntax
SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products ON Orders.productid = Products.productid;

-- Hint that uses a view name
CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
  SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn
  FROM src1
  WHERE ...
) tmp
WHERE rn = 1;

SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;

-- In VVR 8.0.7 and later, you can also use the new syntax
SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;
Data example: Orders JOIN Products
Test data
Table 1. Orders
rowtime  | productid | orderid | units
10:17:00 | 30        | 5       | 4
10:17:05 | 10        | 6       | 1
10:18:05 | 20        | 7       | 2
10:18:07 | 30        | 8       | 20
11:02:00 | 10        | 9       | 6
11:04:00 | 10        | 10      | 1
11:09:30 | 40        | 11      | 12
11:24:11 | 10        | 12      | 4
Table 2. Products
productid | name   | unitprice
30        | Cheese | 17
10        | Beer   | 0.25
20        | Wine   | 6
30        | Cheese | 17
10        | Beer   | 0.25
10        | Beer   | 0.25
40        | Bread  | 100
10        | Beer   | 0.25
Test statement
SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p ON o.productid = p.productid;
Test results
o.rowtime | o.productid | o.orderid | o.units | p.name | p.unitprice
10:17:00  | 30          | 5         | 4       | Cheese | 17.00
10:17:00  | 30          | 5         | 4       | Cheese | 17.00
10:17:05  | 10          | 6         | 1       | Beer   | 0.25
10:17:05  | 10          | 6         | 1       | Beer   | 0.25
10:17:05  | 10          | 6         | 1       | Beer   | 0.25
10:17:05  | 10          | 6         | 1       | Beer   | 0.25
10:18:05  | 20          | 7         | 2       | Wine   | 6.00
10:18:07  | 30          | 8         | 20      | Cheese | 17.00
10:18:07  | 30          | 8         | 20      | Cheese | 17.00
11:02:00  | 10          | 9         | 6       | Beer   | 0.25
11:02:00  | 10          | 9         | 6       | Beer   | 0.25
11:02:00  | 10          | 9         | 6       | Beer   | 0.25
11:02:00  | 10          | 9         | 6       | Beer   | 0.25
11:04:00  | 10          | 10        | 1       | Beer   | 0.25
11:04:00  | 10          | 10        | 1       | Beer   | 0.25
11:04:00  | 10          | 10        | 1       | Beer   | 0.25
11:04:00  | 10          | 10        | 1       | Beer   | 0.25
11:09:30  | 40          | 11        | 12      | Bread  | 100.00
11:24:11  | 10          | 12        | 4       | Beer   | 0.25
11:24:11  | 10          | 12        | 4       | Beer   | 0.25
11:24:11  | 10          | 12        | 4       | Beer   | 0.25
11:24:11  | 10          | 12        | 4       | Beer   | 0.25
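Each order appears once per matching Products row, because an inner join emits one row per matching pair and Table 2 contains productid 30 twice and productid 10 four times. The Python sketch below recomputes the result from Table 1 and Table 2 to confirm the 22 rows above. It evaluates the join as a batch query over the complete data, which the streaming join matches once all rows have arrived; the variable names are illustrative.

```python
from collections import Counter

# Test data copied from Table 1 (Orders): (rowtime, productid, orderid, units).
orders = [
    ("10:17:00", 30, 5, 4),
    ("10:17:05", 10, 6, 1),
    ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20),
    ("11:02:00", 10, 9, 6),
    ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12),
    ("11:24:11", 10, 12, 4),
]
# Test data copied from Table 2 (Products): (productid, name, unitprice).
products = [
    (30, "Cheese", 17), (10, "Beer", 0.25), (20, "Wine", 6),
    (30, "Cheese", 17), (10, "Beer", 0.25), (10, "Beer", 0.25),
    (40, "Bread", 100), (10, "Beer", 0.25),
]

# Inner join on productid: one output row per matching (order, product) pair.
result = [
    (o_rowtime, o_pid, o_id, o_units, p_name, p_price)
    for (o_rowtime, o_pid, o_id, o_units) in orders
    for (p_pid, p_name, p_price) in products
    if o_pid == p_pid
]

rows_per_order = Counter(row[2] for row in result)
print(len(result))  # 22
print(dict(rows_per_order))
```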
Data example: datahub_stream1 JOIN datahub_stream2
Test data
Table 3. datahub_stream1
a (BIGINT) | b (BIGINT) | c (VARCHAR)
0          | 10         | test11
1          | 10         | test21
Table 4. datahub_stream2
a (BIGINT) | b (BIGINT) | c (VARCHAR)
0          | 10         | test11
1          | 10         | test21
0          | 10         | test31
1          | 10         | test41
Test statement
SELECT s1.c, s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2 ON s1.a = s2.a
WHERE s1.a = 0;
Test results
s1.c (VARCHAR) | s2.c (VARCHAR)
test11         | test11
test11         | test31
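For an inner join, the condition s1.a = 0 in the WHERE clause gives the same rows whether it is applied after the join or to datahub_stream1 before the join: only the test11 row of datahub_stream1 takes part, and it matches the two rows of datahub_stream2 with a = 0. The Python sketch below checks this logical equivalence on the test data; it makes no claim about how Flink actually plans the query, and the variable names are illustrative.

```python
# Test data copied from Table 3 and Table 4: (a, b, c).
s1 = [(0, 10, "test11"), (1, 10, "test21")]
s2 = [(0, 10, "test11"), (1, 10, "test21"), (0, 10, "test31"), (1, 10, "test41")]

# Filter after the join, as the statement is written.
after = [(l[2], r[2]) for l in s1 for r in s2 if l[0] == r[0] and l[0] == 0]

# Filter datahub_stream1 first, then join.
filtered_s1 = [row for row in s1 if row[0] == 0]
before = [(l[2], r[2]) for l in filtered_s1 for r in s2 if l[0] == r[0]]

print(after)  # [('test11', 'test11'), ('test11', 'test31')]
```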