Auditing & Change Data Capture in PostgreSQL
I needed a quick way to capture DML changes across all tables in a specific PostgreSQL schema. I found a solution online that used a trigger function to capture the affected row (the new row for INSERT and UPDATE, the old row for DELETE) and store it as JSON in a central table called data_changes.
I then wrote a stored procedure that accepts a schema name as a parameter and loops through each table in that schema, attaching the trigger to track changes automatically.
To make querying easier, I added another procedure that generates a set of “audit” views. These views reshape the JSON in data_changes to match the structure of the original tables.
With this setup, if you need to stream changes from PostgreSQL into another system, you can simply read from the data_changes table.
-- Schema that owns every object below: the data_changes log table, the
-- capture trigger function, the admin procedures and the generated *_aud_vw views.
CREATE SCHEMA auditchanges AUTHORIZATION postgres;
-- Table: auditchanges.data_changes
-- Central change log written by auditchanges.capture_changes(); one row per
-- audited INSERT / UPDATE / DELETE.
--   schemaname, tablename : table the change happened on (TG_TABLE_SCHEMA / TG_TABLE_NAME)
--   username              : session_user of the connection that made the change
--   dmlaction             : 'I', 'U' or 'D' (first letter of TG_OP)
--   original_data         : the affected row serialized with row_to_json()
-- To rebuild: DROP TABLE auditchanges.data_changes;
CREATE TABLE auditchanges.data_changes
(
    id            bigserial   PRIMARY KEY,
    schemaname    text        NOT NULL,
    tablename     text        NOT NULL,
    username      text,
    createdon     timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,
    dmlaction     text        NOT NULL,
    original_data json,
    CONSTRAINT data_changes_dmlaction_check
        CHECK (dmlaction IN ('I', 'D', 'U'))
)
TABLESPACE pg_default;

ALTER TABLE auditchanges.data_changes OWNER TO postgres;
-- Index on the change log. Its leading (schemaname, tablename) columns match
-- the filter used by the generated audit views.
-- To rebuild: DROP INDEX auditchanges.data_changes_001_idx;
CREATE INDEX data_changes_001_idx
    ON auditchanges.data_changes
    USING btree (schemaname, tablename, createdon, dmlaction)
    TABLESPACE pg_default;
-- FUNCTION: auditchanges.capture_changes()
-- Row-level trigger function that appends one row to auditchanges.data_changes
-- for each INSERT, UPDATE or DELETE on the table it is attached to:
--   INSERT / UPDATE : the NEW row is stored as JSON
--   DELETE          : the OLD row is stored as JSON
-- Errors raised while building or inserting the audit row are caught by the
-- handlers at the bottom, reported as WARNINGs and not re-raised, so a failed
-- audit write does not abort the triggering statement (the audit row is lost).
-- To rebuild: DROP FUNCTION auditchanges.capture_changes();
CREATE OR REPLACE FUNCTION auditchanges.capture_changes()
    RETURNS trigger
    LANGUAGE plpgsql
    COST 100
    VOLATILE NOT LEAKPROOF
AS $BODY$
DECLARE
    v_row_json JSON;
BEGIN
    -- Only row-level I/U/D are recorded; anything else (e.g. TRUNCATE, should the
    -- function ever be attached as a statement-level trigger) just warns.
    IF TG_OP NOT IN ('INSERT', 'UPDATE', 'DELETE') THEN
        RAISE WARNING '[auditchanges.capture_changes] - Other action occurred: %, at %',TG_OP,now();
        RETURN NULL;
    END IF;

    -- A DELETE has no NEW row, so the removed (OLD) row is the one kept.
    IF TG_OP = 'DELETE' THEN
        v_row_json := row_to_json(OLD.*);
    ELSE
        v_row_json := row_to_json(NEW.*);
    END IF;

    INSERT INTO auditchanges.data_changes
        (schemaname, tablename, username, dmlaction, original_data)
    VALUES
        (TG_TABLE_SCHEMA::TEXT, TG_TABLE_NAME::TEXT, session_user::TEXT,
         substring(TG_OP, 1, 1), v_row_json);

    IF TG_OP = 'DELETE' THEN
        RETURN OLD;
    END IF;
    RETURN NEW;
EXCEPTION
    WHEN data_exception THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
        RETURN NULL;
    WHEN unique_violation THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
        RETURN NULL;
    WHEN others THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
        RETURN NULL;
END;
$BODY$;

ALTER FUNCTION auditchanges.capture_changes() OWNER TO postgres;
-- PROCEDURE: auditchanges.admin_create_audit_triggers(t_schemaname, t_replace)
-- Attaches auditchanges.capture_changes() as an AFTER INSERT OR UPDATE OR DELETE
-- row trigger to every top-level table of one schema.
--   t_schemaname : schema to process, matched case-insensitively against the
--                  catalog; the catalog's own spelling is used in the DDL
--   t_replace    : 1 = (re)create the trigger even if it already exists on the
--                  table; any other value = only create missing triggers
-- "Top-level" = ordinary ('r') or partitioned ('p') tables that are not themselves
-- a partition / inheritance child. Row triggers on a partitioned table are cloned
-- onto its partitions, so attaching to the root only avoids recording a change
-- twice. NOTE: legacy INHERITS children do not inherit triggers, so DML issued
-- directly against them is not captured.
-- Trigger names are lower('tiud_<schema>_<table>_aud') -- the same name the
-- earlier unquoted DDL produced -- and PostgreSQL truncates them to 63 bytes.
-- All identifiers go through format('%I') so mixed-case, hyphenated or reserved
-- names work and cannot inject SQL.
-- Requires PostgreSQL 14+ (CREATE OR REPLACE TRIGGER).
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_audit_triggers(t_schemaname varchar, t_replace int)
LANGUAGE plpgsql
AS $BODY$
DECLARE
    r_tab         record;
    t_triggername text;
BEGIN
    FOR r_tab IN
        SELECT c.oid     AS tab_oid,
               n.nspname AS tab_schema,
               c.relname AS tab_name
        FROM pg_class AS c
        INNER JOIN pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE lower(n.nspname) = lower(t_schemaname)
          AND c.relkind IN ('r', 'p')
          -- skip partitions / inheritance children: only the top-level parent
          AND NOT EXISTS (SELECT 1 FROM pg_inherits AS i WHERE i.inhrelid = c.oid)
        ORDER BY c.relname
    LOOP
        t_triggername := lower('tiud_' || r_tab.tab_schema || '_' || r_tab.tab_name || '_aud');

        -- ::name applies the same 63-byte truncation the stored trigger name got,
        -- and tgrelid scopes the lookup to this table.
        IF t_replace = 1 OR NOT EXISTS (
               SELECT 1
               FROM pg_trigger AS tg
               WHERE tg.tgrelid = r_tab.tab_oid
                 AND tg.tgname = t_triggername::name)
        THEN
            EXECUTE format(
                'CREATE OR REPLACE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON %I.%I '
                || 'FOR EACH ROW EXECUTE FUNCTION auditchanges.capture_changes()',
                t_triggername, r_tab.tab_schema, r_tab.tab_name);
        END IF;
    END LOOP;
END;
$BODY$;
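-- Usage: attach audit triggers to every table in a schema (second argument 1 =
-- recreate triggers that already exist), then build the matching audit views:
--   CALL auditchanges.admin_create_audit_triggers('your-schema', 1);
--   CALL auditchanges.admin_create_views('your-schema');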
-- PROCEDURE: auditchanges.admin_create_views(t_schemaname)
-- (Re)creates one view per top-level table of the schema, named
-- auditchanges.<schema>_<table>_aud_vw (lower-cased), that shows that table's
-- rows from auditchanges.data_changes with original_data unpacked into columns:
--   audit_id, schemaname, tablename, audit_username, audit_createdon, dmlaction,
--   <the table's columns, in attnum order>
-- Table selection matches admin_create_audit_triggers: ordinary/partitioned
-- tables that are not partitions or inheritance children, schema matched
-- case-insensitively.
-- Each column is cast back with format_type(), so typmods, enums and domains
-- round-trip. Array and composite columns are exposed as json: row_to_json()
-- encodes them as JSON arrays/objects, whose text cannot be cast back to the type.
-- The views reference only data_changes (never the audited table), so rerun
-- this procedure after an audited table's structure changes.
-- A table with a column named like one of the audit columns above makes
-- CREATE VIEW fail with a duplicate column name.
-- NOTE(review): for a partitioned table the cloned row trigger presumably fires
-- on the partition, so data_changes.tablename would hold the partition name and
-- this view (filtering on the parent's name) would show no rows -- confirm and
-- decide whether capture_changes should record the partition root instead.
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_views(t_schemaname varchar)
AS $$
DECLARE
    r_tab      record;
    t_viewname text;
    t_columns  text;   -- text, not varchar(4000): wide tables overflow a fixed cap
BEGIN
    FOR r_tab IN
        SELECT c.oid     AS tab_oid,
               n.nspname AS tab_schema,
               c.relname AS tab_name
        FROM pg_class AS c
        INNER JOIN pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE lower(n.nspname) = lower(t_schemaname)
          AND c.relkind IN ('r', 'p')
          AND NOT EXISTS (SELECT 1 FROM pg_inherits AS i WHERE i.inhrelid = c.oid)
        ORDER BY c.relname
    LOOP
        t_viewname := lower(r_tab.tab_schema || '_' || r_tab.tab_name || '_aud_vw');

        -- One ", <expr> AS <col>" fragment per live column; '' for a table with
        -- no columns so the CREATE VIEW statement below stays valid.
        SELECT coalesce(
                   string_agg(
                       CASE
                           WHEN t.typcategory IN ('A', 'C')
                               THEN format(', (original_data -> %L) AS %I',
                                           a.attname, a.attname)
                           ELSE format(', (original_data ->> %L)::%s AS %I',
                                       a.attname,
                                       format_type(a.atttypid, a.atttypmod),
                                       a.attname)
                       END,
                       '' ORDER BY a.attnum),
                   '')
        INTO t_columns
        FROM pg_attribute AS a
        INNER JOIN pg_type AS t
            ON t.oid = a.atttypid
        WHERE a.attrelid = r_tab.tab_oid
          AND a.attnum > 0
          AND NOT a.attisdropped;

        -- CREATE OR REPLACE VIEW cannot drop or retype columns, so start fresh.
        EXECUTE format('DROP VIEW IF EXISTS auditchanges.%I', t_viewname);

        -- Filter on the exact catalog spelling: that is what capture_changes
        -- writes (TG_TABLE_SCHEMA / TG_TABLE_NAME).
        EXECUTE format(
            'CREATE VIEW auditchanges.%I AS SELECT id AS audit_id, schemaname, tablename, '
            || 'username AS audit_username, createdon AS audit_createdon, dmlaction%s '
            || 'FROM auditchanges.data_changes WHERE schemaname = %L AND tablename = %L',
            t_viewname, t_columns, r_tab.tab_schema, r_tab.tab_name);
    END LOOP;
END;
$$ LANGUAGE plpgsql;