Auditing & Change Data Capture in Oracle

This is the Oracle Database counterpart of the auditing setup I wrote for PostgreSQL.

I wrote a stored procedure that accepts a schema name as a parameter and loops through each table in that schema, attaching a trigger that tracks changes automatically. The triggers capture the affected row (the NEW values on insert and update, the OLD values on delete) and store it as JSON in a central table called data_changes.

To make querying easier, I added another procedure that generates a set of “audit” views. These views reshape the JSON in data_changes to match the structure of the original tables.

With this setup, if you need to stream changes from Oracle into another system, you can simply read from the data_changes table. Currently, BLOB and CLOB datatypes are excluded.

-- Dedicated schema that owns the audit table and the admin procedures.
-- The password below is a placeholder: change it before running this script.
CREATE USER AUDITCHANGES IDENTIFIED BY aGreatPassword1
    DEFAULT TABLESPACE DATA;


grant unlimited tablespace to AUDITCHANGES;    

-- Dictionary views read by the admin procedures. These must be direct grants
-- (not via a role) because the procedures are stored PL/SQL.
grant select on dba_tables to AUDITCHANGES;

grant select on dba_triggers to AUDITCHANGES;

grant select on dba_tab_columns to AUDITCHANGES;

-- Needed to create triggers in the audited schemas and the *_AUD_VW views.
grant create any trigger to AUDITCHANGES;

grant create any view to AUDITCHANGES;


-- Central change log: one row per audited DML row, with the row image as JSON text.
CREATE TABLE AUDITCHANGES.data_changes
(
    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    schemaname VARCHAR2(255) NOT NULL,
    tablename VARCHAR2(255) NOT NULL,
    username VARCHAR2(255),
    createdon TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
    dmlaction VARCHAR2(1) NOT NULL,
    original_data CLOB,
    CONSTRAINT data_changes_dmlaction_check CHECK (dmlaction IN ('I', 'D', 'U'))
);

-- Supports the per-table audit views, which filter on schemaname and tablename.
CREATE INDEX AUDITCHANGES.data_changes_001_idx
    ON AUDITCHANGES.data_changes (schemaname, tablename, createdon, dmlaction);

---****NOTE!!! the user/schema where data changes are tracked needs to be able to insert into auditchanges too!!
-- This grant has to run AFTER the table above exists; replace your_schema with
-- the owner of the audited tables (repeat for every audited schema).

grant insert on  auditchanges.data_changes to your_schema;

--for debugging
create table AUDITCHANGES.debugcalls(createdon date default sysdate, commstat clob);


CREATE OR REPLACE PROCEDURE AUDITCHANGES.admin_create_audit_triggers(t_schemaname VARCHAR2, t_replace NUMBER) 
IS
    -- Creates one row-level AFTER INSERT OR UPDATE OR DELETE trigger per table of
    -- the given schema. Each trigger writes the row image as JSON text into
    -- auditchanges.data_changes: the :new values for inserts and updates, the
    -- :old values for deletes.
    --
    -- Parameters:
    --   t_schemaname  owner of the tables to audit (matched case-insensitively).
    --   t_replace     1 = re-create triggers that already exist;
    --                 any other value = only create missing triggers.
    --
    -- Notes:
    --   * The audited schema must hold INSERT on auditchanges.data_changes,
    --     otherwise the generated triggers will not compile.
    --   * Any error raised while creating a trigger propagates to the caller and
    --     stops the loop.
    --   * All values are written as JSON strings; NULL becomes "".
    --   * BLOB, BFILE, LONG and LONG RAW columns are skipped because they cannot
    --     be concatenated into the JSON text.
    --   * Names spliced into the DDL come from the data dictionary and are
    --     double-quoted, so the raw parameter value never reaches the statement.
    t_triggername VARCHAR2(300);
    t_exists      NUMBER;
    t_owner_lit   VARCHAR2(300);
    t_table_lit   VARCHAR2(300);
    t_new_json    CLOB;
    t_old_json    CLOB;
    t_commstat    CLOB;

    -- Builds the SQL expression (as source text) that renders one row as JSON.
    -- p_rowref is 'new' or 'old'. The expression starts with TO_CLOB so the
    -- whole concatenation is evaluated as a CLOB and is not capped by the
    -- VARCHAR2 size limit at DML time.
    FUNCTION build_json_expr(p_owner VARCHAR2, p_table VARCHAR2, p_rowref VARCHAR2) RETURN CLOB
    IS
        l_expr  CLOB := q'[TO_CLOB('{')]';
        l_sep   VARCHAR2(2) := '';
        l_ref   VARCHAR2(300);
        l_value VARCHAR2(400);
    BEGIN
        -- A plain loop instead of LISTAGG: LISTAGG raises ORA-01489 once the
        -- generated text for a wide table exceeds the VARCHAR2 limit.
        FOR col IN (SELECT column_name, data_type
                      FROM dba_tab_columns
                     WHERE owner = p_owner
                       AND table_name = p_table
                       AND data_type NOT IN ('BLOB', 'BFILE', 'LONG', 'LONG RAW')
                     ORDER BY column_id)
        LOOP
            l_ref := ':' || p_rowref || '."' || col.column_name || '"';

            IF col.data_type IN ('CHAR', 'VARCHAR2', 'NCHAR', 'NVARCHAR2') THEN
                -- Escape backslashes first, then double quotes, so character
                -- data cannot break the JSON document.
                l_value := 'REPLACE(REPLACE(' || l_ref || q'[, '\', '\\'), '"', '\"')]';
            ELSE
                -- Other types (including CLOB) are concatenated unchanged.
                l_value := l_ref;
            END IF;

            -- Appends:  || '<sep>"COL": "' || <value> || '"'
            l_expr := l_expr || q'[ || ']' || l_sep || '"' || REPLACE(col.column_name, '''', '''''')
                             || q'[": "' || ]' || l_value || q'[ || '"']';
            l_sep := ', ';
        END LOOP;

        RETURN l_expr || q'[ || '}']';
    END build_json_expr;

    -- Builds the INSERT statement for one DML action ('I', 'U' or 'D').
    FUNCTION build_insert(p_owner_lit VARCHAR2, p_table_lit VARCHAR2, p_action VARCHAR2, p_json_expr CLOB) RETURN CLOB
    IS
    BEGIN
        RETURN 'insert into auditchanges.data_changes (schemaname,tablename,username,dmlaction,original_data)' ||
               ' values (''' || p_owner_lit || ''', ''' || p_table_lit || ''', USER, ''' || p_action || ''', ' ||
               p_json_expr || ');';
    END build_insert;

BEGIN
    -- Cursor FOR loop: the cursor is closed automatically, also on error.
    -- Recycle-bin entries, nested-table storage and IOT overflow/mapping
    -- segments are listed in dba_tables but cannot carry DML triggers.
    -- The audit table itself is skipped to avoid a self-recursive trigger.
    FOR tab IN (SELECT owner, table_name
                  FROM dba_tables
                 WHERE owner = UPPER(t_schemaname)
                   AND dropped = 'NO'
                   AND nested = 'NO'
                   AND (iot_type IS NULL OR iot_type = 'IOT')
                   AND NOT (owner = 'AUDITCHANGES' AND table_name = 'DATA_CHANGES')
                 ORDER BY table_name)
    LOOP
        -- Upper-cased so the quoted name equals what an unquoted name folds to.
        t_triggername := UPPER('tiud_' || tab.owner || '_' || tab.table_name || '_aud');

        SELECT COUNT(*) INTO t_exists
        FROM dba_triggers
        WHERE trigger_name = t_triggername
        AND owner = tab.owner;

        IF t_exists = 0 OR t_replace = 1 THEN
            -- Values embedded as SQL string literals get their quotes doubled.
            t_owner_lit := REPLACE(tab.owner, '''', '''''');
            t_table_lit := REPLACE(tab.table_name, '''', '''''');

            -- Inserts and updates share the :new image; deletes use :old.
            t_new_json := build_json_expr(tab.owner, tab.table_name, 'new');
            t_old_json := build_json_expr(tab.owner, tab.table_name, 'old');

            t_commstat :=
                'CREATE OR REPLACE TRIGGER "' || tab.owner || '"."' || t_triggername || '"' ||
                ' AFTER INSERT OR UPDATE OR DELETE ON "' || tab.owner || '"."' || tab.table_name || '"' ||
                ' FOR EACH ROW' ||
                ' BEGIN' ||
                ' IF INSERTING THEN ' || build_insert(t_owner_lit, t_table_lit, 'I', t_new_json) ||
                ' ELSIF UPDATING THEN ' || build_insert(t_owner_lit, t_table_lit, 'U', t_new_json) ||
                ' ELSIF DELETING THEN ' || build_insert(t_owner_lit, t_table_lit, 'D', t_old_json) ||
                ' END IF;' ||
                ' END;';

            EXECUTE IMMEDIATE t_commstat;
            -- For debugging, the generated DDL can be logged instead:
            --insert into AUDITCHANGES.debugcalls(commstat) values (t_commstat);
        END IF;
    END LOOP;
END;
/
CREATE OR REPLACE PROCEDURE  auditchanges.admin_create_views(t_schemaname varchar) AS
    -- Creates (or replaces) one view per table of the given schema, named
    -- auditchanges.<SCHEMA>_<TABLE>_AUD_VW, that reshapes the JSON stored in
    -- auditchanges.data_changes back into the columns of the audited table.
    --
    -- Parameters:
    --   t_schemaname  owner of the audited tables (matched case-insensitively).
    --
    -- Notes:
    --   * CLOB columns are read with JSON_QUERY; every other supported column
    --     is read with JSON_VALUE and cast to the column's data type.
    --   * BLOB, BFILE, LONG and LONG RAW columns are left out of the views.
    --   * Any error raised while creating a view propagates to the caller and
    --     stops the loop.
    --   * Names spliced into the DDL come from the data dictionary and are
    --     double-quoted, so the raw parameter value never reaches the statement.
    t_viewname VARCHAR2(300);
    t_path     VARCHAR2(300);
    t_colexpr  VARCHAR2(1000);
    t_commstat CLOB;
BEGIN
    -- Recycle-bin entries, nested-table storage and IOT overflow/mapping
    -- segments appear in dba_tables but are not real audited tables.
    FOR rec IN (SELECT owner, table_name
                  FROM dba_tables
                 WHERE owner = UPPER(t_schemaname)
                   AND dropped = 'NO'
                   AND nested = 'NO'
                   AND (iot_type IS NULL OR iot_type = 'IOT')
                 ORDER BY table_name)
    LOOP
        -- Upper-cased so the quoted name equals what an unquoted name folds to.
        t_viewname := UPPER(rec.owner || '_' || rec.table_name || '_AUD_VW');

        t_commstat := 'CREATE OR REPLACE VIEW auditchanges."' || t_viewname ||
                      '" AS SELECT id as audit_id, schemaname, tablename, username as audit_username, createdon as audit_createdon, dmlaction';

        -- The select list is accumulated in a CLOB; LISTAGG into a
        -- VARCHAR2(4000) overflows for tables with many columns.
        FOR col IN (SELECT column_name, data_type, data_length
                      FROM dba_tab_columns
                     WHERE owner = rec.owner
                       AND table_name = rec.table_name
                       AND data_type NOT IN ('BLOB', 'BFILE', 'LONG', 'LONG RAW')
                     ORDER BY column_id)
        LOOP
            -- Quoted path step ('$."COL"') so any column name is a valid field name.
            t_path := '''$."' || REPLACE(col.column_name, '''', '''''') || '"''';

            IF col.data_type = 'CLOB' THEN
                -- Rewrites "{ ... }" into [{ ... }] so JSON text held in a CLOB
                -- column is returned as JSON instead of a quoted string.
                t_colexpr := q'~JSON_QUERY(replace(replace(original_data, '"{','[{'), '}"','}]'), ~' ||
                             t_path || ' RETURNING ' || col.data_type || ' pretty with wrapper)';
            ELSE
                -- Character and RAW types need an explicit length in CAST.
                t_colexpr := 'CAST(JSON_VALUE(original_data, ' || t_path || ') AS ' || col.data_type ||
                             CASE
                                 WHEN col.data_type LIKE '%CHAR%' OR col.data_type = 'RAW'
                                     THEN '(' || TO_CHAR(col.data_length) || ')'
                                 ELSE ''
                             END || ')';
            END IF;

            t_commstat := t_commstat || ', ' || t_colexpr || ' "' || col.column_name || '"';
        END LOOP;

        t_commstat := t_commstat ||
                      ' from auditchanges.data_changes where schemaname = ''' || REPLACE(rec.owner, '''', '''''') ||
                      ''' and tablename = ''' || REPLACE(rec.table_name, '''', '''''') || '''';

        EXECUTE IMMEDIATE t_commstat;
    END LOOP;
END;
/
Previous
Previous

Batch Data Pipeline from Iterable to GCS to BigQuery (via Cloud Run)

Next
Next

Auditing & Change Data Capture in PostgreSQL