Skip to Main Content

Database Software

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

Messages rolled back blocking new dequeues

878269Jul 26 2011 — edited Nov 4 2011
Oracle Database 10g Enterprise Edition Release 10.2.0.4.0 - 64bit Production

I have an application that interacts with external systems, these external systems can have transient failures and can take a significant amount of time. As such I have made these interaction asynchronous and am doing retries all using Oracle AQ. The problem is, when I have a failure and rollback the dequeue transaction it seems to roll back ok and it increments the retry_count, but that process seems to wait to immediately dequeue the message again but can't because of the retry delay. So it sits there and waits, and waits until the retry delay is reached, then calls the subscriber.
Here is a stripped down version of what I am doing,
First block (between horizontal lines) is setup,
second block is inserts to trigger trigger,
third block is select statement to show processes just sitting there (my typical result is 5 processors so the 5 "bad records".
last block is cleanup ddl

I would like to know how to get the processors to ignore the rolled back messages until after the retry delay has elapsed. It seems there is something but I am unable to find exact documentation on the issue. Closest I can come is in the Oracle Documentation here which says "A bad condition can cause the transaction receiving a message to end. Oracle Streams AQ allows users to hide the bad message for a specified retry delay interval, during which it is in the WAITING state. After the retry delay, the failed message is again available for dequeue."
My observation is the messages are always in READY state.

Please help, I have no idea how to get the processors to actually ignore the bad messages.
Thanks for your time.

-----
h1. Setup database
--Basic data table assume INSERTS and SELECTS only
CREATE TABLE DT_TBL(
ID_NO NUMBER(10) PRIMARY KEY,
DT_VAL VARCHAR2(3) NOT NULL
)
/

--Function simulating transient external failure (I know this is not truly transient failure but it illustrates the concept)
CREATE OR REPLACE FUNCTION DT_FUNC(dt_record DT_TBL%ROWTYPE)
RETURN VARCHAR2
IS
result_val VARCHAR2(12);
BEGIN
CASE dt_record.DT_VAL
WHEN 'ADD' THEN result_val := 'successful';
WHEN 'ERR' THEN result_val := 'unsuccessful';
END CASE;
RETURN result_val;
EXCEPTION -- exception handlers begin
WHEN CASE_NOT_FOUND THEN
RETURN 'unsuccessful';
END;
/

-- Message type for advanced queues
CREATE OR REPLACE TYPE dt_id_type AS OBJECT (dt_id NUMBER(10))
/

BEGIN
-- Create queue-table
DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table => 'DT_Q_TBL',
queue_payload_type => 'dt_id_type',
multiple_consumers => TRUE
);
-- Create queue this sets redelivery parameters too
DBMS_AQADM.CREATE_QUEUE (
queue_name => 'DT_Q',
queue_table => 'DT_Q_TBL',
max_retries => 5,
retry_delay => 90
);
-- Start the queue and enable sending and recieving messages.
DBMS_AQADM.START_QUEUE(
queue_name =>'DT_Q',
enqueue => TRUE,
dequeue => TRUE
);
-- Start the default error queue and enable removing messages
DBMS_AQADM.START_QUEUE(
queue_name => 'AQ$_DT_Q_TBL_E',
enqueue => FALSE,
dequeue => TRUE
);
END;
/

--This procedure consumes messages off queue.
CREATE OR REPLACE PROCEDURE DT_Q_CONSUMER(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
)
AS
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
o_payload dt_id_type;
dt_record DT_TBL%ROWTYPE;
BEGIN
-- dequeue by message id.
dequeue_options.msgid := descr.msg_id;
dequeue_options.consumer_name := descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => o_payload,
msgid => message_handle
);
-- get record from data table by primary key
SELECT * INTO dt_record FROM DT_TBL WHERE ID_NO=o_payload.dt_id;

-- call function subject to external transient failures
IF DT_FUNC(dt_record) = 'unsuccessful' THEN
-- there was a problem, raise an error so transaction rolls back
RAISE_APPLICATION_ERROR (-20001,'Problem with external resource, I will try again later.');
END IF;
END;
/

-- hook the consumer procedure defined above up to the queue so it automatically gets messages
DECLARE
v_agent SYS.AQ$_AGENT;
v_reg_info SYS.AQ$_REG_INFO;
BEGIN
-- create an agent for our, name is arbitrary
v_agent := SYS.AQ$_AGENT ('DT_Q_SUB', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER (queue_name =>'DT_Q',
subscriber => v_agent );
-- create registration information
v_reg_info := SYS.AQ$_REG_INFO('DT_Q:DT_Q_SUB',
DBMS_AQ.NAMESPACE_AQ,
'plsql://DT_Q_CONSUMER',
HEXTORAW('FF')
);
-- register subscriber
DBMS_AQ.REGISTER (SYS.AQ$_REG_INFO_LIST(v_reg_info),1);
END;
/

CREATE OR REPLACE PROCEDURE DT_Q_PRODUCER( dt_id_in in NUMBER )
AS
enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW(16);
message dt_id_type;
BEGIN
--create the message
message := dt_id_type(dt_id_in);
--put it on the queue
DBMS_AQ.ENQUEUE (
queue_name => 'DT_Q',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
END;
/

CREATE OR REPLACE TRIGGER DT_TBL_TRIGG
AFTER INSERT ON DT_TBL for each row
BEGIN
-- call procedure to put PK of inserted row on queue
DT_Q_PRODUCER(:NEW.ID_NO);
EXCEPTION
WHEN OTHERS THEN -- handles all errors no this trigger can never raise an error unless the DBMS_OUTPUT.PUT_LINE here fails
DBMS_OUTPUT.PUT_LINE('Failed to queue message. DT_ID='|| :NEW.ID_NO ||' Error code ' || SQLCODE || ': ' || SUBSTR(SQLERRM, 1 , 128));
END;
/
-----
h1. Insert data to trigger trigger
insert into DT_TBL (ID_NO, DT_VAL) values ( 0, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 1, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 2, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 3, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 4, 'ADD');
-- now put "bad" messages into the table
insert into DT_TBL (ID_NO, DT_VAL) values ( 5, 'ERR');
insert into DT_TBL (ID_NO, DT_VAL) values ( 6, 'ERR');
insert into DT_TBL (ID_NO, DT_VAL) values ( 7, 'ERR');
insert into DT_TBL (ID_NO, DT_VAL) values ( 8, 'ERR');
insert into DT_TBL (ID_NO, DT_VAL) values ( 9, 'ERR');
-- now "good" ones again
insert into DT_TBL (ID_NO, DT_VAL) values ( 10, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 11, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 12, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 13, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 14, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 15, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 16, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 17, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 18, 'ADD');
insert into DT_TBL (ID_NO, DT_VAL) values ( 19, 'ADD');
COMMIT;

-----
h1. Display current message states run repeatedly to watch retry count increment
SELECT QUEUE, MSG_STATE, DELAY, RETRY_COUNT FROM AQ$DT_Q_TBL

-----
h1. cleanup
BEGIN
-- clean up previously created artifacts
DBMS_AQADM.STOP_QUEUE(
queue_name => 'DT_Q'
);

DBMS_AQADM.DROP_QUEUE(
queue_name => 'DT_Q'
);

DBMS_AQADM.DROP_QUEUE_TABLE(
queue_table => 'DT_Q_TBL'
);
END;
/
DROP TRIGGER DT_TBL_TRIGG
/
DROP PROCEDURE DT_Q_PRODUCER
/
DROP PROCEDURE DT_Q_CONSUMER
/
DROP FUNCTION DT_FUNC
/
DROP TYPE dt_id_type
/
DROP TABLE DT_TBL;
Comments
Locked Post
New comments cannot be posted to this locked post.
Post Details
Locked on Dec 2 2011
Added on Jul 26 2011
13 comments
1,372 views