Skip to Main Content

Integration

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

JMS Message Queue - messages stuck in "Messages Pending"

Stuart FlemingAug 23 2013

I am on SOA SUITE 11.1.17

I have a JMS queue set up and am able to successfully send messages to it using the SOA JMS Adapter.  I am also able to successfully listen to the queue using a JMS adapter in a different project.

However, here when I try to "listen" to the queue using a java program, the message is "received" in my java program and processed as it should be.  The java program is running in a while loop, so it stays running.

On the queue, when the java program reads it, the message is moved to the "Messages Pending" column.  When I kill the java program, the message is returned to the "Messages Current" column.

Would anyone have any ideas about this?  We have an existing Java-based program that isn't properly "closing" the message.

I tried:

1.  placing msg.acknowledge() in the code, since the documentation states that messages that are in "Messages Pending" have been read, but not acknowledged or committed -- but this does nothing.

2.  Setting the expiration on the message to 1, but that is a hack.  I would rather fix the problem than use a hack.

Anyone have any ideas what is wrong?

Thank you,

Stuart

import java.io.IOException;

import java.io.InputStreamReader;

import javax.jms.Connection;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.MessageListener;

import javax.jms.Session;

import javax.jms.TextMessage;

public class JMSListenAndRespond extends JMSQueueHandler implements MessageListener {

  private Session jmsSession = null;

  private MessageConsumer jmsMessageConsumer = null;

  private Connection jmsConnection = null;

  int counter = 1 ;

  public static void main(String[] args) throws Exception {

    InputStreamReader commandBuffer = null;

    char command = '\0';

      // first listen for any messages

      JmsDataObject q = new JmsDataObject();

      q.setJmsCF("jms/TestConnectionFactory");

      q.setJmsQueue("jms/TestJMSQueue");

      q.setServerPwd("welcome1");

      q.setServerLogin("weblogic");

      q.setUrl("t3://localhost:7101");

      q.setJmsMsg("na -- we are listinging...");

   

    JMSListenAndRespond lis = new JMSListenAndRespond();

     lis.consumeMessage(q);

    System.out.println("(type q to exit at any time...)\n");

    try {

      // Now just loop, waiting for messages until user types 'q' to quit

      commandBuffer = new InputStreamReader(System.in);

    while (!(command == 'q')) {

       try {

          command = (char)commandBuffer.read();

          System.out.println("Command: "+command);

          lis.stopListening();

        } catch (IOException e) {

          System.err.println("I/O Exception: ");

          e.printStackTrace();

       }

       } //while

    } finally {

            lis.stopListening();

        }

      lis.stopListening();

  }

   public void stopListening() {

    try {

      if (jmsMessageConsumer != null) {

        jmsMessageConsumer.close();

      }

      if (jmsSession != null) {

        jmsSession.close();

      }

      if (jmsConnection != null) {

        jmsConnection.close();

      }

    } catch (JMSException e) {

      System.err.println("Couldn't properly close all resources.\n");

      e.printStackTrace();

    }

  }

  private void prepareToConsumeMessage() {

    // create JMS connection, session, consumer and register listener - then start session

        try {

      // create connection

      jmsConnection = getJmsConnectionFactory().createConnection();

      // create session

      jmsSession =   jmsConnection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);

      jmsMessageConsumer = jmsSession.createConsumer(getJmsDestination());

      // register a message listener (onMessage)

      jmsMessageConsumer.setMessageListener(this);

     

      // initiate connection - make sure to register your listener first or you might lose messages

      jmsConnection.start();

      System.out.println("\n...listening for message!\n");

    } catch (JMSException e) {

      System.err.println("Problem initializing JMS session");

      e.printStackTrace();

      System.exit(-1);

    }

  }

  /*-----------------------------------------------------------------------

     * onMessage

     * The callback function invoked upon message reception

     *----------------------------------------------------------------------*/

  public void onMessage(Message msg) {

    try {

      String msgText;

      if (msg instanceof TextMessage) {

        msgText = ((TextMessage)msg).getText();

        // this method expects a CSV string with the following makeup:

        // firstName,lastName,gender,birthdate,address,city,zipcode,state,insurancePolicyId

        String messageId = msg.getJMSMessageID();

       

      // code to process goes here.....

       

      } else {

        // if message is not a TextMessage, attempt a conversion to string

        System.out.println("\n** NEW MESSAGE (" + msg.getClass().getName() +

                           "):\n" +

            msg.toString());

      }

    } catch (JMSException e) {

      e.printStackTrace();

    }

  }

  public void consumeMessage(JmsDataObject q ) {

    initializeJndiProperties(q);

    initializeJNDIandCF(q.getJmsQueue());

    prepareToConsumeMessage();

  }

}

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

import java.util.Properties;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.naming.InitialContext;

import javax.naming.NamingException;

public abstract class JMSQueueHandler {

   

  protected void initializeJNDIandCF(String destinationQueueName) {

    // initialize JNDI context and lookup connection factory and destination

    try {

      // Initialize JNDI context

      jndiContext = new InitialContext(jndiProperties);

    } catch (NamingException e) {

      e.printStackTrace();

    }

    try {

      // lookup JMS connection factory in JNDI

      jmsConnectionFactory =

          (ConnectionFactory)jndiContext.lookup(jndiProperties.getProperty("factory.name"));

      // lookup JMS destination in JNDI

      jmsDestination = (Destination)jndiContext.lookup(destinationQueueName);

    } catch (NamingException e) {

      System.err.println("Problem during the JNDI lookup\n");

      e.printStackTrace();

      System.exit(-1);

    }

  }

  public void initializeJndiProperties(JmsDataObject qdo ) {

    // set JNDI properties

    jndiProperties.put("java.naming.factory.initial","weblogic.jndi.WLInitialContextFactory");

    jndiProperties.put("java.naming.provider.url", qdo.getUrl() );

   // jndiProperties.put("java.naming.security.principal", NAMING_PRINCIPAL);

    jndiProperties.put("java.naming.security.principal", qdo.getServerLogin() ) ;

   

    //  jndiProperties.put("java.naming.security.credentials", NAMING_CREDENTIAL);

    jndiProperties.put("java.naming.security.credentials", qdo.getServerPwd() );

    //jndiProperties.put("factory.name", JMS_CONNECTION_FACTORY);

    jndiProperties.put("factory.name", qdo.getJmsCF() );

  }

 

  //protected static final String NAMING_PROVIDER_URL = "t3://localhost:7101";

//protected static final String NAMING_PRINCIPAL = "weblogic";

// protected static final String NAMING_CREDENTIAL = "welcome1";

// protected static final String JMS_CONNECTION_FACTORY = "jms/patientsJmsCF";

// protected static final String JMS_CONNECTION_FACTORY = "jms/TestConnectionFactory";

  ConnectionFactory jmsConnectionFactory = null;

  Destination jmsDestination = null;

  InitialContext jndiContext = null;

  Properties jndiProperties = new Properties();

  public void setJmsConnectionFactory(ConnectionFactory jmsConnectionFactory) {

    this.jmsConnectionFactory = jmsConnectionFactory;

  }

  public ConnectionFactory getJmsConnectionFactory() {

    return jmsConnectionFactory;

  }

  public void setJmsDestination(Destination jmsDestination) {

    this.jmsDestination = jmsDestination;

  }

  public Destination getJmsDestination() {

    return jmsDestination;

  }

}

Comments
Locked Post
New comments cannot be posted to this locked post.
Post Details
Locked on Sep 20 2013
Added on Aug 23 2013
0 comments
996 views