Skip to Main Content

ODP.NET

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

Interested in getting your voice heard by members of the Developer Marketing team at Oracle? Check out this post for AppDev or this post for AI focus group information.

How to consume Oracle AQ using .NET Core?

Leo FerreiraNov 26 2024

Im having a bit of difficulty in creating worker service that will continuously read a AQ…

my expectation is:

  1. Open Connection
  2. Create a Queue Object
  3. Hookup the “myqueue.MessageAvaiable” event
  4. Call “myqueue.Listen()”
  5. Process stuff on the code of item 3, and if everything goes well, “commit” the message (commit in the terms of message queues, not commit as in "commit transaction") like “myqueue.CommitMessage(messageReceived)” or “messageReceived.Commit()”

And i hope to have like 10-15 parallel consumers here because this queue will be very volumetric…

Problem is: its not working…

I'm using Oracle.ManagedDataAccess.Core" Version="23.6.1" and my code is

using Oracle.ManagedDataAccess.Client; 
using Oracle.ManagedDataAccess.Types; 
using System; 

namespace OracleAQReader; 

class Program 
{ 
static void Main() 
{ 
string connectionString = myConnectionString; 
using (OracleConnection connection = new OracleConnection(connectionString)) 
{ 
connection.Open(); 

OracleAQQueue queue = new OracleAQQueue("BR_AQADM.AQ_TRANSACTIONNOTIFICATION", connection, OracleAQMessageType.Udt, "BR_AQADM.TJSONRESPONSE") 
{ 
NotificationConsumers = ["TRANSACTIONSSUBSCRIBER"], 
DequeueOptions = new OracleAQDequeueOptions 
{ 
Wait = 0, 
ConsumerName = "TRANSACTIONSSUBSCRIBER", 
NavigationMode = OracleAQNavigationMode.NextMessage, 
DequeueMode = OracleAQDequeueMode.Remove 
} 
}; 

Console.WriteLine("Connection State:"+queue.Connection.State); 

queue.MessageAvailable += OnMessageAvailable; 

var msg = queue.Dequeue(); 
//THIS WORKS!, so yes, the queue is correctly connected, i think...
Console.WriteLine("msg:"+msg.Payload.ToString()); 

// Start listening for messages 
Console.WriteLine(queue.Listen(new[] { "TRANSACTIONSSUBSCRIBER" })); 
Console.WriteLine("Listening for messages..."); 

// Keep the application running 
Console.ReadLine(); 

} 
} 

static void OnMessageAvailable(object sender, OracleAQMessageAvailableEventArgs e) 
{ 
//we never get here...
OracleAQQueue queue = (OracleAQQueue)sender; 
OracleAQMessage message = queue.Dequeue(); 
byte[] payload = message.Payload as byte[]; 
string messageContent = System.Text.Encoding.UTF8.GetString(payload); 
Console.WriteLine($"Received message: {messageContent}"); 
} 
} 

public class TJsonResponse : IOracleCustomType, INullable 
{ 
[OracleObjectMapping("RESPONSE")] 
public string Response { get; set; } 

public bool IsNull { get; set; } 

public IOracleCustomType CreateObject() 
{ 
return new TJsonResponse(); 
} 

public void FromCustomObject(OracleConnection con, object udt) 
{ 
OracleUdt.SetValue(con, udt, "RESPONSE", Response); 
} 

public void ToCustomObject(OracleConnection con, object udt) 
{ 
Response = (string)OracleUdt.GetValue(con, udt, "RESPONSE"); 
} 
} 

[OracleCustomTypeMappingAttribute("BR_AQADM.TJSONRESPONSE")] 
public class TJsonResponseTypeFactory : IOracleCustomTypeFactory 
{ 
public IOracleCustomType CreateObject() 
{ 
return new TJsonResponse(); 
} 
}
This post has been answered by Alex Keh-Oracle on Nov 28 2024
Jump to Answer
Comments
Post Details
Added on Nov 26 2024
6 comments
107 views