Skip to Main Content

Database Software

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

Event-based job based on a destination queue in a queue-to-queue propagation never raises

User_5Y3W5Apr 19 2021

Hi All,
Environment: Oracle 12.2 on RHEL 7.6 (I’m testing on en Express Ed.)
Goal: I want to run an event-based job each time an application running on a remote DB processes a task using AQ (for now, I don’t want to use remote jobs mechanism or file-watcher event jobs).
Setup: A message is enqueued to a source queue IAQ_TEST_SRC and propagated to a destination queue IAQ_TEST_DST on a remote database. Each time a message arrives into the destination queue, a job is raised to execute some tasks.
I successfully setup a unitary test of queue-to-queue propagation between 2 databases
I successfully setup a unitary test of event-based job running each time a message is enqueued in the queue on which the job is defined
But I cannot put things work together: having an event-based job defined on a destination queue of a queue-to-queue propagation run when a message is enqueued.
 
1) Event-based Job Test:
for this test, I used the example provided in Event-Based Jobs
 
2) Propagation Test Setup:
2.1) Setup
-- DESTINATION DATABASE --

CREATE USER qdst IDENTIFIED BY qdst DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE TO qdst;
GRANT AQ_ADMINISTRATOR_ROLE TO qdst;
GRANT EXECUTE ON dbms_aq TO qdst;
GRANT MANAGE SCHEDULER, CREATE PROCEDURE, CREATE ANY JOB TO qdst;
ALTER USER QDST QUOTA UNLIMITED ON USERS;

CONNECT qdst/qdst@destdb
-- Create object type for Payload
CREATE TYPE TYPE_EVTQ_OBJ AS OBJECT (TASK_ID INTEGER, RUN_ID INTEGER);
/
-- Create Destination queue table
begin
 sys.dbms_aqadm.create_queue_table(
   queue_table        => 'QDST.IQT_TEST_DST',
   queue_payload_type => 'QDST.TYPE_EVTQ_OBJ',
   multiple_consumers => TRUE,
   storage_clause     => 'TABLESPACE USERS',
   COMMENT            => 'Destination Queue Table');
end;
/
-- Create the destination queue:
begin
 sys.dbms_aqadm.create_queue(
   queue_name     => 'IAQ_TEST_DST',
   queue_table    => 'IQT_TEST_DST',
   queue_type     => sys.dbms_aqadm.normal_queue,
   max_retries    => 5,
   retry_delay    => 0,
   retention_time => 0,
   COMMENT        => 'Destination Advanced Queue');
end;
/
-- Start The Queue:
BEGIN
 Dbms_Aqadm.Start_Queue('QDST.IAQ_TEST_DST');
END;
/

-- SOURCE DATABASE --

CREATE USER qsrc IDENTIFIED BY qsrc DEFAULT TABLESPACE USERS TEMPORARY TABLESPACE TEMP;
GRANT CONNECT, RESOURCE TO qsrc;
GRANT AQ_ADMINISTRATOR_ROLE TO qsrc;
GRANT EXECUTE ON dbms_aq TO qsrc;
GRANT MANAGE SCHEDULER, CREATE PROCEDURE, CREATE ANY JOB TO qsrc;
ALTER USER qsrc QUOTA UNLIMITED ON USERS;

CONNECT qsrc/qsrc@sourcedb

-- Create database link from source to destination database.
CREATE DATABASE LINK QSRC2QDST CONNECT TO QDST IDENTIFIED BY qdst USING 'QDEST_ALIAS';
-- Create object type for Payload
CREATE TYPE TYPE_EVTQ_OBJ AS OBJECT (TASK_ID INTEGER, RUN_ID INTEGER);
/
-- Create queue_table in source schema
BEGIN

 Dbms_Aqadm.Create_Queue_Table(
   queue_table        => 'QSRC.IQT_TEST_SRC',
   queue_payload_type => 'QSRC.TYPE_EVTQ_OBJ',
   storage_clause     => 'TABLESPACE USERS',
   multiple_consumers => TRUE,
   COMMENT            => 'Source Queue Table');
END;
/
-- Create the source queue:
BEGIN

  Dbms_Aqadm.Create_Queue(
   queue_name     => 'IAQ_TEST_SRC',
   queue_table    => 'IQT_TEST_SRC',
   queue_type     => sys.dbms_aqadm.normal_queue,
   max_retries    => 5,
   retry_delay    => 0,
   retention_time => 0,
   COMMENT        => 'Source Advanced Queue');

END;
/
-- Start the queue:
BEGIN
 Dbms_Aqadm.Start_Queue('QSRC.IAQ_TEST_SRC');
END;
/
-- Add a subscriber to source queue:
BEGIN
 Dbms_Aqadm.Add_Subscriber
   (
     queue_name     => 'QSRC.IAQ_TEST_SRC',
     subscriber     => sys.aq$_agent(NAME => 'IQS_TEST_SRC', address => 'QDST.IAQ_TEST_DST@QSRC2QDST', protocol => NULL),
     queue_to_queue => TRUE
   );
END;
/

 Test
Enqueue

DECLARE
 rc    BINARY_INTEGER;
 nq_opt dbms_aq.enqueue_options_t;
 nq_pro dbms_aq.message_properties_t;
 datas TYPE_EVTQ_OBJ;
 msgid RAW(16);
BEGIN
 nq_opt.visibility := dbms_aq.immediate;
 nq_pro.expiration := dbms_aq.never;
 datas := TYPE_EVTQ_OBJ(&TASK_ID, &Run_ID);

 dbms_aq.enqueue('IAQ_TEST_SRC', nq_opt, nq_pro, datas, msgid);
END;
/

TASK_ID=3111
RUN_ID=2
Message is successfully enqueued in the local source queue...

SELECT * FROM  qsrc.aq$iqt_test_src -- I removed Null columns

QUEUE	    MSG_ID	             MSG_PRIORITY	MSG_STATE	ENQ_TIME	 ENQ_TIMESTAMP	            ENQ_USER_ID	ENQ_TXN_ID	DEQ_TIME	 DEQ_TIMESTAMP	             DEQ_USER_ID	DEQ_TXN_ID	USER_DATA.JOB_ID	USER_DATA.RUN_ID	PROPAGATED_MSGID	        SENDER_NAME	 SENDER_ADDRESS	        SENDER_PROTOCOL
IAQ_TEST_SRC	C053F2DBB0004379E0530100007FD219	1	      PROCESSED	19/04/2021 16:56:21	19-APR-21 04.56.21.727493 PM	QSRC	   1.15.4516	 19/04/2021 16:56:21	19-APR-21 04.56.21.820362000 PM	QSRC	   7.3.4242		3111	      2	        C0565EFB122C6D70E0530100007F2426	IQS_TEST_SRC	"QDST"."IAQ_TEST_DST"@QSRC2QDST	0

... and propagated to the remote destination queue

SELECT * FROM  qdst.aq$iqt_test_dst -- I removed Null columns
QUEUE	    MSG_ID	             MSG_PRIORITY	 MSG_STATE	ENQ_TIME	 ENQ_TIMESTAMP	            ENQ_USER_ID	ENQ_TXN_ID	USER_DATA.JOB_ID	USER_DATA.RUN_ID	SENDER_ADDRESS	       SENDER_PROTOCOL	ORIGINAL_MSGID	         CONSUMER_NAME
IAQ_TEST_DST	C0565EFB122C6D70E0530100007F2426	1	       READY			19/04/2021 16:56:21	19-APR-21 04.56.21.819351 PM	QDST	   7.21.3825		3111	      2			      "QSRC"."IAQ_TEST_SRC"@UDRDATA	4	       C053F2DBB0004379E0530100007FD219	IQS_TEST_SRC

Dequeue

DECLARE
 dq_opt  dbms_aq.dequeue_options_t;
 dq_prop dbms_aq.message_properties_t;
 datas   TYPE_EVTQ_OBJ;
 msg_id  RAW(16);
BEGIN
 dq_opt.consumer_name := 'IQS_TEST_SRC';
 dq_opt.dequeue_mode := dbms_aq.remove;
 dq_opt.navigation   := dbms_aq.next_message;
 dq_opt.wait         := dbms_aq.no_wait;
 dbms_aq.dequeue(
   queue_name         => 'IAQ_TEST_DST'
   ,dequeue_options   => dq_opt
   ,message_properties => dq_prop
   ,payload           => datas
   ,msgid             => msg_id);

   dbms_output.put_line('Message dequeued='||datas.task_id||'.'||datas.run_id);
END;
/

Message dequeued=3111.2
PL/SQL procedure successfully completed

SQL> commit;

Commit complete
 
Putting things together:
To have an end-to-end test, I combined the 2 tests in this way (changes only on destination database):

-- Create table and sequence to be used in the job 
CREATE TABLE QDST.SCHD_DST (
 ID           NUMBER(10)   NOT NULL,
 CREATED_DATE DATE         NOT NULL,
 CONSTRAINT SCHD_DST_PK PRIMARY KEY (ID)
);


CREATE SEQUENCE QDST.SCHD_DST_SEQ;
-- Create the event-based job
BEGIN
  DBMS_SCHEDULER.create_job (
     job_name       => 'QDST_EVT_JOB',
     job_type       => 'PLSQL_BLOCK',
     job_action     => 'BEGIN
                           INSERT INTO QDST.SCHD_DST (id, created_date)
                           VALUES (QDST.SCHD_DST_SEQ.NEXTVAL, SYSDATE);
                           COMMIT;
                         END;',
     start_date     => SYSTIMESTAMP,
     event_condition => 'tab.user_data.task_id = 3111', --actually, we don't need an event condition since we want the job to run for any message, but I kept it to stick to the unitary test 
     queue_spec     => 'QDST.IAQ_TEST_DST',
     enabled        => false);
  
   dbms_scheduler.set_attribute('QDST_EVT_JOB', 'parallel_instances', TRUE);  
 
 dbms_scheduler.enable('QDST_EVT_JOB');
END;
/

Note: I read somewhere that event-based job must use a program or a stored procedure but in the unitary test I 've done it worked this way; more I did tested it with a scheduler program but things did not change

I tried to add the Subscriber name since the consumer_name = IQS_TEST_SRC in the destination queue table (no error raised, still not sure of the syntax: anyway, it did not work)

BEGIN
 dbms_scheduler.set_attribute('QDST_EVT_JOB', 'event_spec', 
  'tab.user_data.task_id = 3111', 'QDST.IAQ_TEST_DST, IQS_TEST_SRC');
END;
/

I kept checking dba_scheduler_job_run_details for a job log entry but in vain.
 
If any one can help me with a solution or even a clue for troubleshooting I'd be thankfull.

Regards,

Comments
Post Details
Added on Apr 19 2021
0 comments
189 views