Auditing & Change Data Capture in Oracle
This is the Oracle Database counterpart of the auditing setup I previously wrote for PostgreSQL.
I wrote a stored procedure that accepts a schema name as a parameter and loops through each table in that schema, attaching a trigger that tracks changes automatically. The triggers capture the affected row (the new values for inserts and updates, the old values for deletes) and store it as JSON in a central table called data_changes.
To make querying easier, I added another procedure that generates a set of “audit” views. These views reshape the JSON in data_changes to match the structure of the original tables.
With this setup, if you need to stream changes from Oracle into another system, you can simply read from the data_changes table. Currently, BLOB and CLOB datatypes are excluded.
-- Dedicated schema that owns the audit table, the generator procedures and the audit views.
CREATE USER AUDITCHANGES
    IDENTIFIED BY aGreatPassword1
    DEFAULT TABLESPACE DATA;
-- Storage for the audit data.
GRANT UNLIMITED TABLESPACE TO AUDITCHANGES;
-- Dictionary views queried by the generator procedures.
GRANT SELECT ON dba_tables TO AUDITCHANGES;
GRANT SELECT ON dba_triggers TO AUDITCHANGES;
GRANT SELECT ON dba_tab_columns TO AUDITCHANGES;
-- DDL issued by the generator procedures (triggers in the audited schema, views in AUDITCHANGES).
GRANT CREATE ANY TRIGGER TO AUDITCHANGES;
GRANT CREATE ANY VIEW TO AUDITCHANGES;
-- Central audit table: one row per captured DML event on an audited table.
CREATE TABLE AUDITCHANGES.data_changes
(
    id            NUMBER GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    schemaname    VARCHAR2(255) NOT NULL,
    tablename     VARCHAR2(255) NOT NULL,
    username      VARCHAR2(255),
    createdon     TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP NOT NULL,
    dmlaction     VARCHAR2(1) NOT NULL,   -- 'I' = insert, 'U' = update, 'D' = delete
    original_data CLOB,                   -- row image written by the audit trigger as JSON text
    CONSTRAINT data_changes_dmlaction_check CHECK (dmlaction IN ('I', 'D', 'U'))
);
-- NOTE: every schema whose tables are audited must be able to insert into
-- auditchanges.data_changes, because the generated triggers run as the owner of
-- the audited table. Replace your_schema with that schema and repeat per schema.
-- This grant has to run after the table exists, otherwise it fails with ORA-00942.
grant insert on auditchanges.data_changes to your_schema;
-- Supports lookups of one table's changes by schema and table name, then time and action.
CREATE INDEX AUDITCHANGES.data_changes_001_idx
    ON AUDITCHANGES.data_changes (
        schemaname,
        tablename,
        createdon,
        dmlaction
    );
-- Scratch table for debugging: stores a statement text (commstat) with the time it was logged.
CREATE TABLE AUDITCHANGES.debugcalls
(
    createdon DATE DEFAULT SYSDATE,
    commstat  CLOB
);
CREATE OR REPLACE PROCEDURE AUDITCHANGES.admin_create_audit_triggers(t_schemaname VARCHAR2, t_replace NUMBER)
IS
    -- Creates one row-level AFTER INSERT OR UPDATE OR DELETE trigger per table of a
    -- schema. Each trigger writes the affected row as JSON text into
    -- auditchanges.data_changes: the :new values for inserts and updates, the :old
    -- values for deletes.
    --
    -- Parameters:
    --   t_schemaname  schema whose tables get an audit trigger (matched in upper case).
    --   t_replace     1 = re-create triggers that already exist; any other value
    --                 (including NULL) only creates the missing ones.
    --
    -- Trigger name: TIUD_<SCHEMA>_<TABLE>_AUD, created in the audited schema.
    --
    -- Notes and limitations:
    --   * BLOB, BFILE, LONG and LONG RAW columns are left out: they cannot be
    --     concatenated into the JSON text, so including them would leave the
    --     trigger invalid and block DML on the table.
    --   * Values are embedded as JSON strings without escaping, using the
    --     session's implicit conversions. The audit views rely on this raw format
    --     for CLOB columns that hold JSON, so it is intentionally kept.
    --   * auditchanges.data_changes itself is never audited, which would recurse.
    --   * Any error raised while creating a trigger propagates to the caller.
    t_triggername VARCHAR2(300);
    t_exists      NUMBER;
    t_new_json    CLOB;
    t_old_json    CLOB;
    t_insert_head VARCHAR2(1000);
    t_commstat    CLOB;

    -- Builds the SQL expression that renders one row as JSON text, e.g.
    --   TO_CLOB('{')||'"C1": "'||:new."C1"||'", "C2": "'||:new."C2"||'"}'
    -- p_prefix is ':new' or ':old'. Returns NULL when the table has no eligible
    -- column. The leading TO_CLOB makes the whole concatenation a CLOB, so a long
    -- row does not fail with ORA-01489 inside the trigger. Built in a loop rather
    -- than with LISTAGG so that wide tables do not overflow a VARCHAR2 result.
    FUNCTION row_json_expr(p_owner VARCHAR2, p_table VARCHAR2, p_prefix VARCHAR2) RETURN CLOB
    IS
        l_expr CLOB;
    BEGIN
        FOR col IN (
            SELECT column_name
            FROM dba_tab_columns
            WHERE owner = p_owner
              AND table_name = p_table
              AND data_type NOT IN ('BLOB', 'BFILE', 'LONG', 'LONG RAW')
            ORDER BY column_id
        )
        LOOP
            IF l_expr IS NULL THEN
                l_expr := 'TO_CLOB(''{'')||''';
            ELSE
                l_expr := l_expr || '||''", ';
            END IF;
            l_expr := l_expr || '"' || col.column_name || '": "''||' ||
                      p_prefix || '."' || col.column_name || '"';
        END LOOP;
        IF l_expr IS NOT NULL THEN
            l_expr := l_expr || '||''"}''';
        END IF;
        RETURN l_expr;
    END row_json_expr;
BEGIN
    FOR tab IN (
        SELECT owner, table_name
        FROM dba_tables
        WHERE owner = UPPER(t_schemaname)
          AND NOT (owner = 'AUDITCHANGES' AND table_name = 'DATA_CHANGES')
        ORDER BY table_name
    )
    LOOP
        t_triggername := 'TIUD_' || tab.owner || '_' || UPPER(tab.table_name) || '_AUD';

        SELECT COUNT(*) INTO t_exists
        FROM dba_triggers
        WHERE owner = tab.owner
          AND trigger_name = t_triggername;

        IF t_exists = 0 OR t_replace = 1 THEN
            t_new_json := row_json_expr(tab.owner, tab.table_name, ':new');
            t_old_json := row_json_expr(tab.owner, tab.table_name, ':old');

            -- A table without any eligible column has nothing to record; skip it
            -- instead of generating a malformed statement.
            IF t_new_json IS NOT NULL THEN
                -- Common start of the three generated INSERT statements.
                t_insert_head := 'insert into auditchanges.data_changes (schemaname,tablename,username,dmlaction,original_data)' ||
                                 ' values (''' || tab.owner || ''', ''' || tab.table_name || ''', USER, ';

                -- Identifiers are quoted so mixed-case table and column names resolve.
                t_commstat := 'CREATE OR REPLACE TRIGGER "' || tab.owner || '"."' || t_triggername || '"' ||
                              ' AFTER INSERT OR UPDATE OR DELETE ON "' || tab.owner || '"."' || tab.table_name || '"' ||
                              ' FOR EACH ROW' ||
                              ' BEGIN' ||
                              ' IF INSERTING THEN ' || t_insert_head || '''I'',' || t_new_json || ');' ||
                              ' ELSIF UPDATING THEN ' || t_insert_head || '''U'',' || t_new_json || ');' ||
                              ' ELSIF DELETING THEN ' || t_insert_head || '''D'',' || t_old_json || ');' ||
                              ' END IF;' ||
                              ' END;';

                EXECUTE IMMEDIATE t_commstat;
                -- Debug hook: uncomment to keep the generated statement.
                --insert into AUDITCHANGES.debugcalls(commstat) values (t_commstat);
            END IF;
        END IF;
    END LOOP;
END;
/
CREATE OR REPLACE PROCEDURE auditchanges.admin_create_views(t_schemaname varchar) AS
    -- Creates (or replaces) one view per table of a schema, named
    -- auditchanges.<schema>_<table>_AUD_VW, that reshapes the JSON stored in
    -- auditchanges.data_changes.original_data back into the table's columns.
    --
    -- Parameters:
    --   t_schemaname  schema whose tables get an audit view (matched in upper case).
    --
    -- Every view exposes audit_id, schemaname, tablename, audit_username,
    -- audit_createdon and dmlaction, followed by one column per source column:
    --   * CLOB columns are read with JSON_QUERY after rewriting an embedded,
    --     unescaped JSON object into an array.
    --   * BLOB, BFILE, LONG and LONG RAW columns are left out; they cannot be
    --     rebuilt with CAST.
    --   * All other columns are read with JSON_VALUE and CAST to the source type.
    -- The column list is assembled in a loop, not with LISTAGG, so wide tables do
    -- not overflow a VARCHAR2 result. Any error raised while creating a view
    -- propagates to the caller.
    t_collist  CLOB;
    t_colexpr  VARCHAR2(1000);
    t_commstat CLOB;
BEGIN
    FOR rec IN (
        SELECT table_name
        FROM dba_tables
        WHERE owner = UPPER(t_schemaname)
        ORDER BY table_name
    )
    LOOP
        t_collist := NULL;

        FOR col IN (
            SELECT column_name, data_type, data_length
            FROM dba_tab_columns
            WHERE owner = UPPER(t_schemaname)
              AND table_name = rec.table_name
              AND data_type NOT IN ('BLOB', 'BFILE', 'LONG', 'LONG RAW')
            ORDER BY column_id
        )
        LOOP
            IF col.data_type = 'CLOB' THEN
                t_colexpr := 'JSON_QUERY(replace(replace(original_data, ''"{'',''[{''), ''}"'',''}]''), ''$.' ||
                             col.column_name || ''' RETURNING ' || col.data_type ||
                             ' pretty with wrapper) ' || col.column_name;
            ELSE
                -- Character and RAW types need an explicit length in the CAST.
                t_colexpr := 'CAST(JSON_VALUE(original_data, ''$.' || col.column_name || ''') AS ' ||
                             col.data_type ||
                             CASE
                                 WHEN col.data_type LIKE '%CHAR%' OR col.data_type = 'RAW'
                                     THEN '(' || TO_CHAR(col.data_length) || ')'
                                 ELSE ''
                             END ||
                             ') ' || col.column_name;
            END IF;

            IF t_collist IS NULL THEN
                t_collist := t_colexpr;
            ELSE
                t_collist := t_collist || ',' || t_colexpr;
            END IF;
        END LOOP;

        -- A table without any representable column would yield invalid DDL; skip it.
        IF t_collist IS NOT NULL THEN
            t_commstat := 'CREATE OR REPLACE VIEW auditchanges.' || t_schemaname || '_' || rec.table_name ||
                          '_AUD_VW AS SELECT id as audit_id, schemaname, tablename, username as audit_username, createdon as audit_createdon, dmlaction, ' ||
                          t_collist ||
                          ' from auditchanges.data_changes where schemaname = ''' || UPPER(t_schemaname) ||
                          ''' and tablename = ''' || rec.table_name || '''';

            EXECUTE IMMEDIATE t_commstat;
        END IF;
    END LOOP;
END;
/