I am on SOA SUITE 11.1.17
I have a JMS queue set up and am able to successfully send messages to it using the SOA JMS Adapter. I am also able to successfully listen to a queue using a JMS adapter in a different project.
However, here when I try to "listen" to the queue using a java program, the message is "received" in my java program and processed as it should be. The java program is running in a while loop, so it stays running.
On the queue, when the Java program reads it, the message is moved to the "Messages Pending" column. When I kill the Java program, the message is returned to the "Messages Current" column.
Would anyone have any ideas about this? It looks as though our existing Java-based program isn't properly "closing" the message.
I tried:
1. placing msg.acknowledge() in the code, since the documentation states that messages that are in "Messages Pending" have been read, but not acknowledged or committed -- but this does nothing.
2. Setting the expiration on the message to 1, but that is a hack. I would rather fix the problem than use a hack.
Anyone have any ideas what is wrong?
Thank you,
Stuart
import java.io.IOException;
import java.io.InputStreamReader;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
public class JMSListenAndRespond extends JMSQueueHandler implements MessageListener {
private Session jmsSession = null;
private MessageConsumer jmsMessageConsumer = null;
private Connection jmsConnection = null;
int counter = 1 ;
public static void main(String[] args) throws Exception {
InputStreamReader commandBuffer = null;
char command = '\0';
// first listen for any messages
JmsDataObject q = new JmsDataObject();
q.setJmsCF("jms/TestConnectionFactory");
q.setJmsQueue("jms/TestJMSQueue");
q.setServerPwd("welcome1");
q.setServerLogin("weblogic");
q.setUrl("t3://localhost:7101");
q.setJmsMsg("na -- we are listinging...");
JMSListenAndRespond lis = new JMSListenAndRespond();
lis.consumeMessage(q);
System.out.println("(type q to exit at any time...)\n");
try {
// Now just loop, waiting for messages until user types 'q' to quit
commandBuffer = new InputStreamReader(System.in);
while (!(command == 'q')) {
try {
command = (char)commandBuffer.read();
System.out.println("Command: "+command);
lis.stopListening();
} catch (IOException e) {
System.err.println("I/O Exception: ");
e.printStackTrace();
}
} //while
} finally {
lis.stopListening();
}
lis.stopListening();
}
public void stopListening() {
try {
if (jmsMessageConsumer != null) {
jmsMessageConsumer.close();
}
if (jmsSession != null) {
jmsSession.close();
}
if (jmsConnection != null) {
jmsConnection.close();
}
} catch (JMSException e) {
System.err.println("Couldn't properly close all resources.\n");
e.printStackTrace();
}
}
private void prepareToConsumeMessage() {
// create JMS connection, session, consumer and register listener - then start session
try {
// create connection
jmsConnection = getJmsConnectionFactory().createConnection();
// create session
jmsSession = jmsConnection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
jmsMessageConsumer = jmsSession.createConsumer(getJmsDestination());
// register a message listener (onMessage)
jmsMessageConsumer.setMessageListener(this);
// initiate connection - make sure to register your listener first or you might lose messages
jmsConnection.start();
System.out.println("\n...listening for message!\n");
} catch (JMSException e) {
System.err.println("Problem initializing JMS session");
e.printStackTrace();
System.exit(-1);
}
}
/*-----------------------------------------------------------------------
* onMessage
* The callback function invoked upon message reception
*----------------------------------------------------------------------*/
public void onMessage(Message msg) {
try {
String msgText;
if (msg instanceof TextMessage) {
msgText = ((TextMessage)msg).getText();
// this method expects a CSV string with the following makeup:
// firstName,lastName,gender,birthdate,address,city,zipcode,state,insurancePolicyId
String messageId = msg.getJMSMessageID();
// code to process goes here.....
} else {
// if message is not a TextMessage, attempt a conversion to string
System.out.println("\n** NEW MESSAGE (" + msg.getClass().getName() +
"):\n" +
msg.toString());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public void consumeMessage(JmsDataObject q ) {
initializeJndiProperties(q);
initializeJNDIandCF(q.getJmsQueue());
prepareToConsumeMessage();
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.naming.InitialContext;
import javax.naming.NamingException;
/**
 * Base class holding the JNDI state shared by JMS queue clients: the JNDI
 * environment properties, the initial context, and the connection factory
 * and destination looked up from it.
 *
 * <p>Typical use is {@link #initializeJndiProperties(JmsDataObject)}
 * followed by {@link #initializeJNDIandCF(String)}.
 */
public abstract class JMSQueueHandler {
    ConnectionFactory jmsConnectionFactory = null;
    Destination jmsDestination = null;
    InitialContext jndiContext = null;
    Properties jndiProperties = new Properties();

    /**
     * Creates the JNDI initial context from the current properties and
     * looks up the connection factory (named by the {@code factory.name}
     * property) and the given destination.
     *
     * <p>Any {@link NamingException} is fatal: it is reported on standard
     * error and the JVM exits with status -1. This includes failure to
     * create the context, since continuing without a context could only
     * fail later with a {@link NullPointerException} on the lookup.
     *
     * @param destinationQueueName JNDI name of the queue to look up
     */
    protected void initializeJNDIandCF(String destinationQueueName) {
        try {
            // Initialize JNDI context
            jndiContext = new InitialContext(jndiProperties);
        } catch (NamingException e) {
            System.err.println("Problem creating the JNDI initial context\n");
            e.printStackTrace();
            System.exit(-1);
        }
        try {
            // lookup JMS connection factory in JNDI
            jmsConnectionFactory =
                (ConnectionFactory) jndiContext.lookup(jndiProperties.getProperty("factory.name"));
            // lookup JMS destination in JNDI
            jmsDestination = (Destination) jndiContext.lookup(destinationQueueName);
        } catch (NamingException e) {
            System.err.println("Problem during the JNDI lookup\n");
            e.printStackTrace();
            System.exit(-1);
        }
    }

    /**
     * Populates the JNDI environment from {@code qdo}: the WebLogic initial
     * context factory, provider URL, principal, credentials, and the
     * connection factory's JNDI name (stored under {@code factory.name}).
     *
     * @param qdo source of the URL, login, password and factory name
     */
    public void initializeJndiProperties(JmsDataObject qdo) {
        jndiProperties.put("java.naming.factory.initial", "weblogic.jndi.WLInitialContextFactory");
        jndiProperties.put("java.naming.provider.url", qdo.getUrl());
        jndiProperties.put("java.naming.security.principal", qdo.getServerLogin());
        jndiProperties.put("java.naming.security.credentials", qdo.getServerPwd());
        jndiProperties.put("factory.name", qdo.getJmsCF());
    }

    /** @param jmsConnectionFactory connection factory to use instead of a JNDI lookup */
    public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {
        this.jmsConnectionFactory = jmsConnectionFactory;
    }

    /** @return the connection factory, or {@code null} if none has been set or looked up */
    public ConnectionFactory getJmsConnectionFactory() {
        return jmsConnectionFactory;
    }

    /** @param jmsDestination destination to use instead of a JNDI lookup */
    public void setJmsDestination(Destination jmsDestination) {
        this.jmsDestination = jmsDestination;
    }

    /** @return the destination, or {@code null} if none has been set or looked up */
    public Destination getJmsDestination() {
        return jmsDestination;
    }
}