日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過觸發器和函數實現PostgreSQL的DDL增量遷移

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

在使用DTS執行PostgreSQL數據庫間的數據遷移前,可通過本文介紹的方法在源庫創建觸發器和函數獲取源庫的DDL信息,然后再由DTS執行數據遷移,在增量數據遷移階段即可實現DDL操作的增量遷移。

前提條件

  • 源庫需滿足以下要求:

    • 如果源庫為自建PostgreSQL,則數據庫版本需大于等于9.4。

    • 如果源庫為RDS PostgreSQL,則RDS PostgreSQL實例版本需大于等于10。

      • RDS PostgreSQL 9.4暫不支持創建事件觸發器,因此無法實現此功能。

      • RDS PostgreSQL 10、11和12內核版本需大于等于20201130。

      • RDS PostgreSQL 13內核版本需大于等于20210228。

      說明

      升級RDS PostgreSQL內核版本,請參見升級內核小版本

  • 數據遷移任務需在2020年10月1日之后創建。

背景信息

通過DTS執行PostgreSQL數據庫間的數據遷移時,在增量數據遷移階段,DTS僅支持DML操作(INSERT、DELETE、UPDATE)的同步,不支持DDL操作的同步。

通過本文的方法先在源庫中創建觸發器和函數來捕獲DDL信息,再由DTS執行數據遷移,即可實現DDL操作的同步。

說明

僅支持表級別DDL操作的同步:CREATE TABLE、DROP TABLE、ALTER TABLE(包含RENAME TABLE、ADD COLUMN、DROP COLUMN)。

操作步驟

警告

如果源庫中有多個數據庫需要執行增量數據遷移,您需要重復執行步驟2到步驟5。

  1. 登錄源PostgreSQL數據庫,相關方法請參見連接PostgreSQL實例psql工具介紹

  2. 切換至待遷移的數據庫。

    說明

    本案例以psql工具為例介紹,您可以使用\c <數據庫名>命令來切換數據庫,例如\c dtststdata

  3. 執行下述命令創建存放DDL信息的表。

    CREATE TABLE public.dts_ddl_command
    (
        ddl_text text COLLATE pg_catalog."default",
       id bigserial primary key,
       event text COLLATE pg_catalog."default",
       tag text COLLATE pg_catalog."default",
       username character varying COLLATE pg_catalog."default",
       database character varying COLLATE pg_catalog."default",
       schema character varying COLLATE pg_catalog."default",
       object_type character varying COLLATE pg_catalog."default",
       object_name character varying COLLATE pg_catalog."default",
       client_address character varying COLLATE pg_catalog."default",
       client_port integer,
       event_time timestamp with time zone,
       txid_current character varying(128) COLLATE pg_catalog."default",
       message text COLLATE pg_catalog."default"
    );
  4. 執行下述命令創建捕獲DDL信息的函數。

    CREATE FUNCTION public.dts_capture_ddl()
        RETURNS event_trigger
        LANGUAGE 'plpgsql'
        COST 100
        VOLATILE NOT LEAKPROOF SECURITY DEFINER
    AS $BODY$
      declare ddl_text text;
      declare max_rows int := 10000;
      declare current_rows int;
      declare pg_version_95 int := 90500;
      declare pg_version_10 int := 100000;
      declare current_version int;
      declare object_id varchar;
      declare alter_table varchar;
      declare record_object record;
      declare message text;
      declare pub RECORD;
    begin
    
      select current_query() into ddl_text;
    
      if TG_TAG = 'CREATE TABLE' then -- ALTER TABLE schema.TABLE REPLICA IDENTITY FULL;
        show server_version_num into current_version;
        if current_version >= pg_version_95 then
          for record_object in (select * from pg_event_trigger_ddl_commands()) loop
            if record_object.command_tag = 'CREATE TABLE' then
              object_id := record_object.object_identity;
            end if;
          end loop;
        else
          select btrim(substring(ddl_text from '[ \t\r\n\v\f]*[c|C][r|R][e|E][a|A][t|T][e|E][ \t\r\n\v\f]*.*[ \t\r\n\v\f]*[t|T][a|A][b|B][l|L][e|E][ \t\r\n\v\f]+(.*)\(.*'),' \t\r\n\v\f') into object_id;
        end if;
        if object_id = '' or object_id is null then
          message := 'CREATE TABLE, but ddl_text=' || ddl_text || ', current_query=' || current_query();
        else
          alter_table := 'ALTER TABLE ' || object_id || ' REPLICA IDENTITY FULL';
          message := 'alter_sql=' || alter_table;
          execute alter_table;
        end if;
        if current_version >= pg_version_10 then
          for pub in (select * from pg_publication where pubname like 'dts_sync_%') loop
            raise notice 'pubname=%',pub.pubname;
            BEGIN
              execute 'alter publication ' || pub.pubname || ' add table ' || object_id;
            EXCEPTION WHEN OTHERS THEN
            END;
          end loop;
        end if;
      end if;
    
      insert into public.dts_ddl_command(id,event,tag,username,database,schema,object_type,object_name,client_address,client_port,event_time,ddl_text,txid_current,message)
      values (default,TG_EVENT,TG_TAG,current_user,current_database(),current_schema,'','',inet_client_addr(),inet_client_port(),current_timestamp,ddl_text,cast(TXID_CURRENT() as varchar(16)),message);
    
      select count(id) into current_rows from public.dts_ddl_command;
      if current_rows > max_rows then
        delete from public.dts_ddl_command where id in (select min(id) from public.dts_ddl_command);
      end if;
    end
    $BODY$;
  5. 將剛創建的函數的所有者修改為DTS連接源庫的賬號,以postgresql為例。

    ALTER FUNCTION public.dts_capture_ddl()
        OWNER TO postgres;
  6. 執行下述命令創建全局事件觸發器。

    CREATE EVENT TRIGGER dts_intercept_ddl ON ddl_command_end
    EXECUTE PROCEDURE public.dts_capture_ddl();

后續步驟

根據源庫的版本,配置增量數據遷移任務。更多信息,請參見自建PostgreSQL遷移至RDS PostgreSQL

說明
  • 遷移類型僅需勾選增量遷移

  • 數據遷移任務釋放后,您需要登錄源PostgreSQL數據庫,執行下述命令刪除觸發器和函數。

    drop EVENT trigger dts_intercept_ddl;
    drop function public.dts_capture_ddl();
    drop table public.dts_ddl_command;