日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

維表JOIN語句

更新時(shí)間:

對于每條流式數(shù)據(jù),可以關(guān)聯(lián)一個(gè)外部維表數(shù)據(jù)源,為實(shí)時(shí)計(jì)算Flink版提供數(shù)據(jù)關(guān)聯(lián)查詢。

背景信息

大部分連接器的維表Join都可以使用Cache策略,不同連接器對Cache策略的支持情況稍有不同,請查看對應(yīng)的連接器文檔確定具體的支持情況。通用的Cache策略詳情如下:

  • None(默認(rèn)值):無緩存。

  • LRU:緩存維表里的部分?jǐn)?shù)據(jù)。源表的每條數(shù)據(jù)都會觸發(fā)系統(tǒng)先在Cache中查找數(shù)據(jù),如果沒有找到,則去物理維表中查找。

  • ALL:緩存維表里的所有數(shù)據(jù)。在Job運(yùn)行前,系統(tǒng)會將維表中所有數(shù)據(jù)加載到Cache中,之后所有的維表查找數(shù)據(jù)都會通過Cache進(jìn)行。如果在Cache中無法找到數(shù)據(jù),則KEY不存在。全量的Cache有一個(gè)過期時(shí)間,過期后會重新加載一遍全量Cache。適用于遠(yuǎn)程表數(shù)據(jù)量小且MISS KEY(源表數(shù)據(jù)和維表JOIN時(shí),ON條件無法關(guān)聯(lián))特別多的場景。

說明
  • 您需要根據(jù)具體業(yè)務(wù)需求,在平衡實(shí)時(shí)性和性能之間進(jìn)行權(quán)衡。如果對數(shù)據(jù)實(shí)時(shí)性要求非常高,需要實(shí)時(shí)更新,可以不使用Cache,直接從維表讀取。

  • 如果使用Cache策略,可以配合LRU和TTL來實(shí)現(xiàn)較新的緩存數(shù)據(jù)。TTL可以設(shè)置的較短,例如幾秒至幾十秒,定期從源表加載數(shù)據(jù)。

  • 使用ALL緩存策略時(shí),請注意節(jié)點(diǎn)內(nèi)存大小,防止出現(xiàn)OOM。

  • 因?yàn)橄到y(tǒng)會異步加載維表數(shù)據(jù),所以在使用ALL緩存策略時(shí),需要增加維表JOIN節(jié)點(diǎn)的內(nèi)存,增加的內(nèi)存大小為遠(yuǎn)程表數(shù)據(jù)量的兩倍。

使用限制

  • 維表JOIN僅支持對當(dāng)前時(shí)刻維表快照的關(guān)聯(lián)。

  • 維表支持INNER JOIN和LEFT JOIN,不支持RIGHT JOIN或FULL JOIN。

注意事項(xiàng)

  • 如果您有一對一JOIN需求,請確保連接條件中包含了維表中具有唯一性字段的等值連接條件。

  • 對每條流式數(shù)據(jù),只會關(guān)聯(lián)當(dāng)時(shí)維表的最新版本數(shù)據(jù),即JOIN行為只發(fā)生在處理時(shí)間(Processing Time)。如果JOIN行為發(fā)生后,維表中的數(shù)據(jù)發(fā)生了變化(新增、更新或刪除),則已關(guān)聯(lián)的維表數(shù)據(jù)不會被同步變化。具體的維表的行為請參見對應(yīng)連接器行為。

維表JOIN語法

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
說明
  • 必須加上FOR SYSTEM_TIME AS OF PROCTIME(),表示JOIN維表當(dāng)前時(shí)刻所看到的每條數(shù)據(jù)。

  • ON條件中必須包含維表實(shí)際能支持隨機(jī)查找的字段的等值條件。

  • ON條件中維表字段不能使用CAST等類型轉(zhuǎn)換函數(shù)。如果您有類型轉(zhuǎn)換需求,請?jiān)谠幢碜侄芜M(jìn)行操作。

維表JOIN Hints

您可以通過使用維表Hints(Hint功能參見Flink SQL Hints)對維表Join的策略進(jìn)行配置。維表Hints包含Lookup Hint與其他Join Hints。

說明
  • 僅VVR 8.0及以上版本支持Lookup Hint。

  • 僅VVR 8.0.8及以上版本支持通過Lookup Hint配置是否開啟shuffle策略。

  • VVR 8.0以上支持使用別名,如果維表定義了別名,Hint中必須使用別名。

  • 僅VVR 4.0及以上版本支持其他Join Hints。

Lookup Hint

Lookup Hint功能和社區(qū)保持一致,可以用于配置維表的同步、異步、重試查找策略,詳情參見Lookup Hint。VVR 8.0.8及以上版本對Lookup Hint的功能進(jìn)行了擴(kuò)展,支持配置通過'shuffle' = 'true'選項(xiàng)配置維表聯(lián)接時(shí)的shuffle策略,不同場景的shuffle策略如下表所示。

場景

聯(lián)接策略

不配置'shuffle' = 'true'選項(xiàng)

使用引擎默認(rèn)的shuffle策略。

不配置'shuffle' = 'true'選項(xiàng),且維表連接器不提供自定義聯(lián)接策略

配置'shuffle' = 'true' 選項(xiàng),且維表連接器不提供自定義聯(lián)接策略

默認(rèn)使用SHUFFLE_HASH策略,含義請參見SHUFFLE_HASH

配置'shuffle' = 'true' 選項(xiàng),且維表連接器提供自定義聯(lián)接策略

使用表連接器的自定義shuffle策略。

說明

目前僅流式數(shù)據(jù)湖倉Paimon會提供自定義shuffle策略,具體會在Join字段包含全部分桶字段的情況下基于bucket進(jìn)行shuffle。

對維表配置聯(lián)接時(shí)的shuffle策略代碼示例如下。

-- 只對維表dim1配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- 同時(shí)對維表dim1, dim2配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- 對維表dim1必須使用別名D1配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

-- 同時(shí)對維表dim1, dim2通過別名配置維表聯(lián)接shuffle策略。
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

其他Join Hints

維表Join Hints僅用于配置維表聯(lián)接策略,包括SHUFFLE_HASH、REPLICATED_SHUFFLE_HASH和SKEW。維表Cache策略和聯(lián)接策略之間的適用場景詳情如下表所示。

Cache策略

SHUFFLE_HASH

REPLICATED_SHUFFLE_HASH

(和SKEW等價(jià))

None

不建議使用該聯(lián)接策略提示,主流會引入額外的網(wǎng)絡(luò)開銷。

不建議使用該聯(lián)接策略提示,主流會引入額外的網(wǎng)絡(luò)開銷。

LRU

在維表查找IO成為瓶頸時(shí),建議考慮使用該聯(lián)接策略提示。當(dāng)主流數(shù)據(jù)在Join Key上有時(shí)間局部性時(shí),可以提高Cache命中率,減少IO請求數(shù),從而提升總吞吐。

重要

主流會引入額外的網(wǎng)絡(luò)開銷,當(dāng)主流數(shù)據(jù)在Join Key上有傾斜,遇到性能瓶頸時(shí),建議考慮REPLICATED_SHUFFLE_HASH

在維表查找IO成為瓶頸且主流數(shù)據(jù)在Join Key上有傾斜時(shí),建議考慮該聯(lián)接策略提示。當(dāng)主流數(shù)據(jù)在Join Key上有時(shí)間局部性時(shí),可以提高Cache命中率,減少IO請求數(shù),從而提升總吞吐。

ALL

在維表內(nèi)存使用量成為瓶頸時(shí),建議使用該聯(lián)接策略提示。內(nèi)存使用率可降低為1/并發(fā)度

重要

主流會引入額外的網(wǎng)絡(luò)開銷,當(dāng)主流數(shù)據(jù)在Join Key上有傾斜,遇到性能瓶頸時(shí),建議考慮REPLICATED_SHUFFLE_HASH

在維表內(nèi)存使用量成為瓶頸且主流數(shù)據(jù)在Join Key上有傾斜時(shí),建議使用該聯(lián)接策略提示。內(nèi)存使用率降低為分桶數(shù)/并發(fā)度

SHUFFLE_HASH

  • 使用效果

    在維表Join中使用Shuffle Hash策略,可以將主流數(shù)據(jù)在Join之前根據(jù)Join Key做一次shuffle。在使用LRU Cache策略時(shí)可以提高Cache命中率,減少IO請求數(shù);在使用ALL Cache策略時(shí)可以減少內(nèi)存使用量。每個(gè)SHUFFLE_HASH聯(lián)接提示可指定多張維表。

  • 使用限制

    雖然SHUFFLE_HASH可以減少內(nèi)存開銷,但是由于上游數(shù)據(jù)需要按照J(rèn)oin Key做一次shuffle,引入額外的網(wǎng)絡(luò)開銷,因此下面兩種場景不適合使用SHUFFLE_HASH聯(lián)接策略。

    • 主流數(shù)據(jù)在Join Key上存在嚴(yán)重的數(shù)據(jù)傾斜,這種場景下如果使用SHUFFLE_HASH聯(lián)接,會因?yàn)閿?shù)據(jù)傾斜導(dǎo)致Join節(jié)點(diǎn)成為性能瓶頸,從而會導(dǎo)致流作業(yè)出現(xiàn)嚴(yán)重反壓或是批場景出現(xiàn)嚴(yán)重長尾,此時(shí)建議使用REPLICATED_SHUFFLE_HASH聯(lián)接。

    • 維表數(shù)據(jù)較小,ALL Cache策略加載沒有內(nèi)存瓶頸時(shí),如果使用SHUFFLE_HASH聯(lián)接,節(jié)約的內(nèi)存開銷和額外引入的網(wǎng)絡(luò)開銷相比,可能并不劃算。

  • 代碼示例

    -- 只對維表dim1開啟SHUFFLE_HASH聯(lián)接。
    SELECT /*+ SHUFFLE_HASH(dim1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- 同時(shí)對維表dim1, dim2均開啟SHUFFLE_HASH聯(lián)接。
    SELECT /*+ SHUFFLE_HASH(dim1, dim2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- 對維表dim1必須使用別名D1開啟SHUFFLE_HASH聯(lián)接。
    SELECT /*+ SHUFFLE_HASH(D1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
    
    -- 同時(shí)對維表dim1, dim2通過別名開啟SHUFFLE_HASH聯(lián)接。
    SELECT /*+ SHUFFLE_HASH(D1, D2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

REPLICATED_SHUFFLE_HASH

  • 使用效果

    在維表Join中使用Replicated Shuffle Hash策略,其效果基本與SHUFFLE_HASH一致,但不同點(diǎn)是其會將主流具有相同key的數(shù)據(jù)隨機(jī)打散到指定的N個(gè)并發(fā)上,可以解決數(shù)據(jù)傾斜導(dǎo)致的性能瓶頸。每個(gè)REPLICATED_SHUFFLE_HASH聯(lián)接提示中可指定多張維表。

  • 使用限制

    • 需要配置傾斜數(shù)據(jù)分桶數(shù)量參數(shù)table.exec.skew-join.replicate-num,其默認(rèn)值為16,取值不能大于維表聯(lián)接節(jié)點(diǎn)的并發(fā)。配置方法請參見如何配置作業(yè)運(yùn)行參數(shù)?

    • 當(dāng)前不支持更新流,當(dāng)主流是更新流時(shí),使用REPLICATED_SHUFFLE_HASH策略會報(bào)錯(cuò)。

  • 代碼示例

    -- 對維表dim1開啟REPLICATED_SHUFFLE_HASH聯(lián)接
    SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    
    -- 對維表dim1通過別名開啟REPLICATED_SHUFFLE_HASH聯(lián)接
    SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

SKEW

  • 使用效果

    當(dāng)指定表存在數(shù)據(jù)傾斜時(shí),優(yōu)化器會在維表Join中使用Replicated Shuffle Hash策略(Skew只是一個(gè)語法糖,底層的實(shí)現(xiàn)是用的Replicated Shuffle Hash策略)。

  • 使用限制

    • 每個(gè)SKEW提示只能指定1張表。

    • 表名需要為存在數(shù)據(jù)傾斜的主表名稱,而不是維表名稱。

    • 當(dāng)前不支持更新流,當(dāng)主流是更新流時(shí),使用SKEW策略會報(bào)錯(cuò)。

  • 代碼示例

    SELECT /*+ SKEW(src) */  
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
重要
  • 當(dāng)前LOOKUP Hint的shuffle選項(xiàng)已能覆蓋 SHUFFLE_HASH hint功能,兩者同時(shí)使用時(shí),會優(yōu)先采納LOOKUP hint的shuffle選項(xiàng)。

  • 當(dāng)前LOOKUP Hint的shuffle選項(xiàng)還未支持解決數(shù)據(jù)傾斜的功能,當(dāng)和REPLICATED_SHUFFLE_HASH、SKEW同時(shí)使用時(shí),會優(yōu)先采納REPLICATED_SHUFFLE_HASH、SKEW對應(yīng)的shuffle策略。

使用示例

  • 測試數(shù)據(jù)

    • 表1 kafka_input

      id(bigint)

      name(varchar)

      age(bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • 表2 phoneNumber

      name(varchar)

      phoneNumber(bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • 測試語句

    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
    ON t.name = w.name;
  • 測試結(jié)果

    id(bigint)

    phoneNumber(bigint)

    name(varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai