I'm having a bit of difficulty creating a worker service that will continuously read an AQ (Oracle Advanced Queuing) queue…
My expectation is:
- Open Connection
- Create a Queue Object
- Hook up the “myqueue.MessageAvailable” event
- Call “myqueue.Listen()”
- Process the message in the event handler hooked up in item 3 and, if everything goes well, “commit” the message (commit in the message-queue sense, not commit as in "commit transaction"), e.g. “myqueue.CommitMessage(messageReceived)” or “messageReceived.Commit()”
I also hope to have around 10-15 parallel consumers here, because this queue will carry a very high volume of messages…
The problem is: it's not working…
I'm using Oracle.ManagedDataAccess.Core (Version="23.6.1") and my code is:
using Oracle.ManagedDataAccess.Client;
using Oracle.ManagedDataAccess.Types;
using System;
namespace OracleAQReader;
/// <summary>
/// Console entry point: opens an Oracle connection, binds to the
/// BR_AQADM.AQ_TRANSACTIONNOTIFICATION queue as consumer TRANSACTIONSSUBSCRIBER,
/// performs one diagnostic dequeue, subscribes to <c>MessageAvailable</c> and then
/// waits for messages until the user presses Enter.
/// </summary>
class Program
{
    /// <summary>Subscriber name used for notification, dequeue and listen.</summary>
    private const string ConsumerName = "TRANSACTIONSSUBSCRIBER";

    /// <summary>
    /// Oracle error number for ORA-25228 ("timeout or end-of-fetch during message dequeue"),
    /// which is what a dequeue reports when no message is available within the wait time.
    /// </summary>
    private const int NoMessageErrorNumber = 25228;

    static void Main()
    {
        string connectionString = myConnectionString;
        using (OracleConnection connection = new OracleConnection(connectionString))
        {
            connection.Open();

            // The queue is disposable; release it before the connection goes away.
            using OracleAQQueue queue = new OracleAQQueue(
                "BR_AQADM.AQ_TRANSACTIONNOTIFICATION",
                connection,
                OracleAQMessageType.Udt,
                "BR_AQADM.TJSONRESPONSE")
            {
                // Must be assigned before the MessageAvailable handler is attached.
                NotificationConsumers = [ConsumerName],
                DequeueOptions = new OracleAQDequeueOptions
                {
                    // 0 = do not block when the queue is empty.
                    Wait = 0,
                    ConsumerName = ConsumerName,
                    NavigationMode = OracleAQNavigationMode.NextMessage,
                    DequeueMode = OracleAQDequeueMode.Remove
                }
            };

            Console.WriteLine($"Connection State:{queue.Connection.State}");

            // NOTE(review): asynchronous AQ notification presumably needs the database
            // to be able to reach this client (notification port / firewall) - confirm
            // against the ODP.NET notification documentation if the handler never fires.
            queue.MessageAvailable += OnMessageAvailable;

            // Diagnostic dequeue proving the queue is reachable. With Wait = 0 an empty
            // queue is reported as ORA-25228, which must not abort the program.
            OracleAQMessage probe = TryDequeue(queue);
            if (probe is null)
            {
                Console.WriteLine("msg: <no message waiting in the queue>");
            }
            else
            {
                Console.WriteLine("msg:" + DescribePayload(probe.Payload));
            }

            // Start listening for messages.
            // NOTE(review): per the ODP.NET documentation Listen is a blocking call that
            // returns the name of the consumer a message is ready for; it is a separate
            // mechanism from the MessageAvailable event - confirm both are really wanted.
            Console.WriteLine(queue.Listen(new[] { ConsumerName }));
            Console.WriteLine("Listening for messages...");

            // Keep the application running
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Handler for <c>OracleAQQueue.MessageAvailable</c>: dequeues one message from the
    /// queue that raised the event and prints its payload.
    /// </summary>
    /// <param name="sender">The <c>OracleAQQueue</c> that raised the event.</param>
    /// <param name="e">Notification details (not used).</param>
    static void OnMessageAvailable(object sender, OracleAQMessageAvailableEventArgs e)
    {
        // NOTE(review): this dequeues on the same connection Main is using; verify that
        // concurrent use of one OracleConnection is acceptable here.
        OracleAQQueue queue = (OracleAQQueue)sender;

        OracleAQMessage message = TryDequeue(queue);
        if (message is null)
        {
            // Nothing left to read, e.g. another consumer already took the message.
            return;
        }

        Console.WriteLine($"Received message: {DescribePayload(message.Payload)}");
    }

    /// <summary>
    /// Dequeues one message using the queue's own dequeue options.
    /// </summary>
    /// <returns>The message, or <c>null</c> when Oracle reports ORA-25228 (no message).</returns>
    private static OracleAQMessage TryDequeue(OracleAQQueue queue)
    {
        try
        {
            return queue.Dequeue();
        }
        catch (OracleException ex) when (ex.Number == NoMessageErrorNumber)
        {
            return null;
        }
    }

    /// <summary>
    /// Converts a message payload to printable text without assuming its runtime type:
    /// raw bytes are decoded as UTF-8, anything else (such as a UDT object) is rendered
    /// through its own <c>ToString()</c>, and a missing payload becomes an empty string.
    /// </summary>
    private static string DescribePayload(object payload) => payload switch
    {
        null => string.Empty,
        byte[] raw => System.Text.Encoding.UTF8.GetString(raw),
        _ => payload.ToString() ?? string.Empty,
    };
}
/// <summary>
/// Custom-type class carrying a single <c>RESPONSE</c> attribute; it is the object
/// created by the factory mapped to the Oracle type <c>BR_AQADM.TJSONRESPONSE</c>.
/// </summary>
public class TJsonResponse : IOracleCustomType, INullable
{
    /// <summary>Value of the <c>RESPONSE</c> attribute.</summary>
    [OracleObjectMapping("RESPONSE")]
    public string Response { get; set; }

    /// <summary>Null indicator required by <c>INullable</c>.</summary>
    public bool IsNull { get; set; }

    /// <summary>Creates a new, empty <see cref="TJsonResponse"/>.</summary>
    public IOracleCustomType CreateObject()
    {
        return new TJsonResponse();
    }

    /// <summary>Copies <see cref="Response"/> into the <c>RESPONSE</c> attribute of <paramref name="udt"/>.</summary>
    /// <param name="con">Connection the object belongs to.</param>
    /// <param name="udt">Oracle object being populated.</param>
    public void FromCustomObject(OracleConnection con, object udt)
    {
        OracleUdt.SetValue(con, udt, "RESPONSE", Response);
    }

    /// <summary>Reads the <c>RESPONSE</c> attribute of <paramref name="udt"/> into <see cref="Response"/>.</summary>
    /// <param name="con">Connection the object belongs to.</param>
    /// <param name="udt">Oracle object being read.</param>
    public void ToCustomObject(OracleConnection con, object udt)
    {
        Response = (string)OracleUdt.GetValue(con, udt, "RESPONSE");
    }

    /// <summary>
    /// Returns the response text (empty when unset) so that printing a payload of this
    /// type shows its content rather than the CLR type name.
    /// </summary>
    public override string ToString() => Response ?? string.Empty;
}
/// <summary>
/// Factory for <see cref="TJsonResponse"/> objects, associated through its mapping
/// attribute with the Oracle type <c>BR_AQADM.TJSONRESPONSE</c>.
/// </summary>
[OracleCustomTypeMapping("BR_AQADM.TJSONRESPONSE")]
public class TJsonResponseTypeFactory : IOracleCustomTypeFactory
{
    /// <summary>Returns a new, empty <see cref="TJsonResponse"/>.</summary>
    public IOracleCustomType CreateObject() => new TJsonResponse();
}