Hi All,
Environment: Oracle 12.2 on RHEL 7.6 (I’m testing on en Express Ed.)
Goal: I want to run an event-based job each time an application running on a remote DB processes a task using AQ (for now, I don’t want to use remote jobs mechanism or file-watcher event jobs).
Setup: A message is enqueued to a source queue IAQ_TEST_SRC and propagated to a destination queue IAQ_TEST_DST on a remote database. Each time a message arrives into the destination queue, a job is raised to execute some tasks.
I successfully setup a unitary test of queue-to-queue propagation between 2 databases
I successfully setup a unitary test of event-based job running each time a message is enqueued in the queue on which the job is defined
But I cannot put things work together: having an event-based job defined on a destination queue of a queue-to-queue propagation run when a message is enqueued.
1) Event-based Job Test:
for this test, I used the example provided in Event-Based Jobs
2) Propagation Test Setup:
2.1) Setup
-- DESTINATION DATABASE --
CREATE USER qdst IDENTIFIED BY qdst DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE TO qdst;
GRANT AQ_ADMINISTRATOR_ROLE TO qdst;
GRANT EXECUTE ON dbms_aq TO qdst;
GRANT MANAGE SCHEDULER, CREATE PROCEDURE, CREATE ANY JOB TO qdst;
ALTER USER QDST QUOTA UNLIMITED ON USERS;
CONNECT qdst/qdst@destdb
-- Create object type for Payload
CREATE TYPE TYPE_EVTQ_OBJ AS OBJECT (TASK_ID INTEGER, RUN_ID INTEGER);
/
-- Create Destination queue table
begin
sys.dbms_aqadm.create_queue_table(
queue_table => 'QDST.IQT_TEST_DST',
queue_payload_type => 'QDST.TYPE_EVTQ_OBJ',
multiple_consumers => TRUE,
storage_clause => 'TABLESPACE USERS',
COMMENT => 'Destination Queue Table');
end;
/
-- Create the destination queue:
begin
sys.dbms_aqadm.create_queue(
queue_name => 'IAQ_TEST_DST',
queue_table => 'IQT_TEST_DST',
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 5,
retry_delay => 0,
retention_time => 0,
COMMENT => 'Destination Advanced Queue');
end;
/
-- Start The Queue:
BEGIN
Dbms_Aqadm.Start_Queue('QDST.IAQ_TEST_DST');
END;
/
-- SOURCE DATABASE --
CREATE USER qsrc IDENTIFIED BY qsrc DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE TO qsrc;
GRANT AQ_ADMINISTRATOR_ROLE TO qsrc;
GRANT EXECUTE ON dbms_aq TO qsrc;
GRANT MANAGE SCHEDULER, CREATE PROCEDURE, CREATE ANY JOB TO qsrc;
ALTER USER qsrc QUOTA UNLIMITED ON USERS;
CONNECT qsrc/qsrc@sourcedb
-- Create database link from source to destination database.
CREATE DATABASE LINK QSRC2QDST CONNECT TO QDST IDENTIFIED BY qdst USING 'QDEST_ALIAS';
-- Create object type for Payload
CREATE TYPE TYPE_EVTQ_OBJ AS OBJECT (TASK_ID INTEGER, RUN_ID INTEGER);
/
-- Create queue_table in source schema
BEGIN
Dbms_Aqadm.Create_Queue_Table(
queue_table => 'QSRC.IQT_TEST_SRC',
queue_payload_type => 'QSRC.TYPE_EVTQ_OBJ',
storage_clause => 'TABLESPACE USERS',
multiple_consumers => TRUE,
COMMENT => 'Source Queue Table');
END;
/
-- Create the source queue:
BEGIN
Dbms_Aqadm.Create_Queue(
queue_name => 'IAQ_TEST_SRC',
queue_table => 'IQT_TEST_SRC',
queue_type => sys.dbms_aqadm.normal_queue,
max_retries => 5,
retry_delay => 0,
retention_time => 0,
COMMENT => 'Source Advanced Queue');
END;
/
-- Start the queue:
BEGIN
Dbms_Aqadm.Start_Queue('QSRC.IAQ_TEST_SRC');
END;
/
-- Add a subscriber to source queue:
BEGIN
Dbms_Aqadm.Add_Subscriber
(
queue_name => 'QSRC.IAQ_TEST_SRC',
subscriber => sys.aq$_agent(NAME => 'IQS_TEST_SRC', address => 'QDST.IAQ_TEST_DST@QSRC2QDST', protocol => NULL),
queue_to_queue => TRUE
);
END;
/
Test
Enqueue
DECLARE
rc BINARY_INTEGER;
nq_opt dbms_aq.enqueue_options_t;
nq_pro dbms_aq.message_properties_t;
datas TYPE_EVTQ_OBJ;
msgid RAW(16);
BEGIN
nq_opt.visibility := dbms_aq.immediate;
nq_pro.expiration := dbms_aq.never;
datas := TYPE_EVTQ_OBJ(&TASK_ID, &Run_ID);
dbms_aq.enqueue('IAQ_TEST_SRC', nq_opt, nq_pro, datas, msgid);
END;
/
TASK_ID
=
3111
RUN_ID
=
2
Message is successfully enqueued in the local source queue...
SELECT * FROM qsrc.aq$iqt_test_src -- I removed Null columns
QUEUE MSG_ID MSG_PRIORITY MSG_STATE ENQ_TIME ENQ_TIMESTAMP ENQ_USER_ID ENQ_TXN_ID DEQ_TIME DEQ_TIMESTAMP DEQ_USER_ID DEQ_TXN_ID USER_DATA.JOB_ID USER_DATA.RUN_ID PROPAGATED_MSGID SENDER_NAME SENDER_ADDRESS SENDER_PROTOCOL
IAQ_TEST_SRC C053F2DBB0004379E0530100007FD219 1 PROCESSED 19/04/2021 16:56:21 19-APR-21 04.56.21.727493 PM QSRC 1.15.4516 19/04/2021 16:56:21 19-APR-21 04.56.21.820362000 PM QSRC 7.3.4242 3111 2 C0565EFB122C6D70E0530100007F2426 IQS_TEST_SRC "QDST"."IAQ_TEST_DST"@QSRC2QDST 0
... and propagated to the remote destination queue
SELECT * FROM qdst.aq$iqt_test_dst -- I removed Null columns
QUEUE MSG_ID MSG_PRIORITY MSG_STATE ENQ_TIME ENQ_TIMESTAMP ENQ_USER_ID ENQ_TXN_ID USER_DATA.JOB_ID USER_DATA.RUN_ID SENDER_ADDRESS SENDER_PROTOCOL ORIGINAL_MSGID CONSUMER_NAME
IAQ_TEST_DST C0565EFB122C6D70E0530100007F2426 1 READY 19/04/2021 16:56:21 19-APR-21 04.56.21.819351 PM QDST 7.21.3825 3111 2 "QSRC"."IAQ_TEST_SRC"@UDRDATA 4 C053F2DBB0004379E0530100007FD219 IQS_TEST_SRC
Dequeue
DECLARE
dq_opt dbms_aq.dequeue_options_t;
dq_prop dbms_aq.message_properties_t;
datas TYPE_EVTQ_OBJ;
msg_id RAW(16);
BEGIN
dq_opt.consumer_name := 'IQS_TEST_SRC';
dq_opt.dequeue_mode := dbms_aq.remove;
dq_opt.navigation := dbms_aq.next_message;
dq_opt.wait := dbms_aq.no_wait;
dbms_aq.dequeue(
queue_name => 'IAQ_TEST_DST'
,dequeue_options => dq_opt
,message_properties => dq_prop
,payload => datas
,msgid => msg_id);
dbms_output.put_line('Message dequeued='||datas.task_id||'.'||datas.run_id);
END;
/
Message dequeued
=
3111.2
PL
/
SQL procedure successfully completed
SQL> commit;
Commit complete
Putting things together:
To have an end-to-end test, I combined the 2 tests in this way (changes only on destination database):
-- Create table and sequence to be used in the job
CREATE TABLE QDST.SCHD_DST (
ID NUMBER(10) NOT NULL,
CREATED_DATE DATE NOT NULL,
CONSTRAINT SCHD_DST_PK PRIMARY KEY (ID)
);
CREATE SEQUENCE QDST.SCHD_DST_SEQ;
-- Create the event-based job
BEGIN
DBMS_SCHEDULER.create_job (
job_name => 'QDST_EVT_JOB',
job_type => 'PLSQL_BLOCK',
job_action => 'BEGIN
INSERT INTO QDST.SCHD_DST (id, created_date)
VALUES (QDST.SCHD_DST_SEQ.NEXTVAL, SYSDATE);
COMMIT;
END;',
start_date => SYSTIMESTAMP,
event_condition => 'tab.user_data.task_id = 3111', --actually, we don't need an event condition since we want the job to run for any message, but I kept it to stick to the unitary test
queue_spec => 'QDST.IAQ_TEST_DST',
enabled => false);
dbms_scheduler.set_attribute('QDST_EVT_JOB', 'parallel_instances', TRUE);
dbms_scheduler.enable('QDST_EVT_JOB');
END;
/
Note: I read somewhere that event-based job must use a program or a stored procedure but in the unitary test I 've done it worked this way; more I did tested it with a scheduler program but things did not change
I tried to add the Subscriber name since the consumer_name = IQS_TEST_SRC in the destination queue table (no error raised, still not sure of the syntax: anyway, it did not work)
BEGIN
dbms_scheduler.set_attribute('QDST_EVT_JOB', 'event_spec',
'tab.user_data.task_id = 3111', 'QDST.IAQ_TEST_DST, IQS_TEST_SRC');
END;
/
I kept checking dba_scheduler_job_run_details for a job log entry but in vain.
If any one can help me with a solution or even a clue for troubleshooting I'd be thankfull.
Regards,