日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

變更Sink

本文為您介紹Sink變更的可兼容性和不可兼容性詳情。

可兼容的變更

  • 刪除多路Sink的某路Sink,該變更屬于完全兼容變更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- 刪除MySink1對應(yīng)的Query,該變更屬于完全兼容變更。
    -- 該Query中的group aggregate對應(yīng)的狀態(tài)會被丟棄。
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
    
    -- 刪除MySink2對應(yīng)的Query,該變更屬于完全兼容變更。
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • 新增Sink且不帶有狀態(tài)的Query,該變更屬于完全兼容變更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 新增無狀態(tài)的Query,該變更屬于完全兼容變更。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;
  • 默認(rèn)情況下,Sink被認(rèn)為是無狀態(tài)算子(大部分的Sink連接器沒有狀態(tài))。因此變更Sink表名、連接器類型、WITH屬性都是兼容變更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 修改表名、連接器類型等,該變更屬于完全兼容變更。
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'kafka',
      ...
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

不兼容的變更

  • 新增Sink且?guī)в袪顟B(tài)的Query,該變更屬于不兼容變更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink1 (
      a int,
      b bigint,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 新增有狀態(tài)的Query,該變更屬于不兼容變更。
    CREATE TABLE MySink2 (
      b bigint,
      a int,
      c varchar
    );
    INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;
  • 設(shè)置table.optimizer.state-compatibility.ignore-sink=false,將Sink視為有狀態(tài)算子,修改表名、連接器類型,此變更屬于不兼容變更。

    -- 原始SQL。
    CREATE TABLE MyTable (
      a int,
      b bigint,
      c varchar
    );
    
    CREATE TABLE MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 設(shè)置table.optimizer.state-compatibility.ignore-sink=false,
    -- 將Sink視為有狀態(tài)算子,修改表名,該變更屬于不兼容變更
    CREATE TABLE MySink2 (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'print'
    );
    INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
    
    -- 設(shè)置table.optimizer.state-compatibility.ignore-sink=false,
    -- 將Sink視為有狀態(tài)算子,修改連接器類型(例如將print改為blackhole),該變更屬于不兼容變更
    create table MySink (
      a int,
      b bigint,
      c varchar
    ) WITH (
      'connector' = 'blackhole',
      ...
    );
    INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
  • 刪除了一路sink的同時,將Sink或Source的TEMPORARY TABLE DDL語句也進行了修改或刪除,屬于不兼容變更。詳情請參見其他限制

  • 一般情況下Sink是無狀態(tài)的算子(默認(rèn)table.optimizer.state-compatibility.ignore-sink為true來忽略Sink)。但在有數(shù)據(jù)亂序風(fēng)險時,框架將基于Sink生成有狀態(tài)的SinkMaterializer節(jié)點來消除亂序、保障數(shù)據(jù)正確性,該節(jié)點為有狀態(tài)節(jié)點,詳情請參見Flink SQL中Changelog事件亂序處理原理。因此變更后可能出現(xiàn)作業(yè)啟動時兼容性檢測顯示完全兼容,但實際可能不兼容的情況。例如更換Source的Primary Key導(dǎo)致Sink上游Upsert Key發(fā)生變化。

    --原始query
    CREATE TEMPORARY TABLE MyTable (
      a int primary key not enforced,
      b bigint,
      c bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint  primary key not enforced) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
    
    --修改主鍵所屬的變量
    CREATE TEMPORARY TABLE MyTable (
      a int,
      b bigint,
      c bigint primary key not enforced,
      d bigint,
      ts timestamp(3),
      proctime as proctime(),
      watermark for ts AS ts - interval '1' second
    ) WITH ('connector' = 'datagen');
    
    CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) with ('connector'='print');
    
    INSERT INTO MySink SELECT a, b, c FROM MyTable;
  • 參數(shù)table.optimizer.state-compatibility.ignore-sink由true變更為false(將Sink視為有狀態(tài)算子,納入狀態(tài)兼容檢測范圍),屬于不兼容變更。