日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Processing Time Temporal Join語句

對于每條流式數據,可以關聯一個外部維表數據源,為實時計算Flink版提供數據關聯查詢。

背景信息

Processing Time Temporal Join使用處理時間(Processing Time)屬性,將事實表中的每條數據與維表的的最新數據進行關聯處理。與事件時間(Event Time)不同,處理時間并不關注事件實際發生的時刻,而是依據數據到達處理系統的時間點。

使用限制

  • 僅實時計算引擎VVR 8.0.10及以上版本支持。

  • 僅MySQL維表支持使用Processing Time Temporal Join。

注意事項

  • 不支持全量同步階段下的Checkpoint,需要配置execution.checkpointing.interval-during-backlog: 0參數關閉Checkpoint,而增量同步階段不受影響。

  • 使用Processing Time Temporal Join時,需配置table.optimizer.proctime-temporal-join-strategy: TEMPORAL_JOIN參數。

語法格式

Processing Time Temporal Join語法與維表Join語法一樣:

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

使用示例

  • 測試數據

    • 表1 kafka_input

      id(bigint)

      name(varchar)

      age(bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • 表2 phoneNumber

      name(varchar)

      phoneNumber(bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • 測試語句

    SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';  -- 使用Processing Time Temporal Join。
    SET 'execution.checkpointing.interval-during-backlog' = '0';              -- 關閉全量階段下的Checkpoint。
    
    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    -- 維表
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w
    ON t.name = w.name;
  • 測試結果

    id(bigint)

    phoneNumber(bigint)

    name(varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai