Auditing & Change Data Capture in PostgreSQL
I needed a quick way to capture DML changes across all tables in a specific PostgreSQL schema. I found a solution online built around a trigger function that captures the changed row (the new row for an INSERT or UPDATE, the old row for a DELETE) and stores it as JSON in a central table called `data_changes`.
I then wrote a stored procedure that accepts a schema name as a parameter and loops through each table in that schema, attaching the trigger to track changes automatically.
To make querying easier, I added another procedure that generates a set of “audit” views, one per table. These views reshape the JSON in `data_changes` to match the structure of the original tables.
With this setup, if you need to stream changes from PostgreSQL into another system, you can simply read from the `data_changes` table.
-- Schema holding the change log, the capture trigger function and the admin procedures.
CREATE SCHEMA IF NOT EXISTS auditchanges AUTHORIZATION postgres;

-- Table: auditchanges.data_changes
-- One row per captured INSERT / UPDATE / DELETE.
--   dmlaction     : 'I', 'U' or 'D' (first letter of TG_OP).
--   original_data : the row image as json (new row for I/U, old row for D).
--   createdon     : CURRENT_TIMESTAMP, i.e. the start time of the writing transaction.
-- DROP TABLE auditchanges.data_changes;
CREATE TABLE IF NOT EXISTS auditchanges.data_changes
(
    id            bigserial PRIMARY KEY,
    schemaname    text COLLATE pg_catalog."default" NOT NULL,
    tablename     text COLLATE pg_catalog."default" NOT NULL,
    username      text COLLATE pg_catalog."default",
    createdon     timestamp with time zone NOT NULL DEFAULT CURRENT_TIMESTAMP,
    dmlaction     text COLLATE pg_catalog."default" NOT NULL,
    original_data json,
    CONSTRAINT data_changes_dmlaction_check
        CHECK (dmlaction = ANY (ARRAY['I'::text, 'D'::text, 'U'::text]))
)
TABLESPACE pg_default;

ALTER TABLE auditchanges.data_changes OWNER TO postgres;

-- DROP INDEX auditchanges.data_changes_001_idx;
CREATE INDEX IF NOT EXISTS data_changes_001_idx
    ON auditchanges.data_changes USING btree (schemaname, tablename, createdon, dmlaction)
    TABLESPACE pg_default;

-- FUNCTION: auditchanges.capture_changes()
-- Row-level trigger function: appends one row to auditchanges.data_changes for each
-- INSERT, UPDATE or DELETE on the table the trigger is attached to.
--   * INSERT and UPDATE log the NEW row; DELETE logs the OLD row.
--   * When the trigger is created with two arguments (schema, table), those names are
--     logged instead of TG_TABLE_SCHEMA / TG_TABLE_NAME. A row trigger on a partitioned
--     table actually fires as a clone on the partition, so without the arguments the
--     change would be logged under the partition's name rather than the parent's.
--   * username is session_user, which does not follow SET ROLE.
--   * Errors are reported as WARNINGs and swallowed: the EXCEPTION block rolls back only
--     the audit insert, so a failed audit write does not abort the user's statement.
-- DROP FUNCTION auditchanges.capture_changes();
CREATE OR REPLACE FUNCTION auditchanges.capture_changes()
    RETURNS trigger
    LANGUAGE plpgsql
    COST 100
    VOLATILE NOT LEAKPROOF
AS $BODY$
DECLARE
    v_row_data    json;
    v_schema_name text;
    v_table_name  text;
BEGIN
    IF TG_OP IN ('INSERT', 'UPDATE') THEN
        v_row_data := row_to_json(NEW);
    ELSIF TG_OP = 'DELETE' THEN
        v_row_data := row_to_json(OLD);
    ELSE
        RAISE WARNING '[auditchanges.capture_changes] - Other action occurred: %, at %', TG_OP, now();
        RETURN NULL;
    END IF;

    IF TG_NARGS >= 2 THEN
        v_schema_name := TG_ARGV[0];
        v_table_name  := TG_ARGV[1];
    ELSE
        v_schema_name := TG_TABLE_SCHEMA::text;
        v_table_name  := TG_TABLE_NAME::text;
    END IF;

    INSERT INTO auditchanges.data_changes
        (schemaname, tablename, username, dmlaction, original_data)
    VALUES
        (v_schema_name, v_table_name, session_user::text, substring(TG_OP, 1, 1), v_row_data);

    -- The result of an AFTER ROW trigger is ignored; return the affected row anyway.
    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    END IF;
    RETURN NEW;
EXCEPTION
    WHEN data_exception THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %', SQLSTATE, SQLERRM;
        RETURN NULL;
    WHEN unique_violation THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %', SQLSTATE, SQLERRM;
        RETURN NULL;
    WHEN others THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %', SQLSTATE, SQLERRM;
        RETURN NULL;
END;
$BODY$;

ALTER FUNCTION auditchanges.capture_changes() OWNER TO postgres;

-- PROCEDURE: auditchanges.admin_create_audit_triggers(varchar, int)
-- Attaches an AFTER INSERT OR UPDATE OR DELETE row trigger running
-- auditchanges.capture_changes() to every ordinary or partitioned table in the schema
-- whose name equals t_schemaname case-insensitively.
--   t_schemaname : schema to instrument (compared with lower() on both sides).
--   t_replace    : 1 = (re)create every trigger; any other value = create only missing ones.
-- Partitions are skipped: a row trigger on a partitioned table is cloned onto all of its
-- partitions (including ones attached later), so a second trigger on a partition or on an
-- intermediate sub-partitioned table would log each change more than once. Partitioned
-- tables pass their own schema/table as trigger arguments so capture_changes() logs
-- partition rows under the parent; after renaming a partitioned table, re-run with
-- t_replace = 1. This procedure never drops existing triggers.
-- Trigger names are lower('tiud_<schema>_<table>_aud'), the name an unquoted identifier
-- folds to. Requires PostgreSQL 14+ (CREATE OR REPLACE TRIGGER).
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_audit_triggers(t_schemaname varchar, t_replace int)
    LANGUAGE plpgsql
AS $BODY$
DECLARE
    r_table       record;
    t_triggername text;
    t_funcargs    text;
BEGIN
    -- A trigger on auditchanges.data_changes would fire on its own inserts recursively.
    IF lower(t_schemaname) = 'auditchanges' THEN
        RAISE EXCEPTION '[auditchanges.admin_create_audit_triggers] - refusing to add audit triggers to schema %', t_schemaname;
    END IF;

    FOR r_table IN
        SELECT c.oid     AS table_oid,
               c.relkind AS relkind,
               n.nspname AS schema_name,
               c.relname AS table_name
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE c.relkind IN ('r', 'p')      -- ordinary and partitioned tables
          AND NOT c.relispartition         -- partitions inherit the parent's trigger
          AND lower(n.nspname) = lower(t_schemaname)
        ORDER BY c.relname
    LOOP
        t_triggername := lower('tiud_' || r_table.schema_name || '_' || r_table.table_name || '_aud');

        IF t_replace = 1
           OR NOT EXISTS (SELECT 1
                          FROM pg_catalog.pg_trigger AS tg
                          WHERE tg.tgrelid = r_table.table_oid
                            AND tg.tgname = t_triggername::name) THEN
            IF r_table.relkind = 'p' THEN
                t_funcargs := format('(%L, %L)', r_table.schema_name, r_table.table_name);
            ELSE
                t_funcargs := '()';
            END IF;

            EXECUTE format(
                'CREATE OR REPLACE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON %I.%I '
                || 'FOR EACH ROW EXECUTE FUNCTION auditchanges.capture_changes%s',
                t_triggername, r_table.schema_name, r_table.table_name, t_funcargs);
        END IF;
    END LOOP;
END;
$BODY$;

-- Create audit triggers on all tables in the schema, e.g.:
-- CALL auditchanges.admin_create_audit_triggers('your-schema', 1);
-- PROCEDURE: auditchanges.admin_create_views(varchar)
-- (Re)creates one view per ordinary or partitioned table in the schema whose name equals
-- t_schemaname case-insensitively. Each view is named lower('<schema>_<table>_aud_vw') in
-- the auditchanges schema. It selects that table's rows from auditchanges.data_changes,
-- shown as:
--   audit_id, schemaname, tablename, audit_username, audit_createdon, dmlaction,
--   followed by one column per live column of the source table, in column order,
--   extracted from original_data and cast back to the column's type (domains are cast
--   to their base type; length/precision modifiers are dropped).
-- Partitions are skipped; only top-level tables get a view.
-- Each view is dropped first because CREATE OR REPLACE VIEW cannot remove or retype
-- existing columns.
-- NOTE(review): a source column named schemaname, tablename, dmlaction, audit_id,
-- audit_username or audit_createdon duplicates an audit column and CREATE VIEW fails.
-- Usage: CALL auditchanges.admin_create_views('your-schema');
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_views(t_schemaname varchar)
    LANGUAGE plpgsql
AS $$
DECLARE
    r_table    record;
    t_viewname text;
    t_columns  text;
BEGIN
    FOR r_table IN
        SELECT c.oid     AS table_oid,
               n.nspname AS schema_name,
               c.relname AS table_name
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE c.relkind IN ('r', 'p')      -- ordinary and partitioned tables
          AND NOT c.relispartition         -- skip individual partitions
          AND lower(n.nspname) = lower(t_schemaname)
        ORDER BY c.relname
    LOOP
        t_viewname := lower(r_table.schema_name || '_' || r_table.table_name || '_aud_vw');

        -- Build ", (d.original_data ->> 'col')::type AS col" for every live column.
        -- format_type() yields a castable type name for arrays, enums and other
        -- user-defined types, which information_schema.columns.data_type does not.
        SELECT string_agg(
                   format(', (d.original_data ->> %L)::%s AS %I',
                          a.attname,
                          format_type(CASE WHEN t.typtype = 'd' THEN t.typbasetype ELSE a.atttypid END, NULL),
                          a.attname),
                   '' ORDER BY a.attnum)
          INTO t_columns
          FROM pg_catalog.pg_attribute AS a
          INNER JOIN pg_catalog.pg_type AS t
              ON t.oid = a.atttypid
         WHERE a.attrelid = r_table.table_oid
           AND a.attnum > 0
           AND NOT a.attisdropped;

        EXECUTE format('DROP VIEW IF EXISTS auditchanges.%I', t_viewname);

        -- Filter on the catalog's exact schema/table names, as logged by the trigger.
        EXECUTE format(
            'CREATE OR REPLACE VIEW auditchanges.%I AS '
            || 'SELECT d.id AS audit_id, d.schemaname, d.tablename, '
            || 'd.username AS audit_username, d.createdon AS audit_createdon, d.dmlaction%s '
            || 'FROM auditchanges.data_changes AS d '
            || 'WHERE d.schemaname = %L AND d.tablename = %L',
            t_viewname,
            coalesce(t_columns, ''),
            r_table.schema_name,
            r_table.table_name);
    END LOOP;
END;
$$;