維表JOIN語句
對于每條流式數(shù)據(jù),可以關(guān)聯(lián)一個(gè)外部維表數(shù)據(jù)源,為實(shí)時(shí)計(jì)算Flink版提供數(shù)據(jù)關(guān)聯(lián)查詢。
背景信息
大部分連接器的維表Join都可以使用Cache策略,不同連接器對Cache策略的支持情況稍有不同,請查看對應(yīng)的連接器文檔確定具體的支持情況。通用的Cache策略詳情如下:
None(默認(rèn)值):無緩存。
LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表的每條數(shù)據(jù)都會觸發(fā)系統(tǒng)先在Cache中查找數(shù)據(jù),如果沒有找到,則去物理維表中查找。
ALL:緩存維表里的所有數(shù)據(jù)。在Job運(yùn)行前,系統(tǒng)會將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查找數(shù)據(jù)都會通過Cache進(jìn)行。如果在Cache中無法找到數(shù)據(jù),則KEY不存在。全量的Cache有一個(gè)過期時(shí)間,過期后會重新加載一遍全量Cache。適用于遠(yuǎn)程表數(shù)據(jù)量小且MISS KEY(源表數(shù)據(jù)和維表JOIN時(shí),ON條件無法關(guān)聯(lián))特別多的場景。
您需要根據(jù)具體業(yè)務(wù)需求,在平衡實(shí)時(shí)性和性能之間進(jìn)行權(quán)衡。如果對數(shù)據(jù)實(shí)時(shí)性要求非常高,需要實(shí)時(shí)更新,可以不使用Cache,直接從維表讀取。
如果使用Cache策略,可以配合LRU和TTL來實(shí)現(xiàn)較新的緩存數(shù)據(jù)。TTL可以設(shè)置的較短,例如幾秒至幾十秒,定期從源表加載數(shù)據(jù)。
使用ALL緩存策略時(shí),請注意節(jié)點(diǎn)內(nèi)存大小,防止出現(xiàn)OOM。
因?yàn)橄到y(tǒng)會異步加載維表數(shù)據(jù),所以在使用ALL緩存策略時(shí),需要增加維表JOIN節(jié)點(diǎn)的內(nèi)存,增加的內(nèi)存大小為遠(yuǎn)程表數(shù)據(jù)量的兩倍。
使用限制
維表JOIN僅支持對當(dāng)前時(shí)刻維表快照的關(guān)聯(lián)。
維表支持INNER JOIN和LEFT JOIN,不支持RIGHT JOIN或FULL JOIN。
注意事項(xiàng)
如果您有一對一JOIN需求,請確保連接條件中包含了維表中具有唯一性字段的等值連接條件。
對每條流式數(shù)據(jù),只會關(guān)聯(lián)當(dāng)時(shí)維表的最新版本數(shù)據(jù),即JOIN行為只發(fā)生在處理時(shí)間(Processing Time)。如果JOIN行為發(fā)生后,維表中的數(shù)據(jù)發(fā)生了變化(新增、更新或刪除),則已關(guān)聯(lián)的維表數(shù)據(jù)不會被同步變化。具體的維表的行為請參見對應(yīng)連接器行為。
維表JOIN語法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
必須加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN維表當(dāng)前時(shí)刻所看到的每條數(shù)據(jù)。
ON條件中必須包含維表實(shí)際能支持隨機(jī)查找的字段的等值條件。
ON條件中維表字段不能使用CAST等類型轉(zhuǎn)換函數(shù)。如果您有類型轉(zhuǎn)換需求,請?jiān)谠幢碜侄芜M(jìn)行操作。
維表JOIN Hints
您可以通過使用維表Hints(Hint功能參見Flink SQL Hints)對維表Join的策略進(jìn)行配置。維表Hints包含Lookup Hint與其他Join Hints。
僅VVR 8.0及以上版本支持Lookup Hint。
僅VVR 8.0.8及以上版本支持通過Lookup Hint配置是否開啟shuffle策略。
VVR 8.0以上支持使用別名,如果維表定義了別名,Hint中必須使用別名。
僅VVR 4.0及以上版本支持其他Join Hints。
Lookup Hint
Lookup Hint功能和社區(qū)保持一致,可以用于配置維表的同步、異步、重試查找策略,詳情參見Lookup Hint。VVR 8.0.8及以上版本對Lookup Hint的功能進(jìn)行了擴(kuò)展,支持配置通過'shuffle' = 'true'
選項(xiàng)配置維表聯(lián)接時(shí)的shuffle策略,不同場景的shuffle策略如下表所示。
場景 | 聯(lián)接策略 |
不配置'shuffle' = 'true'選項(xiàng) | 使用引擎默認(rèn)的shuffle策略。 |
不配置'shuffle' = 'true'選項(xiàng),且維表連接器不提供自定義聯(lián)接策略 | |
配置'shuffle' = 'true' 選項(xiàng),且維表連接器不提供自定義聯(lián)接策略 | 默認(rèn)使用SHUFFLE_HASH策略,含義請參見SHUFFLE_HASH。 |
配置'shuffle' = 'true' 選項(xiàng),且維表連接器提供自定義聯(lián)接策略 | 使用表連接器的自定義shuffle策略。 |
目前僅流式數(shù)據(jù)湖倉Paimon會提供自定義shuffle策略,具體會在Join字段包含全部分桶字段的情況下基于bucket進(jìn)行shuffle。
對維表配置聯(lián)接時(shí)的shuffle策略代碼示例如下。
-- 只對維表dim1配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- 同時(shí)對維表dim1, dim2配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- 對維表dim1必須使用別名D1配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
-- 同時(shí)對維表dim1, dim2通過別名配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
其他Join Hints
維表Join Hints僅用于配置維表聯(lián)接策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。維表Cache策略和聯(lián)接策略之間的適用場景詳情如下表所示。
Cache策略 | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (和SKEW等價(jià)) |
None | 不建議使用該聯(lián)接策略提示,主流會引入額外的網(wǎng)絡(luò)開銷。 | 不建議使用該聯(lián)接策略提示,主流會引入額外的網(wǎng)絡(luò)開銷。 |
LRU | 在維表查找IO成為瓶頸時(shí),建議考慮使用該聯(lián)接策略提示。當(dāng)主流數(shù)據(jù)在Join Key上有時(shí)間局部性時(shí),可以提高Cache命中率,減少IO請求數(shù),從而提升總吞吐。 重要 主流會引入額外的網(wǎng)絡(luò)開銷,當(dāng)主流數(shù)據(jù)在Join Key上有傾斜,遇到性能瓶頸時(shí),建議考慮REPLICATED_SHUFFLE_HASH。 | 在維表查找IO成為瓶頸且主流數(shù)據(jù)在Join Key上有傾斜時(shí),建議考慮該聯(lián)接策略提示。當(dāng)主流數(shù)據(jù)在Join Key上有時(shí)間局部性時(shí),可以提高Cache命中率,減少IO請求數(shù),從而提升總吞吐。 |
ALL | 在維表內(nèi)存使用量成為瓶頸時(shí),建議使用該聯(lián)接策略提示。內(nèi)存使用率可降低為1/并發(fā)度。 重要 主流會引入額外的網(wǎng)絡(luò)開銷,當(dāng)主流數(shù)據(jù)在Join Key上有傾斜,遇到性能瓶頸時(shí),建議考慮REPLICATED_SHUFFLE_HASH。 | 在維表內(nèi)存使用量成為瓶頸且主流數(shù)據(jù)在Join Key上有傾斜時(shí),建議使用該聯(lián)接策略提示。內(nèi)存使用率降低為分桶數(shù)/并發(fā)度。 |
SHUFFLE_HASH
使用效果
在維表Join中使用Shuffle Hash策略,可以將主流數(shù)據(jù)在Join之前根據(jù)Join Key做一次shuffle。在使用LRU Cache策略時(shí)可以提高Cache命中率,減少IO請求數(shù);在使用ALL Cache策略時(shí)可以減少內(nèi)存使用量。每個(gè)SHUFFLE_HASH聯(lián)接提示可指定多張維表。
使用限制
雖然SHUFFLE_HASH可以減少內(nèi)存開銷,但是由于上游數(shù)據(jù)需要按照J(rèn)oin Key做一次shuffle,引入額外的網(wǎng)絡(luò)開銷,因此下面兩種場景不適合使用SHUFFLE_HASH聯(lián)接策略。
主流數(shù)據(jù)在Join Key上存在嚴(yán)重的數(shù)據(jù)傾斜,這種場景下如果使用SHUFFLE_HASH聯(lián)接,會因?yàn)閿?shù)據(jù)傾斜導(dǎo)致Join節(jié)點(diǎn)成為性能瓶頸,從而會導(dǎo)致流作業(yè)出現(xiàn)嚴(yán)重反壓或是批場景出現(xiàn)嚴(yán)重長尾,此時(shí)建議使用REPLICATED_SHUFFLE_HASH聯(lián)接。
維表數(shù)據(jù)較小,ALL Cache策略加載沒有內(nèi)存瓶頸時(shí),如果使用SHUFFLE_HASH聯(lián)接,節(jié)約的內(nèi)存開銷和額外引入的網(wǎng)絡(luò)開銷相比,可能并不劃算。
代碼示例
-- 只對維表dim1開啟SHUFFLE_HASH聯(lián)接。 SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- 同時(shí)對維表dim1, dim2均開啟SHUFFLE_HASH聯(lián)接。 SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- 對維表dim1必須使用別名D1開啟SHUFFLE_HASH聯(lián)接。 SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b -- 同時(shí)對維表dim1, dim2通過別名開啟SHUFFLE_HASH聯(lián)接。 SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
使用效果
在維表Join中使用Replicated Shuffle Hash策略,其效果基本與SHUFFLE_HASH一致,但不同點(diǎn)是其會將主流具有相同key的數(shù)據(jù)隨機(jī)打散到指定的N個(gè)并發(fā)上,可以解決數(shù)據(jù)傾斜導(dǎo)致的性能瓶頸。每個(gè)REPLICATED_SHUFFLE_HASH聯(lián)接提示中可指定多張維表。
使用限制
需要配置傾斜數(shù)據(jù)分桶數(shù)量參數(shù)
table.exec.skew-join.replicate-num
,其默認(rèn)值為16,取值不能大于維表聯(lián)接節(jié)點(diǎn)的并發(fā)。配置方法請參見如何配置作業(yè)運(yùn)行參數(shù)?。當(dāng)前不支持更新流,當(dāng)主流是更新流時(shí),使用REPLICATED_SHUFFLE_HASH策略會報(bào)錯(cuò)。
代碼示例
-- 對維表dim1開啟REPLICATED_SHUFFLE_HASH聯(lián)接 SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a -- 對維表dim1通過別名開啟REPLICATED_SHUFFLE_HASH聯(lián)接 SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
SKEW
使用效果
當(dāng)指定表存在數(shù)據(jù)傾斜時(shí),優(yōu)化器會在維表Join中使用Replicated Shuffle Hash策略(Skew只是一個(gè)語法糖,底層的實(shí)現(xiàn)是用的Replicated Shuffle Hash策略)。
使用限制
每個(gè)SKEW提示只能指定1張表。
表名需要為存在數(shù)據(jù)傾斜的主表名稱,而不是維表名稱。
當(dāng)前不支持更新流,當(dāng)主流是更新流時(shí),使用SKEW策略會報(bào)錯(cuò)。
代碼示例
SELECT /*+ SKEW(src) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
當(dāng)前LOOKUP Hint的shuffle選項(xiàng)已能覆蓋 SHUFFLE_HASH hint功能,兩者同時(shí)使用時(shí),會優(yōu)先采納LOOKUP hint的shuffle選項(xiàng)。
當(dāng)前LOOKUP Hint的shuffle選項(xiàng)還未支持解決數(shù)據(jù)傾斜的功能,當(dāng)和REPLICATED_SHUFFLE_HASH、SKEW同時(shí)使用時(shí),會優(yōu)先采納REPLICATED_SHUFFLE_HASH、SKEW對應(yīng)的shuffle策略。
使用示例
測試數(shù)據(jù)
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
測試語句
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.name = w.name;
測試結(jié)果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai