Auditing & Change Data Capture in Oracle
Much like what I wrote for PG, I wrote this for Oracle Database.
I wrote a stored procedure that accepts a schema name as a parameter and loops through each table in that schema, attaching a trigger that tracks changes automatically. The triggers capture the affected row (the NEW values for inserts and updates, the OLD values for deletes) and store it as JSON in a central table called data_changes.
To make querying easier, I added another procedure that generates a set of “audit” views. These views reshape the JSON in data_changes
to match the structure of the original tables.
With this setup, if you need to stream changes from Oracle into another system, you can simply read from the data_changes
table. Currently, BLOB and CLOB datatypes are excluded.
-- ---------------------------------------------------------------------------
-- Setup: run as a privileged user. Replace the placeholder password, the
-- tablespace name and "your_schema" before running.
-- ---------------------------------------------------------------------------
CREATE USER AUDITCHANGES IDENTIFIED BY aGreatPassword1 DEFAULT TABLESPACE DATA;
grant unlimited tablespace to AUDITCHANGES;
grant select on dba_tables to AUDITCHANGES;
grant select on dba_triggers to AUDITCHANGES;
grant select on dba_tab_columns to AUDITCHANGES;
grant create any trigger to AUDITCHANGES;
grant create any view to AUDITCHANGES;

-- Central change log: one row per audited DML row, row image stored as JSON text.
CREATE TABLE AUDITCHANGES.data_changes (
    id NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    schemaname VARCHAR2(255) NOT NULL,
    tablename VARCHAR2(255) NOT NULL,
    username VARCHAR2(255),
    createdon TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
    dmlaction VARCHAR2(1) NOT NULL,
    original_data CLOB,
    CONSTRAINT data_changes_dmlaction_check CHECK (dmlaction IN ('I', 'D', 'U'))
);

CREATE INDEX AUDITCHANGES.data_changes_001_idx
    ON AUDITCHANGES.data_changes (schemaname, tablename, createdon, dmlaction);

---****NOTE!!! the user/schema where data changes are tracked needs to be able to insert into auditchanges too!!
-- This grant must run AFTER the table above exists.
grant insert on auditchanges.data_changes to your_schema;

--for debugging
create table AUDITCHANGES.debugcalls(createdon date default sysdate, commstat clob);

-- ---------------------------------------------------------------------------
-- admin_create_audit_triggers
--
-- Creates one AFTER INSERT OR UPDATE OR DELETE row-level trigger per table in
-- the given schema. Each trigger writes a JSON image of the row into
-- auditchanges.data_changes (:new values for inserts and updates, :old values
-- for deletes).
--
-- Parameters:
--   t_schemaname  schema whose tables get a trigger. Must be a simple
--                 (unquoted) identifier; anything else raises ORA-20001.
--   t_replace     1 = re-create triggers that already exist;
--                 any other value = only create missing triggers.
--
-- Notes:
--   * Table and column names are concatenated unquoted, so objects that need
--     quoted identifiers are not supported.
--   * BLOB, BFILE, LONG and LONG RAW columns are left out of the JSON.
--   * Character columns have backslash and double-quote escaped so the stored
--     text stays parseable. Other control characters are not escaped, and
--     CLOB values are stored as-is.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE PROCEDURE AUDITCHANGES.admin_create_audit_triggers(t_schemaname VARCHAR2, t_replace NUMBER) IS
    c_insert_head CONSTANT VARCHAR2(200) :=
        'insert into auditchanges.data_changes (schemaname,tablename,username,dmlaction,original_data)';

    l_schema    VARCHAR2(128);
    l_trigger   VARCHAR2(300);
    l_exists    NUMBER;
    l_prefix    VARCHAR2(400);
    l_new_json  CLOB;   -- source text of the expression building JSON from :new
    l_old_json  CLOB;   -- source text of the expression building JSON from :old
    l_ddl       CLOB;

    -- Returns the source text that renders one column reference inside the
    -- generated JSON string; character types get "\" and '"' escaped.
    FUNCTION value_expr(p_ref VARCHAR2, p_data_type VARCHAR2) RETURN VARCHAR2 IS
    BEGIN
        IF p_data_type LIKE '%CHAR%' THEN
            RETURN q'[REPLACE(REPLACE(]' || p_ref || q'[, '\', '\\'), '"', '\"')]';
        END IF;
        RETURN p_ref;
    END value_expr;

    -- Returns the source text of one INSERT into the audit table.
    FUNCTION audit_insert(p_table VARCHAR2, p_action VARCHAR2, p_json CLOB) RETURN CLOB IS
    BEGIN
        RETURN c_insert_head
            || ' values (''' || l_schema || ''', ''' || p_table || ''', USER, '''
            || p_action || ''',' || p_json || ');';
    END audit_insert;
BEGIN
    -- The schema name is spliced into dynamic DDL, so accept plain identifiers only.
    IF t_schemaname IS NULL
       OR NOT REGEXP_LIKE(t_schemaname, '^[A-Za-z][A-Za-z0-9_$#]{0,127}$') THEN
        RAISE_APPLICATION_ERROR(-20001, 'admin_create_audit_triggers: invalid schema name');
    END IF;
    l_schema := UPPER(t_schemaname);

    -- Cursor FOR loop: the cursor is closed automatically, even on error.
    FOR tab IN (
        SELECT table_name
        FROM dba_tables
        WHERE owner = l_schema
          AND dropped = 'NO'   -- skip recycle-bin entries (BIN$...)
          -- never audit the audit table itself: its trigger would recurse
          AND NOT (owner = 'AUDITCHANGES' AND table_name = 'DATA_CHANGES')
        ORDER BY table_name
    ) LOOP
        l_trigger := 'tiud_' || l_schema || '_' || tab.table_name || '_aud';

        SELECT COUNT(*)
        INTO l_exists
        FROM dba_triggers
        WHERE trigger_name = UPPER(l_trigger)
          AND owner = l_schema;

        IF l_exists = 0 OR t_replace = 1 THEN
            l_new_json := NULL;
            l_old_json := NULL;

            -- Build the JSON expressions column by column into CLOBs so wide
            -- tables do not hit the LISTAGG 4000-byte limit.
            FOR col IN (
                SELECT column_name, data_type
                FROM dba_tab_columns
                WHERE owner = l_schema
                  AND table_name = tab.table_name
                  AND data_type NOT IN ('BLOB', 'BFILE', 'LONG', 'LONG RAW')
                ORDER BY column_id
            ) LOOP
                -- First column opens the literal with '{ ; later columns close
                -- the previous value and add the ", " separator.
                l_prefix := CASE WHEN l_new_json IS NULL THEN '''{' ELSE '||''", ' END
                         || '"' || col.column_name || '": "''||';
                l_new_json := l_new_json || l_prefix
                           || value_expr(':new.' || col.column_name, col.data_type);
                l_old_json := l_old_json || l_prefix
                           || value_expr(':old.' || col.column_name, col.data_type);
            END LOOP;

            IF l_new_json IS NULL THEN
                -- No usable columns: record an empty JSON object.
                l_new_json := '''{}''';
                l_old_json := '''{}''';
            ELSE
                l_new_json := l_new_json || '||''"}''';
                l_old_json := l_old_json || '||''"}''';
            END IF;

            l_ddl := 'CREATE OR REPLACE TRIGGER ' || l_schema || '.' || l_trigger
                  || ' AFTER INSERT OR UPDATE OR DELETE ON ' || l_schema || '.' || tab.table_name
                  || ' FOR EACH ROW'
                  || ' BEGIN'
                  || ' IF INSERTING THEN ' || audit_insert(tab.table_name, 'I', l_new_json)
                  || ' ELSIF UPDATING THEN ' || audit_insert(tab.table_name, 'U', l_new_json)
                  || ' ELSIF DELETING THEN ' || audit_insert(tab.table_name, 'D', l_old_json)
                  || ' END IF;'
                  || ' END;';

            EXECUTE IMMEDIATE l_ddl;
            --insert into AUDITCHANGES.debugcalls(commstat) values (l_ddl);
        END IF;
    END LOOP;
END;
/
-- ---------------------------------------------------------------------------
-- admin_create_views
--
-- For every table in the given schema, creates (or replaces) a view named
-- auditchanges.<SCHEMA>_<TABLE>_AUD_VW over auditchanges.data_changes. The view
-- exposes the audit metadata columns plus one column per source-table column,
-- extracted from the original_data JSON and cast back to the source data type.
--
-- Parameters:
--   t_schemaname  schema whose tables get a view. Must be a simple (unquoted)
--                 identifier; anything else raises ORA-20001.
--
-- Notes:
--   * BLOB columns are skipped. CLOB columns are read with JSON_QUERY; all
--     other columns with CAST(JSON_VALUE(...)).
--   * Table and column names are concatenated unquoted, so objects that need
--     quoted identifiers are not supported.
--   * Any error while creating a view propagates to the caller and stops the
--     run.
-- ---------------------------------------------------------------------------
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_views(t_schemaname varchar) AS
    l_schema   VARCHAR2(128);
    l_expr     VARCHAR2(1000);
    l_columns  CLOB;
    l_ddl      CLOB;
BEGIN
    -- The schema name is spliced into dynamic DDL, so accept plain identifiers only.
    IF t_schemaname IS NULL
       OR NOT REGEXP_LIKE(t_schemaname, '^[A-Za-z][A-Za-z0-9_$#]{0,127}$') THEN
        RAISE_APPLICATION_ERROR(-20001, 'admin_create_views: invalid schema name');
    END IF;
    l_schema := UPPER(t_schemaname);

    FOR tab IN (
        SELECT table_name
        FROM dba_tables
        WHERE owner = l_schema
          AND dropped = 'NO'   -- skip recycle-bin entries (BIN$...)
        ORDER BY table_name
    ) LOOP
        l_columns := NULL;

        -- Build the select list in a CLOB, one column at a time, so wide
        -- tables do not overflow the 4000-byte LISTAGG / VARCHAR2 limit.
        FOR col IN (
            SELECT column_name, data_type, data_length
            FROM dba_tab_columns
            WHERE owner = l_schema
              AND table_name = tab.table_name
              AND data_type <> 'BLOB'
            ORDER BY column_id
        ) LOOP
            IF col.data_type = 'CLOB' THEN
                -- The stored value is treated as embedded JSON: the surrounding
                -- quotes are rewritten to array brackets before extraction.
                l_expr := 'JSON_QUERY(replace(replace(original_data, ''"{'',''[{''), ''}"'',''}]''), ''$.'
                       || col.column_name || ''' RETURNING ' || col.data_type
                       || ' pretty with wrapper) ' || col.column_name;
            ELSE
                l_expr := 'CAST(JSON_VALUE(original_data, ''$.' || col.column_name || ''') AS '
                       || col.data_type
                       || CASE
                              WHEN col.data_type LIKE '%CHAR%' THEN '(' || TO_CHAR(col.data_length) || ')'
                              ELSE ''
                          END
                       || ') ' || col.column_name;
            END IF;

            IF l_columns IS NULL THEN
                l_columns := l_expr;
            ELSE
                l_columns := l_columns || ',' || l_expr;
            END IF;
        END LOOP;

        l_ddl := 'CREATE OR REPLACE VIEW auditchanges.' || l_schema || '_' || tab.table_name
              || '_AUD_VW AS SELECT id as audit_id, schemaname, tablename, username as audit_username, createdon as audit_createdon, dmlaction';

        -- Only append the comma when there is something to list; a table with
        -- no eligible columns still gets a valid metadata-only view.
        IF l_columns IS NOT NULL THEN
            l_ddl := l_ddl || ', ' || l_columns;
        END IF;

        l_ddl := l_ddl
              || ' from auditchanges.data_changes where schemaname = ''' || l_schema
              || ''' and tablename = ''' || tab.table_name || '''';

        EXECUTE IMMEDIATE l_ddl;
    END LOOP;
END;
/