本文為您介紹變更SQL除Query、Source和Sink之外的其他限制。
當前不支持檢查Flink版本變化后狀態的兼容性,所以您需要保證更新后的作業Flink版本和產生Checkpoint或Savepoint對應作業的Flink版本一致。
當前作業的依賴必須和產生Checkpoint或Savepoint對應作業的依賴保持兼容。當前Flink系統暫不無法識別自定義連接器、自定義函數依賴的狀態的兼容性變化,因此請您自己保證它們的兼容性。
狀態兼容性檢測不支持檢查同時進行的多個修改。單個修改包括只修改會影響狀態計算的Where條件、只修改統計指標(Aggregate Function)、只修改Sink。
-- 原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable group by a; -- 修改Sink表:MySink -> MySink2,和修改統計函數:max(c) -> min(c),該修改屬于未知兼容修改。 CREATE TABLE MySink2 ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable group by a; -- 添加Where條件:a > 10,設置table.optimizer.state-compatibility.ignore-filter: true;并且同時修改統計函數:max(c) -> min(c)。該修改屬于未知兼容修改。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM ( SELECT * FROM MyTable where a > 10 ) GROUP BY a;
新增全新狀態的Query ,該修改屬于不兼容修改。
--原始SQL。 CREATE TABLE MyTable ( a int, b bigint, c varchar ) WITH ( 'connector' = 'datagen' ); CREATE TABLE MySink ( a int, b bigint, c varchar ) WITH ( 'connector' = 'print' ); INSERT INTO MySink SELECT a, b, c FROM MyTable; --新增group aggregate query,該修改屬于不兼容修改。 INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
如果刪除了一路Sink的同時,將Sink或Source的TEMPORARY TABLE DDL也進行了修改或刪除,該修改屬于未知兼容。如果只是刪除了一路Sink,但是TEMPORARY TABLE DDL未更新,該修改屬于完全兼容。
--原始SQL --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --源表2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --結果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --結果表2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query BEGIN STATEMENT SET; INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; INSERT INTO MySink2 SELECT a, b FROM MyTable2 where a > 10; END; --刪除了一路Sink的同時,將Sink或Source的TEMPORARY TABLE DDL也進行了修改或刪除,該修改屬于未知兼容。 --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, d bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --結果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a; --如果只是刪除了一路Sink,但是TEMPORARY TABLE DDL未更新,當前修改屬于完全兼容。 --源表1 CREATE TEMPORARY TABLE MyTable ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); -- 源表2 CREATE TEMPORARY TABLE MyTable2 ( a int, b bigint, c bigint, ts timestamp(3), proctime as proctime(), watermark for ts AS ts - interval '1' second ) WITH ('connector' = 'datagen'); --結果表1 CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector'='print'); --結果表2 CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector'='print'); --Query INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
文檔內容是否對您有幫助?