Skip to Main Content

Java Development Tools

Announcement

For appeals, questions and feedback about Oracle Forums, please email oracle-forums-moderators_us@oracle.com. Technical questions should be asked in the appropriate category. Thank you!

How I finally achieved Asynchronous UI updates with WebSockets and JMS

wlovettApr 4 2014

Hi All,

I've been searching off and on for weeks on how to keep clients informed about long running asynchronous jobs.  For example, if you're inserting 100,000 rows into a database, the user needs to receive notifications DURING the job processing, not just at the start and end.  Most recommendations revolve around polling, but I prefer something a little more responsive.  Here's how I skinned the cat with WebSockets and JMS using JDeveloper 12.1.2.0.0.

Setting Up JMS

-----------------------

First, visit https://blogs.oracle.com/soaproactive/entry/how_to_create_a_simple and setup your JMS.  We're going to add an additional queue though: TestJMSReturnQueue.

Creating the Messaging Bean

-------------------------------------------

You can make this simple by creating an ADF Fusion Web Application.  We'll just use the "Model" as the Messaging Bean project.  Use the JDev wizard to add a Message Driven Bean.  The defaults should work fine, although you'll want to change the name of the class.  Note that you'll probably have to add the Java EE and AQJMS libraries to your Model project.  Here's the code for the Message Bean

@MessageDriven(mappedName = "jms/TestJMSQueue")

public class SimpleMessageBean implements MessageListener {

    public final static String JMS_FACTORY = "jms/TestConnectionFactory";

    private static final String RETURN_QUEUE = "jms/TestJMSReturnQueue";

    private QueueConnectionFactory qconFactory;

    private QueueConnection qcon;

    private QueueSession qsession;

    private QueueSender qsender;

    private Queue returnQueue;

    boolean isConnected;

    public SimpleMessageBean() {

        isConnected = false;

        try {

            Context ctx = new InitialContext();

            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);

            qcon = qconFactory.createQueueConnection();

            qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            returnQueue = (Queue) ctx.lookup(RETURN_QUEUE);

            qsender = qsession.createSender(returnQueue);

            qcon.start();

        } catch (NamingException e) {

            e.printStackTrace();

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

    @Override

    public void onMessage(Message msg) {

        System.out.println("Received message from web socket");

        System.out.println("Doing some long running work");

        try {

            if (msg instanceof MapMessage) {

                // Create a thread to handle updates

                // Would it be possible to create a pool of threads for updates?

                ExecutorService executor = Executors.newSingleThreadExecutor();

                for(int i = 0; i < 5; i++){

                    Thread.sleep(1500);

                    MapMessage mapMessage = qsession.createMapMessage();

                    mapMessage.setStringProperty("ticket", msg.getStringProperty("ticket") + " @ " + i);

                    mapMessage.setStringProperty("ipAddress", msg.getStringProperty("ipAddress"));

                 

                    // Send an update message

                    // Without a runnable object, all updates would be processed

                    // at once at the end.  Not what we want.

                    executor.execute(new MessageProcessor(mapMessage, qsender));

                }

            } else {

                System.out.println("Message is a " + msg.getClass().getName());

            }

        } catch (JMSException e) {

            e.printStackTrace();

        } catch (Throwable te) {

            te.printStackTrace();

        }

    }

}

Here's the MessageProcess Runnable Class

public class MessageProcessor implements Runnable {

    MapMessage msg;

    QueueSender qsender;

  

    public MessageProcessor(MapMessage msg, QueueSender qsender) {

        this.msg = msg;

        this.qsender = qsender;

    }

    @Override

    public void run() {

        try {

            qsender.send(msg);

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

}

Setting Up WebSockets

-----------------------------------

WebSockets are really cool and I'm excited they're supported now after initially playing with them in Node.js.  This blog post helped me a good bit setting them up: http://technology.amis.nl/2013/07/30/adf-12c-using-websockets-to-implement-client-to-client-push-in-a-scalable-manner/

Create a WebSocket class in your ViewController project.  The code is below.  You'll also want to add the WebLogic 12.1 API library to your project.

@WebSocket(timeout = -1, pathPatterns = { "/ws/*" })

public class SocketMediator extends WebSocketAdapter implements WebSocketListener, MessageListener {

    private static SocketMediator sm;

    public final static String JMS_FACTORY = "jms/TestConnectionFactory";

    private static final String RETURN_QUEUE = "jms/TestJMSReturnQueue";

    private static final String QUEUE = "jms/TestJMSQueue";

    private QueueConnectionFactory qconFactory;

    private QueueConnection qcon;

    private QueueSession qsession;

    private QueueReceiver qreceiver;

    private QueueSender qsender;

    private javax.jms.Queue queueReturn;

    private javax.jms.Queue queue;

    private boolean isConnected;

  

    public static SocketMediator getSm() {

        return sm;

    }

    public SocketMediator() {

        super();

        sm = this;

        isConnected = false;

        try {

            InitialContext ctx = new InitialContext();

            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);

            qcon = qconFactory.createQueueConnection();

            qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

            queueReturn = (javax.jms.Queue) ctx.lookup(RETURN_QUEUE);

            qreceiver = qsession.createReceiver(queueReturn);

            qreceiver.setMessageListener(this);

            queue = (javax.jms.Queue) ctx.lookup(QUEUE);

            qsender = qsession.createSender(queue);

            qcon.start();

            isConnected = true;

        } catch (JMSException e) {

            e.printStackTrace();

        } catch (NamingException e) {

            e.printStackTrace();

        }

    }

    @Override

    public void onOpen(WebSocketConnection webSocketConnection) {

        System.out.println("New connection was created from a client " + webSocketConnection.getRemoteAddress());

        super.onOpen(webSocketConnection);

    }

    @Override

    public void onMessage(WebSocketConnection connection, String payload) {

        // Sends message from the browser back to the client.

        System.out.println("Does this ever actually fire?");

    }

    public void startJob(String message, String ipAddress) {

        if(isConnected){

            try {

                // send a JMS message for the long running job

                // adding the IP Address so we can match it with a

                // web socket connection

                MapMessage mapMessage = qsession.createMapMessage();

                mapMessage.setStringProperty("ticket", message);

                mapMessage.setStringProperty("ipAddress", ipAddress);

                System.out.println("    Sending: " + message);

                qsender.send(mapMessage);

            } catch (JMSException e) {

                e.printStackTrace();

            } catch (Throwable te) {

                te.printStackTrace();

            }

        }

    }

    @Override

    public void onClose(WebSocketConnection webSocketConnection, ClosingMessage closingMessage) {

        super.onClose(webSocketConnection, closingMessage);

        System.out.println("Connection was closed from a client " + webSocketConnection.getRemoteAddress());

    }

    @Override

    public void onError(WebSocketConnection webSocketConnection, Throwable throwable) {

        super.onError(webSocketConnection, throwable);

        System.out.println("Something went seriously wrong with client " + webSocketConnection.getRemoteAddress());

    }

    // Subscription for return messages

    @Override

    public void onMessage(javax.jms.Message msg) {

        System.out.println("Received message from bean!");

        try {

            String ticket = msg.getStringProperty("ticket");

            String ipAddress = msg.getStringProperty("ipAddress");

            for(WebSocketConnection conn : getWebSocketContext().getWebSocketConnections()){

                if(conn.getRemoteAddress().equals("/" + ipAddress)){

                    conn.send(ticket);

                }

            }

        } catch (JMSException e) {

            e.printStackTrace();

        } catch (IOException ioe) {

            ioe.printStackTrace();

        }

    }

}

Now let's create a Home.jspx page so we can hookup to this WebSocket class.

<?xml version='1.0' encoding='UTF-8'?>

<jsp:root xmlns:jsp="http://java.sun.com/JSP/Page" version="2.1" xmlns:f="http://java.sun.com/jsf/core"

          xmlns:af="http://xmlns.oracle.com/adf/faces/rich">

    <jsp:directive.page contentType="text/html;charset=UTF-8"/>

    <f:view>

        <af:document title="Home.jspx" id="d1">

            <af:resource type="javascript">

              var wsUri = "ws://" + "127.0.0.1:7101/AsyncWithUpdates" + "/ws";

              var webSocket = new WebSocket(wsUri);

              webSocket.onopen = function (evt) {

                  console.log('js onopen');

              }

              webSocket.onmessage = function (evt) {

                console.log("js message received: " + evt.data);

                var e = AdfPage.PAGE.findComponentByAbsoluteId("ot1");

                e.setValue(evt.data);

              };

            </af:resource>

            <af:form id="f1">

                <af:button text="Start Me Up!" id="btn" actionListener="#{viewScope.HomeBB.onBtn}"/>

                <!-- Make sure clientComponent is set to true so javascript can find component -->

                <af:outputText id="ot1" value="Ready for Updates" inlineStyle="margin-left:10px"

                               clientComponent="true"/>

            </af:form>

        </af:document>

    </f:view>

</jsp:root>

Here's the backing bean that fires our WebSocket jobStart() method

public class HomeBackingBean {

    public HomeBackingBean() {

    }

    public void onBtn(ActionEvent actionEvent) {

        // Generate something unique to pass back and forth between async services

        String time = ((Long)Calendar.getInstance().getTimeInMillis()).toString();

        // Get the IP Address of the client

        HttpServletRequest request = (HttpServletRequest)FacesContext.getCurrentInstance().getExternalContext().getRequest();

        String ipAddress = request.getHeader("X-FORWARDED-FOR");

        if(ipAddress == null){

            ipAddress = request.getRemoteAddr();

        }

        SocketMediator.getSm().startJob(time, ipAddress);

    }

}

Once all the files are created, you should be able to right-click on Home.jspx and start up the server.  Click the button and the output text will update with messages from the long-running process.  Nifty!

Please let me know your thoughts.  I still have a few questions such as ...

1) Why doesn't SocketMediator's onMessage(WebSocketConnection ... ) method ever fire?  How do we get it to fire?

2) Is ExecutorService executor = Executors.newSingleThreadExecutor(); really the way to go?  Each long running process gets its own Thread.  Wouldn't working from a thread pool be better?

Any other notes on how this works, doesn't work, unseen problems, etc. would be greatly appreciated.

Comments
Locked Post
New comments cannot be posted to this locked post.
Post Details
Locked on May 2 2014
Added on Apr 4 2014
0 comments
815 views