本文介紹如果數據傳輸源端發生了 DDL 變更(例如新增表字段),您如何通過觸發器來捕獲 DDL 相關事件信息,以確保 DML 變更操作正常同步。
背景信息
使用數據傳輸執行源端 PostgreSQL 數據庫增量同步時,默認僅支持 DML 語句(包括 INSERT
、DELETE
和 UPDATE
)的同步。但如果源端發生了 DDL 變更(例如新增表字段),而數據傳輸無法獲取新的結構信息,將導致后續和該對象相關的 DML 同步失敗,造成任務中斷的問題。所以,您需要先在源端 PostgreSQL 數據庫創建 DDL 記錄表和觸發器來捕獲 DDL 信息,以確保 DML 的正常同步。
使用限制
源端 PostgreSQL 數據庫必須為 V10.x 及以上版本。
操作步驟
登錄源端數據庫后,切換至待遷移數據庫。
以 PSQL 工具為例,通過執行命令
\c <數據庫名稱>
,切換至待遷移數據庫。創建下述兩張表,用于記錄 DDL 事件變更。
非 DROP DDL 事件信息表
----postgreSQL DDL EVENT TABLE---- CREATE TABLE public.oms_non_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", object_type character varying COLLATE pg_catalog."default", objid integer, top_objid integer, object_identity text COLLATE pg_catalog."default", top_object_identity text COLLATE pg_catalog."default", top_schema text COLLATE pg_catalog."default", all_children_table text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_non_dropped_ddl_command REPLICA IDENTITY FULL;
DROP DDL 事件信息表
----postgreSQL DROP DDL EVENT TABLE---- CREATE TABLE public.oms_dropped_ddl_command ( id bigserial primary key, ddl_text text COLLATE pg_catalog."default", tag text COLLATE pg_catalog."default", database character varying COLLATE pg_catalog."default", schema character varying COLLATE pg_catalog."default", objid integer, object_type text COLLATE pg_catalog."default", object_name text COLLATE pg_catalog."default" ); ALTER TABLE public.oms_dropped_ddl_command REPLICA IDENTITY FULL;
創建下述兩種事件觸發器函數。
非 DROP DDL 事件觸發器函數
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_non_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; DECLARE obj record; DECLARE top_objid integer; DECLARE parent_oid integer; DECLARE top_object_identity text; DECLARE top_schema text; DECLARE all_children_table text; BEGIN SELECT current_query() INTO ddl_text; FOR obj IN (SELECT * FROM pg_event_trigger_ddl_commands() WHERE TG_TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA')) LOOP record_object:=obj; SELECT inhparent FROM pg_inherits WHERE inhrelid = obj.objid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO parent_oid; WHILE parent_oid IS NOT NULL LOOP SELECT inhparent FROM pg_inherits WHERE inhrelid = parent_oid AND inhparent::regclass::text NOT LIKE 'pg_%' ORDER BY inhseqno DESC LIMIT 1 INTO top_objid; IF top_objid IS NULL THEN top_objid := parent_oid; exit; END IF; parent_oid := top_objid; END LOOP; IF top_objid IS NULL THEN top_objid :=record_object.objid; ELSE END IF; IF regexp_matches(ddl_text, 'ALTER\s+TABLE\s+.+\s+RENAME\s+TO\s+.+', 'i') IS NOT NULL THEN WITH RECURSIVE child_tables AS ( SELECT oid AS table_objid FROM pg_class WHERE oid = top_objid UNION SELECT c.oid AS table_objid FROM pg_class c JOIN pg_inherits i ON c.oid = i.inhrelid JOIN child_tables ct ON i.inhparent = ct.table_objid ) SELECT string_agg(table_objid::text, ', ') INTO all_children_table FROM child_tables WHERE table_objid <> top_objid; ELSE END IF; SELECT pn.nspname, ss.relname INTO obj from pg_catalog.pg_class ss JOIN pg_catalog.pg_namespace pn ON ss.relnamespace = pn.oid WHERE ss.oid = top_objid; top_object_identity:=obj.relname; top_schema:=obj.nspname; IF TG_TAG ='CREATE TABLE' AND record_object.object_type='table' THEN EXECUTE 'ALTER TABLE ' || record_object.object_identity || ' REPLICA IDENTITY FULL'; ELSE END IF; INSERT INTO public.oms_non_dropped_ddl_command(id,ddl_text,tag,database,schema,object_type, objid, top_objid, object_identity,top_object_identity,top_schema,all_children_table) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema,record_object.object_type, record_object.objid, top_objid ,record_object.object_identity,top_object_identity,top_schema,all_children_table); END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS ddl trriger error during command process: %', SQLERRM; END $BODY$;
DROP DDL 事件觸發器函數
CREATE OR REPLACE FUNCTION public.oms_capture_ddl_for_dropped() RETURNS event_trigger LANGUAGE 'plpgsql' COST 100 VOLATILE NOT LEAKPROOF SECURITY DEFINER AS $BODY$ DECLARE ddl_text text; DECLARE record_object record; BEGIN SELECT current_query() INTO ddl_text; FOR record_object in (select * from pg_event_trigger_dropped_objects() WHERE TG_TAG NOT IN ('ALTER TABLE','DROP PUBLICATION','ALTER PUBLICATION')) LOOP IF record_object.object_type = 'type' THEN ELSE INSERT INTO public.oms_dropped_ddl_command(id,ddl_text,tag,database,schema, objid, object_type, object_name) VALUES (default,ddl_text, TG_TAG,current_database(),current_schema, record_object.objid, record_object.object_type, record_object.object_name); END IF; END LOOP; EXCEPTION WHEN OTHERS THEN RAISE LOG 'OMS drop ddl trriger error during command process: %', SQLERRM; END $BODY$;
將創建函數的所有者修改為數據傳輸連接源端數據庫的賬號。此處以
postgres
為例:ALTER FUNCTION public.oms_capture_ddl_for_non_dropped() OWNER TO postgres; ALTER FUNCTION public.oms_capture_ddl_for_dropped() OWNER TO postgres;
執行下述命令,創建全局事件觸發器。
CREATE EVENT TRIGGER oms_capture_ddl_for_non_dropped ON ddl_command_end WHEN TAG IN ('CREATE TABLE', 'ALTER TABLE','CREATE INDEX','CREATE SCHEMA') EXECUTE PROCEDURE public.oms_capture_ddl_for_non_dropped(); CREATE EVENT TRIGGER oms_capture_ddl_for_dropped ON sql_drop EXECUTE PROCEDURE public.oms_capture_ddl_for_dropped();
后續操作
釋放數據遷移任務后,您需要登錄源端數據庫,執行下述命令刪除觸發器函數和對應表。
drop EVENT trigger oms_capture_ddl_for_dropped;
drop EVENT trigger oms_capture_ddl_for_non_dropped;
drop function public.oms_capture_ddl_for_non_dropped();
drop function public.oms_capture_ddl_for_dropped();
drop table public.oms_non_dropped_ddl_command;
drop table public.oms_dropped_ddl_command;
文檔內容是否對您有幫助?