Hi,
I am new to Oracle AQs. I am in the process of setting up a basic AQ example to work. I am using the Oracle AQ PL/SQL call back notification to dequeue messages. Below is the example that I am trying to make it work. I want to mention the below code was working fine until couple of days back when there was a deadlock while dequeue and the DBA tried killing the session, the slave jobs kept coming back again and again, so the DBA changed the job_queue_processes = 0, then changed it back to job_queue_processes = 4000. But nothing seem to have worked after this happened. Can someone please help me if you have knowledge on this.? I tried all the different examples which I saw on the web.
Current params:
job_queue_processes - 4000
aq_tm_processes - 1
begin
2 DBMS_AQADM.CREATE_QUEUE_TABLE
3 ( queue_table => 'aq.objmsgs80_qtab',
4 queue_payload_type => 'aq.Message_typ',
5 multiple_consumers => TRUE );
6
7 DBMS_AQADM.CREATE_QUEUE
8 ( queue_name => 'MSG_QUEUE',
9 queue_table => 'aq.objmsgs80_qtab');
10
11 DBMS_AQADM.START_QUEUE
12 ( queue_name => 'MSG_QUEUE');
13 end;
14 /
create procedure enqueue_msg( p_msg in varchar2 )
2 as
3 enqueue_options dbms_aq.enqueue_options_t;
4 message_properties dbms_aq.message_properties_t;
5 message_handle RAW(16);
6 message aq.message_typ;
7 BEGIN
8 message := message_typ('NORMAL MESSAGE', p_msg );
9 dbms_aq.enqueue(queue_name => 'msg_queue',
10 enqueue_options => enqueue_options,
11 message_properties => message_properties,
12 payload => message,
13 msgid => message_handle);
14 end;
create table message_table( msg varchar2(4000) );
create or replace procedure notifyCB( context raw,
2 reginfo sys.aq$_reg_info,
3 descr sys.aq$_descriptor,
4 payload raw,
5 payloadl number)
6 as
7 dequeue_options dbms_aq.dequeue_options_t;
8 message_properties dbms_aq.message_properties_t;
9 message_handle RAW(16);
10 message aq.message_typ;
11 BEGIN
12 dequeue_options.msgid := descr.msg_id;
13 dequeue_options.consumer_name := descr.consumer_name;
14 DBMS_AQ.DEQUEUE(queue_name => descr.queue_name,
15 dequeue_options => dequeue_options,
16 message_properties => message_properties,
17 payload => message,
18 msgid => message_handle);
19 insert into message_table values
20 ( 'Dequeued and processed "' || message.text || '"' );
21 COMMIT;
22 END;
23 /
15 /
begin
2 dbms_aqadm.add_subscriber
3 ( queue_name => 'aq.msg_queue',
4 subscriber => sys.aq$_agent( 'recipient', null, null ) );
5 end;
BEGIN
2 dbms_aq.register
3 ( sys.aq$_reg_info_list(
4 sys.aq$_reg_info('AQ.MSG_QUEUE:RECIPIENT',
5 DBMS_AQ.NAMESPACE_AQ,
6 'plsql://AQ.notifyCB',
7 HEXTORAW('FF')) ) ,
8 1 );
9 end;
select * from message_table;
no rows selected
exec enqueue_msg( 'This is a test....' );
commit;
Commit complete.
select * from message_table;
no rows selected