Auditing & Change Data Capture in PostgreSQL

I needed a quick way to capture DML changes across all tables in a specific PostgreSQL schema. I found a solution online built around a trigger function that captures the affected row (the NEW row for inserts and updates, the OLD row for deletes) and stores it as JSON in a central table called data_changes.

I then wrote a stored procedure that accepts a schema name as a parameter and loops through each table in that schema, attaching the trigger to track changes automatically.

To make querying easier, I added another procedure that generates a set of “audit” views. These views reshape the JSON in data_changes to match the structure of the original tables.

With this setup, if you need to stream changes from PostgreSQL into another system, you can simply read from the data_changes table.

-- Schema that holds the audit table together with the trigger function and
-- the admin procedures defined further down in this script.
CREATE SCHEMA auditchanges AUTHORIZATION postgres;

-- Central change log. The trigger function auditchanges.capture_changes()
-- inserts one row here per audited INSERT / UPDATE / DELETE.
CREATE TABLE auditchanges.data_changes
(
    id            bigserial,
    schemaname    text        NOT NULL,                            -- TG_TABLE_SCHEMA of the audited table
    tablename     text        NOT NULL,                            -- TG_TABLE_NAME of the audited table
    username      text,                                            -- session_user at the time of the change
    createdon     timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP,  -- when the audit row was written
    dmlaction     text        NOT NULL,                            -- first letter of TG_OP: I, U or D
    original_data json,                                            -- row image produced by row_to_json()
    CONSTRAINT data_changes_pkey PRIMARY KEY (id),
    CONSTRAINT data_changes_dmlaction_check CHECK (dmlaction IN ('I', 'D', 'U'))
)
TABLESPACE pg_default;

ALTER TABLE auditchanges.data_changes OWNER TO postgres;

-- Supports lookups by audited table, optionally narrowed by time and action.
CREATE INDEX data_changes_001_idx
    ON auditchanges.data_changes (schemaname, tablename, createdon, dmlaction)
    TABLESPACE pg_default;


-- FUNCTION: auditchanges.capture_changes()

-- DROP FUNCTION auditchanges.capture_changes();

-- Row-level trigger function that logs a change to auditchanges.data_changes.
--
-- Stored row image:
--   INSERT / UPDATE -> the NEW row
--   DELETE          -> the OLD row
-- Besides the JSON image, the row records the table's schema and name, the
-- session user and the first letter of TG_OP ('I', 'U' or 'D').
--
-- Returns NEW for INSERT/UPDATE and OLD for DELETE. Any other TG_OP, and any
-- error raised while logging, is reported with RAISE WARNING and the function
-- returns NULL instead of propagating the error.
CREATE OR REPLACE FUNCTION auditchanges.capture_changes()
    RETURNS trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF
AS $BODY$
DECLARE
    v_row_image JSON;
BEGIN

    CASE TG_OP
        WHEN 'INSERT', 'UPDATE' THEN
            -- Both operations log the row as it looks after the statement.
            v_row_image := row_to_json(NEW.*);
            INSERT INTO auditchanges.data_changes
                (schemaname, tablename, username, dmlaction, original_data)
            VALUES
                (TG_TABLE_SCHEMA::TEXT, TG_TABLE_NAME::TEXT, session_user::TEXT, left(TG_OP, 1), v_row_image);
            RETURN NEW;

        WHEN 'DELETE' THEN
            -- Only the pre-delete image exists for a removed row.
            v_row_image := row_to_json(OLD.*);
            INSERT INTO auditchanges.data_changes
                (schemaname, tablename, username, dmlaction, original_data)
            VALUES
                (TG_TABLE_SCHEMA::TEXT, TG_TABLE_NAME::TEXT, session_user::TEXT, left(TG_OP, 1), v_row_image);
            RETURN OLD;

        ELSE
            RAISE WARNING '[auditchanges.capture_changes] - Other action occurred: %, at %',TG_OP,now();
            RETURN NULL;
    END CASE;

EXCEPTION
    -- Logging failures are downgraded to warnings; the handlers differ only in
    -- the label placed in the message.
    WHEN data_exception THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
        RETURN NULL;
    WHEN unique_violation THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
        RETURN NULL;
    WHEN others THEN
        RAISE WARNING '[auditchanges.capture_changes] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
        RETURN NULL;
END;
$BODY$;

ALTER FUNCTION auditchanges.capture_changes()
    OWNER TO postgres;
	
-- add trigger to various tables


-- Attaches the auditchanges.capture_changes() row trigger to every auditable
-- table of one schema.
--
-- Parameters:
--   t_schemaname  schema to process; matched case-insensitively against
--                 pg_namespace.nspname.
--   t_replace     1 = (re)create the trigger even when it already exists;
--                 any other value = only create triggers that are missing.
--
-- Tables processed: ordinary ('r') and partitioned ('p') tables that are not
-- themselves partitions, and that are either not an inheritance child or are
-- the parent of other tables. A row trigger on a partitioned table is cloned
-- to its partitions by PostgreSQL, so partitions (including sub-partitioned
-- intermediate levels) are skipped to avoid logging each row twice.
-- auditchanges.data_changes itself is never given the trigger, because the
-- trigger inserts into that table and would recurse.
--
-- Trigger name: lower('tiud_<t_schemaname>_<table>_aud'), the same name the
-- earlier unquoted statement produced for plain lower-case identifiers, so
-- existing triggers are replaced rather than duplicated.
--
-- Requires a server that supports CREATE OR REPLACE TRIGGER.
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_audit_triggers(t_schemaname varchar, t_replace int)
    LANGUAGE 'plpgsql'
AS $BODY$
DECLARE
    v_target         record;
    v_trigger_name   text;
    v_trigger_exists boolean;
BEGIN

    FOR v_target IN
        SELECT
            c.oid     AS table_oid,
            n.nspname AS schema_name,
            c.relname AS table_name
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE lower(n.nspname) = lower(t_schemaname)
            AND c.relkind IN ('r', 'p')
            AND NOT c.relispartition
            AND (
                NOT EXISTS (SELECT 1 FROM pg_catalog.pg_inherits AS i WHERE i.inhrelid = c.oid)
                OR EXISTS (SELECT 1 FROM pg_catalog.pg_inherits AS i WHERE i.inhparent = c.oid)
            )
            -- never audit the audit table itself
            AND NOT (n.nspname = 'auditchanges' AND c.relname = 'data_changes')
        ORDER BY c.relname
    LOOP
        v_trigger_name := lower('tiud_' || t_schemaname || '_' || v_target.table_name || '_aud');

        -- Look for the trigger on this table only; the cast to name applies
        -- the same length truncation the server applies to identifiers.
        v_trigger_exists := EXISTS (
            SELECT 1
            FROM pg_catalog.pg_trigger AS trg
            WHERE trg.tgrelid = v_target.table_oid
                AND trg.tgname = v_trigger_name::name
        );

        IF NOT v_trigger_exists OR t_replace = 1 THEN
            -- %I quotes identifiers, so names that need quoting (mixed case,
            -- dashes, reserved words) work and cannot inject SQL.
            EXECUTE format(
                'CREATE OR REPLACE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON %I.%I'
                || ' FOR EACH ROW EXECUTE FUNCTION auditchanges.capture_changes()',
                v_trigger_name,
                v_target.schema_name,
                v_target.table_name
            );
        END IF;
    END LOOP;

END;
$BODY$;


--create audit triggers on all tables in the schema
--call auditchanges.admin_create_audit_triggers('your-schema', 1);
-- Builds one "audit" view per auditable table of a schema. Each view selects
-- from auditchanges.data_changes and unpacks original_data into columns that
-- mirror the audited table.
--
-- Parameters:
--   t_schemaname  schema to process; matched case-insensitively against
--                 pg_namespace.nspname.
--
-- View name: auditchanges.lower('<t_schemaname>_<table>_aud_vw'). An existing
-- view of that name is dropped first, because CREATE OR REPLACE VIEW cannot
-- change the type of an existing view column.
--
-- Fixed view columns: audit_id, schemaname, tablename, audit_username,
-- audit_createdon, dmlaction. After them, one column per table column in
-- attnum order, aliased with the lower-cased column name:
--   * array and composite columns are exposed as json (original_data -> key),
--     because their JSON form cannot be cast back from text;
--   * every other column is (original_data ->> key) cast to the column's type
--     as reported by format_type(), which keeps length/precision modifiers.
--
-- Tables processed: ordinary and partitioned tables that are not themselves
-- partitions, and that are either not an inheritance child or are a parent.
--
-- NOTE(review): for partitioned tables the row trigger presumably fires on the
-- leaf partition, so data_changes.tablename may hold the partition's name
-- rather than the parent's; confirm the view filter below against the rows
-- that are actually logged.
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_views(t_schemaname varchar)
   AS $$
DECLARE
    v_target      record;
    v_view_name   text;
    v_column_list text;
    v_view_exists boolean;
BEGIN

    FOR v_target IN
        SELECT
            c.oid     AS table_oid,
            n.nspname AS schema_name,
            c.relname AS table_name
        FROM pg_catalog.pg_class AS c
        INNER JOIN pg_catalog.pg_namespace AS n
            ON n.oid = c.relnamespace
        WHERE lower(n.nspname) = lower(t_schemaname)
            AND c.relkind IN ('r', 'p')
            AND NOT c.relispartition
            AND (
                NOT EXISTS (SELECT 1 FROM pg_catalog.pg_inherits AS i WHERE i.inhrelid = c.oid)
                OR EXISTS (SELECT 1 FROM pg_catalog.pg_inherits AS i WHERE i.inhparent = c.oid)
            )
        ORDER BY c.relname
    LOOP
        v_view_name := lower(t_schemaname || '_' || v_target.table_name || '_aud_vw');

        -- The cast to name applies the server's identifier-length truncation,
        -- so long names still match the catalog entry.
        v_view_exists := EXISTS (
            SELECT 1
            FROM pg_catalog.pg_class AS vc
            INNER JOIN pg_catalog.pg_namespace AS vn
                ON vn.oid = vc.relnamespace
            WHERE vn.nspname = 'auditchanges'
                AND vc.relname = v_view_name::name
                AND vc.relkind = 'v'
        );

        IF v_view_exists THEN
            EXECUTE format('DROP VIEW auditchanges.%I', v_view_name);
        END IF;

        -- Unbounded text: a wide table easily exceeds a few thousand characters.
        SELECT
            string_agg(
                CASE
                    WHEN t.typcategory IN ('A', 'C') THEN
                        format('(original_data -> %L) AS %I',
                               a.attname, lower(a.attname::text))
                    ELSE
                        format('(original_data ->> %L)::%s AS %I',
                               a.attname,
                               pg_catalog.format_type(a.atttypid, a.atttypmod),
                               lower(a.attname::text))
                END,
                ', ' ORDER BY a.attnum
            )
        INTO v_column_list
        FROM pg_catalog.pg_attribute AS a
        INNER JOIN pg_catalog.pg_type AS t
            ON t.oid = a.atttypid
        WHERE a.attrelid = v_target.table_oid
            AND a.attnum > 0            -- skip system columns
            AND NOT a.attisdropped;

        -- %I / %L quote identifiers and literals; the schema and table names in
        -- the filter are the exact catalog names, which is what the trigger
        -- stores from TG_TABLE_SCHEMA / TG_TABLE_NAME.
        EXECUTE format(
            'CREATE OR REPLACE VIEW auditchanges.%I AS'
            || ' SELECT id AS audit_id, schemaname, tablename,'
            || ' username AS audit_username, createdon AS audit_createdon, dmlaction%s'
            || ' FROM auditchanges.data_changes'
            || ' WHERE schemaname = %L AND tablename = %L',
            v_view_name,
            coalesce(', ' || v_column_list, ''),   -- a table without columns yields NULL
            v_target.schema_name,
            v_target.table_name
        );

    END LOOP;
END;
$$ LANGUAGE plpgsql;
Previous
Previous

Auditing & Change Data Capture in Oracle

Next
Next

The DB Paginator?