對于每條流式數據,可以關聯一個外部維表數據源,為實時計算Flink版提供數據關聯查詢。
背景信息
Processing Time Temporal Join使用處理時間(Processing Time)屬性,將事實表中的每條數據與維表的的最新數據進行關聯處理。與事件時間(Event Time)不同,處理時間并不關注事件實際發生的時刻,而是依據數據到達處理系統的時間點。
使用限制
僅實時計算引擎VVR 8.0.10及以上版本支持。
僅MySQL維表支持使用Processing Time Temporal Join。
注意事項
不支持全量同步階段下的Checkpoint,需要配置
execution.checkpointing.interval-during-backlog: 0
參數關閉Checkpoint,而增量同步階段不受影響。使用Processing Time Temporal Join時,需配置
table.optimizer.proctime-temporal-join-strategy: TEMPORAL_JOIN
參數。
語法格式
Processing Time Temporal Join語法與維表Join語法一樣:
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
使用示例
測試數據
表1 kafka_input
id(bigint)
name(varchar)
age(bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
表2 phoneNumber
name(varchar)
phoneNumber(bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
測試語句
SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN'; -- 使用Processing Time Temporal Join。 SET 'execution.checkpointing.interval-during-backlog' = '0'; -- 關閉全量階段下的Checkpoint。 CREATE TEMPORARY TABLE kafka_input ( id BIGINT, name VARCHAR, age BIGINT, proc_time AS PROCTIME() ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'properties.group.id' = '<yourKafkaConsumerGroupId>', 'format' = 'csv' ); -- 維表 CREATE TEMPORARY TABLE phoneNumber( name VARCHAR, phoneNumber BIGINT, PRIMARY KEY(name) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, phoneNumber BIGINT, name VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.phoneNumber, t.name FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w ON t.name = w.name;
測試結果
id(bigint)
phoneNumber(bigint)
name(varchar)
1
1390000444
lilei
3
1390000333
libai