Tuesday, November 25, 2014

Creating a simple WebSocket Server to send ActiveMQ events from a queue

WebSocket 

WebSocket is a protocol providing full-duplex communications channels over a single TCP connection. The WebSocket protocol was standardized by the IETF as RFC 6455 in 2011, and the WebSocket API in Web IDL is being standardized by the W3C.- Wikipedia

ActiveMQ


Apache ActiveMQ is a popular open source messaging server. it supports many cross language clients and protocols. Mainly it supports JMS 1.1 and J2EE 1.4

This application dequeue events(or whatever things in the queue) and send them to a TCP clients using websocket.

This is the WebSocketHandler class. This contains the methods that use to initiate websocket connection with clients. In onConnect method, connection is established with the ActiveMQ and starts dequeuing events from the queue. Please make sure create a queue named "mydataque" in ActiveMQ before continuing.


/**

 * Created by IntelliJ IDEA.

 * User: Chann

 * Date: 11/23/14

 * Time: 11:06 PM

 * To change this template use File | Settings | File Templates.

 */

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.Future;

import java.util.concurrent.TimeUnit;



import org.apache.activemq.ActiveMQConnectionFactory;

import org.eclipse.jetty.websocket.api.Session;

import org.eclipse.jetty.websocket.api.StatusCode;

import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;

import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;

import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;

import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;

import org.eclipse.jetty.websocket.api.annotations.WebSocket;



import javax.jms.*;



@WebSocket

public class MyWebSocketHandler {





    private static String url = "tcp://localhost:61616";

    private static String subject = "mydataque";



    //private final CountDownLatch closeLatch;



    @SuppressWarnings("unused")

    private Session session;







    @OnWebSocketClose

    public void onClose(int statusCode, String reason) {

        System.out.println("Close: statusCode=" + statusCode + ", reason=" + reason);

    }



    @OnWebSocketError

    public void onError(Throwable t) {

        System.out.println("Error: " + t.getMessage());

    }



    @OnWebSocketConnect

    public void onConnect(Session session) {

        System.out.println("Connect: " + session.getRemoteAddress().getAddress());

        try {

            session.getRemote().sendString("Hello Webbrowser");

        } catch (IOException e) {

            e.printStackTrace();

        }

        /////////////////////////////////////////////////////////////////////

        System.out.printf("Got connect: %s%n", session);

        this.session = session;

        try {



            Future<Void> fut;



            int i = 0;



            // Getting JMS connection from the server

            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(

                    url);

            Connection connection = connectionFactory.createConnection();

            connection.start();



            // Creating session for sending messages

            javax.jms.Session session2 = connection.createSession(false,

                    javax.jms.Session.AUTO_ACKNOWLEDGE);



            // Getting the messages from the queue

            Destination destination = session2.createQueue(subject);



            // MessageConsumer is used for receiving (consuming) messages

            MessageConsumer consumer = session2.createConsumer(destination);



            try {



                // Receive the message.

                System.out.println("The Consumer : " + consumer.toString());

                while (true) {



                    Message message = consumer.receive();

                    System.out.println("Received message '"

                            + ((TextMessage) message).getText() + "'");

                    fut = session.getRemote().sendStringByFuture(((TextMessage) message).getText());

                    fut.get(2, TimeUnit.SECONDS);

                    Thread.sleep(50);

                    i++;

                }



            } catch (Exception e) {

                System.out.println("Error : " + e.getMessage());

            }



            finally {

                session.close(StatusCode.NORMAL,

                        "[Consumer]Closing the session with the Server!!");

                connection.close();

                session2.close();



            }





        } catch (Throwable t) {

            t.printStackTrace();

        }

        /////////////////////////////////

    }



    @OnWebSocketMessage

    public void onMessage(String message) {

        System.out.println("Message: " + message);

    }

}
This is the class that contains main methode that initiate webSocket.  After running this class you can access your webSocket using any browser which supports webSockets using url ws://localhost:8081
You can use simple Chrome extensions like this

/**

 * Created by IntelliJ IDEA.

 * User: Chann

 * Date: 11/23/14

 * Time: 11:01 PM

 * To change this template use File | Settings | File Templates.

 */

import org.eclipse.jetty.server.Server;

import org.eclipse.jetty.websocket.server.WebSocketHandler;

import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

import javax.servlet.ServletException;



public class WebSocketTest {



    public static void main(String[] args) throws Exception {

        Server server = new Server(8081);

        WebSocketHandler wsHandler = new WebSocketHandler() {

        //MessageSocket wsHandler = new MessageSocket()

            @Override

            public void configure(WebSocketServletFactory factory) {

                factory.register(MyWebSocketHandler.class);

            }

        };

        server.setHandler(wsHandler);

        server.start();

        server.join();

    }


}

No comments:

Post a Comment