Writing WebSocket Clients using AsyncHttpClient

The AsyncHttpClient version newly released 1.7.0 now supports WebSocket. Both Netty and Grizzly provider supports the latest version of the specification.

Like HTTP support with AHC, WebSoket is quite simple. Starting with AHC 1.7, a new interface called UpgradeHandler is available to the client so any kind of protocol can be used. You can event replace the actual WebSocket implementation with your in case you don’t like mine :-)

public interface UpgradeHandler<T> {

    /**
     * If the HTTP Upgrade succeed (response's status code equals 101),
     * the {@link AsyncHttpProvider} will invoke that
     * method
     *
     * @param t an Upgradable entity
     */
    void onSuccess(T t);

    /**
     * If the upgrade fail.
     * @param t a {@link Throwable}
     */
    void onFailure(Throwable t);

For WebSocket, I wrote a simple one called WebSocketUpgradeHandler which extends the usual AsyncHandler (always required with AHC) and UpgradeHandler. The interesting part is the WebSocketUpgradeHandler.Builder, which allow you to add listeners and WebSocket properties:

        /**
         * Add a {@link WebSocketListener} that
         * will be added to the {@link WebSocket}
         *
         * @param listener a {@link WebSocketListener}
         * @return this
         */
        public Builder addWebSocketListener(WebSocketListener listener) {
            l.add(listener);
            return this;
        }

        /**
         * Remove a {@link WebSocketListener}
         *
         * @param listener a {@link WebSocketListener}
         * @return this
         */
        public Builder removeWebSocketListener(WebSocketListener listener) {
            l.remove(listener);
            return this;
        }

        /**
         * Set the WebSocket protocol.
         *
         * @param protocol the WebSocket protocol.
         * @return this
         */
        public Builder setProtocol(String protocol) {
            this.protocol = protocol;
            return this;
        }

        /**
         * Set the max size of the WebSocket byte message that will be sent.
         *
         * @param maxByteSize max size of the WebSocket byte message
         * @return this
         */
        public Builder setMaxByteSize(long maxByteSize) {
            this.maxByteSize = maxByteSize;
            return this;
        }

        /**
         * Set the max size of the WebSocket text message that will be sent.
         *
         * @param maxTextSize max size of the WebSocket byte message
         * @return this
         */
        public Builder setMaxTextSize(long maxTextSize) {
            this.maxTextSize = maxTextSize;
            return this;
        }

        /**
         * Build a {@link WebSocketUpgradeHandler}
         * @return a {@link WebSocketUpgradeHandler}
         */
        public WebSocketUpgradeHandler build() {
            return new WebSocketUpgradeHandler(this);
        }

You can add several type of listeners like WebSocketTextListener, WebSocketByteListener, etc. To create a WebSocket, you just need to do:

        AsyncHttpClient c = new AsyncHttpClient();
           // or new AsyncHttpClient(new GrizzlyAsyncHttpprovider(config))

        WebSocket websocket = c.prepareGet("ws://something:port)
                .execute(
                  new WebSocketUpgradeHandler.Builder().addWebSocketListener(
                     new WebSocketByteListener() {

                    @Override
                    public void onOpen(WebSocket websocket) {
                    }

                    @Override
                    public void onClose(WebSocket websocket) {
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onMessage(byte[] message) {
                    }

                    @Override
                    public void onFragment(byte[] fragment, boolean last) {
                    }
                }).build()).get();

The AHC Future returned is a WebSocket, and it is returned as soon as the handshake is successful. The WebSocket API is also simple:

public interface WebSocket {

    /**
     * Sen a byte message.
     * @param message a byte message
     * @return this
     */
    WebSocket sendMessage(byte[] message);

    /**
     * Send a text message
     * @param message a text message
     * @return this.
     */
    WebSocket sendTextMessage(String message);

    /**
     * Add a {@link WebSocketListener}
     * @param l a {@link WebSocketListener}
     * @return this
     */
    WebSocket addMessageListener(WebSocketListener l);

    /**
     * Close the WebSocket.
     */
    void close();
}

Conclusion, writing WebSocket is super simple!!! Here is a fully asynchronous echo client

        WebSocket websocket = c.prepareGet("ws://localhost:80")
                .execute(
                  new WebSocketUpgradeHandler.Builder()
                    .addWebSocketListener(new WebSocketTextListener() {

                    @Override
                    public void onMessage(String message) {
                        System.out.println(message);
                    }

                    @Override
                    public void onFragment(String fragment, boolean last) {
                    }

                    @Override
                    public void onOpen(WebSocket websocket) {
                        websocket
                          .sendTextMessage("ECHO")
                          .sendTextMessage("ECHO");
                    }

                    @Override
                    public void onClose(WebSocket websocket) {
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();

                    }
                }).build()).get();

For any questions you can use our Google Group, irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github as well!

Atmosphere 0.8: Jersey on Steroid, WebSocket Sub Protocol, Native WebSocket, JQuery Plugin CORS, REST over HTTP

November 25, 2011 5 comments

The Atmosphere Framework 0.8 version has been released. This is our biggest release ever! This blog will describe the new feature covered by this fantastic release.

Jersey on Steroid with WebSocket

The Jersey Framework can now be fully run on top of a WebSocket connection seamlessly. A single WebSocket connection can now be used to invoke Jersey. That’s open the door to request pipeline and asynchronous processing of REST call transparently. This will be discussed soon in more details, but this feature is implemented via the new WebSocketProtocol API.

Browser WebSocket Support

Atmosphere fully support Opera, Firefox (MozWebSocket), Chrome, Safari and IE 10 Preview WebSocket transparently when used with the Atmosphere JQuery Plug In. Safari on iOS is also supported.

Native WebSocket Support

It is now possible to write native WebSocket application. A simple PubSub application can consist only of:

    public AtmosphereRequest onMessage(WebSocket webSocket, String message) {
        AtmosphereResource r = (AtmosphereResource) webSocket.resource();
        Broadcaster b = lookupBroadcaster(r.getRequest().getPathInfo());

        b.broadcast(message);

        //Do not dispatch to another Container
        return null;
    }

    public void onOpen(WebSocket webSocket) {
        // Accept the handshake by suspending the response.
        AtmosphereResource r = (AtmosphereResource) webSocket.resource();
        Broadcaster b = lookupBroadcaster(r.getRequest().getPathInfo());
        r.setBroadcaster(b);
        r.addEventListener(new WebSocketEventListenerAdapter());

        // Keep Alive the WebSocket Connection Forever
        r.suspend(-1);
    }

    public void onClose(WebSocket webSocket) {
        webSocket.resource().resume();
    }

    public void onError(WebSocket webSocket,
                        WebSocketProcessor.WebSocketException t) {
        logger.error(t.getMessage() + " Status {} Message {}",
                     t.response().getStatus(),
                     t.response().getStatusMessage());
    }

WebSocket Sub Protocol Implementation Support

Writing WebSocket sub protocol on top of a WebSocket connection is now extremely simple, thanks to the WebSocketProtocol API

public interface WebSocketProtocol extends AsyncProtocol{

    /**
     * Allow an implementation to query the
     * AtmosphereConfig of init-param, etc.
     */
    void configure(AtmosphereServlet.AtmosphereConfig config);

    /**
     * Parse the WebSocket message, and delegate the processing
     * to the {@link org.atmosphere.cpr.AtmosphereServlet#cometSupport} or
     * to any existing technology. Invoking
     * {@link org.atmosphere.cpr.AtmosphereServlet#cometSupport} will
     * delegate the request processing
     * to the {@link org.atmosphere.cpr.AtmosphereHandler} implementation.
     * Returning null means this implementation will
     * handle itself the processing/dispatching of the WebSocket's request;
     * /
    AtmosphereRequest onMessage(WebSocket webSocket, String data);

    AtmosphereRequest onMessage(WebSocket webSocket, byte[] data, int offset, int length);

    /**
     * Invoked when a WebSocket is opened
     * @param webSocket {@link WebSocket}
     */
    void onOpen(WebSocket webSocket);

    /**
     * Invoked when a WebSocket is closed
     * @param webSocket {@link WebSocket}
     */
    void onClose(WebSocket webSocket);

    /**
     * Invoked when an error occurs.
     * @param webSocket {@link WebSocket}
     * @param t a WebSocketProcessor.WebSocketException
     */
    void onError(WebSocket webSocket, WebSocketProcessor.WebSocketException t);

By default, Atmosphere uses the SimpleHttpProtocol to dispatch WebSocket message to Servlet based container. As an example, Atmosphere dispatch WebSockets messages to Jersey by wrapping the message inside an HttpServletRequest and  an asynchronous I/O HttpServletResponse. By default message are considered as POST when dispatched to Jersey, but all HTTP property are configurable (content-type, headers, cookies, etc.). Another example is the EchoProtocol, which just echo message to all connected WebSocket.

Improved Cross-Origin Resource Sharing (CORS) Support

The Atmosphere JQuery Plug In has an improved support for CORS, specially with IE 8/9. You can either turn it on globally or per request. As simple as:

                    jQuery.atmosphere.subscribe(
                        this.url,
                        this.atmosphereCallback,
                        jQuery.atmosphere.request = {
                            method : 'POST',
                            data : json,
                            transport: "websocket" ,
                            fallbackMethod: "POST",
                            enableXDR : 'true',
 attachHeadersAsQueryString: true });

Trackability Support and Multi Tab

Trackability of remote AtmosphereResource was available in previous version only with Jersey. With 0.8, Trackability is now available to all modules and natively implemented in the JQuery Atmosphere Plug In. When the Plug In execute a request, a unique ID is assigned to the request. The server will read that unique id and will try to look up an AtmosphereResource linked to that ID. That allow application to remotely manipulate AtmosphereResource without the need to keep track, inside the application itself, a list of AtmosphereResource. That allow an application to suspend more than one AtmosphereResource per connection, opening the possibility to implement Browser multi-tab support by assigning a unique ID per tab, represented by different AtmosphereResource on the server side.

Atmosphere also support Trackable injection like:

  @Path("/subscribe")
  @POST def subscribeAndPublish(
    @HeaderParam(X_ATMOSPHERE_TRACKING_ID) trackedResource:
                    TrackableResource[AtmosphereResource[_, _]],
    @HeaderParam(X_ATMOSPHERE_TRACKING_ID) trackingID: String,
    @HeaderParam(X_ATMOSPHERE_TRANSPORT) transport: String, message: String)
            : SuspendResponse[TrackableResource[AtmosphereResource[_, _]]] =
    {
     ....
    }

Headers as Query String

Some environment are just allowing the GET operation. An example if the WebSocket Handshake operation, which by default execute a GET without allowing the client to configure any headers. IE CORS also only support this model. The good news is Atmosphere can encode the headers as a QueryString and decode it on the server side as header, allowing application to pass information during the handshake operation. As simple as:

                    jQuery.atmosphere.subscribe(
                        this.url,
                        this.atmosphereCallback,
                        jQuery.atmosphere.request = {
                            method : 'POST',
                            data : json,
                            transport: "websocket" ,
                            fallbackMethod: "POST",
                            attachHeadersAsQueryString: true

                        });

There is much more new features, take a look at the changes log for more info.

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Google Group forum, follow the team or myself and tweet your questions there! .

Categories: Atmosphere, Comet, JQuery, Websocket

Hitchiker Guide to the Atmosphere Framework using WebSocket, Long-Polling and Http Streaming

November 7, 2011 1 comment

The Atmosphere Framework easily allow the writing of web application that support, transparently, WebSocket, Long-Polling and Http Streaming. The Atmosphere Framework also hide the complexity of the current asynchronous API, which differ from Server to Server and make your application portable among them. More important, it is much more easy to write an Atmosphere application than using the Servlet 3.0 API.

There are several APIs available in Atmosphere to write an asynchronous application: AtmosphereHandler, Meteor or using Jersey‘s Atmosphere extension. In this blog I will take the famous JQuery PubSub sample to demonstrate those APIs. Note that I will not discuss the JQuery Atmosphere Plugin as it is the same for all APIs. Important, all code snippet below support WebSocket, Long-Polling and Streaming by default. Only the last section only support WebSocket.

The JQuery PubSub Application is quite simple. You enter a topic to subscribe, you select a transport to use (WebSocket, Streaming or Long-Polling) or let the plug in decide for you, and then you are ready to publish message. You can use Redis on the server side to cluster your application among servers. The subscribe operation is done using a GET, the publish using a POST (in the form of message=”something”). If the WebSocket transport is used, the message is wrapped as a POST and delivered as a normal HTTP request. Note that this feature is configurable in Atmosphere

PubSub using AtmosphereHandler

The AtmosphereHandler  is a low level API that can be used to write an asynchronous application. An application just have to implement that interface. This API is usually used by other framework in order to integrate with Atmosphere (GWT, Jersey, Vaading, etc.) but it can also be used if you want to write Servlet style code.  So, with an AtmosphereHandler, the PubSub implementation will take the form of:

public class AtmosphereHandlerPubSub
      extends AbstractReflectorAtmosphereHandler {

    @Override
    public void onRequest
       (AtmosphereResource r)
          throws IOException {

        HttpServletRequest req = r.getRequest();
        HttpServletResponse res = r.getResponse();
        String method = req.getMethod();

        // Suspend the response.
        if ("GET".equalsIgnoreCase(method)) {
            String trackingId = trackingId(req);

            // Log all events on the console, including WebSocket events.
            r.addEventListener(new WebSocketEventListenerAdapter());

            res.setContentType("text/html;charset=ISO-8859-1");

            Broadcaster b = lookupBroadcaster(req.getPathInfo());
            r.setBroadcaster(b);

            if (req.getHeader(X_ATMOSPHERE_TRANSPORT)
                    .equalsIgnoreCase(LONG_POLLING_TRANSPORT)) {
                req.setAttribute(RESUME_ON_BROADCAST, Boolean.TRUE);
                r.suspend(-1, false);
            } else {
                r.suspend(-1);
            }
        } else if ("POST".equalsIgnoreCase(method)) {
            Broadcaster b = lookupBroadcaster(req.getPathInfo());

            String message = req.getReader().readLine();

            if (message != null && message.indexOf("message") != -1) {
                b.broadcast(message.substring("message=".length()));
            }
        }
    }

    @Override
    public void destroy() {
    }

    Broadcaster lookupBroadcaster(String pathInfo) {
        String[] decodedPath = pathInfo.split("/");
        Broadcaster b = BroadcasterFactory.getDefault()
              .lookup(decodedPath[decodedPath.length - 1], true);
        return b;
    }

}

When a GET is received, we lookup a Broadcaster and then suspend the response based on the path info (REST style). Here we need a make sure we aren’t sending padding data (required for WebKit browser) when long polling is used. That’s the only required conditional evaluation needed in terms of transport. With the POST we just look up the Broadcaster (which represent a pubsub topic) and broadcast the request’s body. That;s it.

PubSub using Meteor

The Meteor is another low level API that can be used with existing Servlet application. As an example, the ADF framework use Meteor in order to integrate Atmosphere support.

public class MeteorPubSub extends HttpServlet {

    @Override
    public void doGet(HttpServletRequest req, HttpServletResponse res)
       throws IOException {
        // Create a Meteor
        Meteor m = Meteor.build(req);

        // Log all events on the console, including WebSocket events.
        m.addListener(new WebSocketEventListenerAdapter());

        res.setContentType("text/html;charset=ISO-8859-1");

        Broadcaster b = lookupBroadcaster(req.getPathInfo());
        m.setBroadcaster(b);

        if (req.getHeader(X_ATMOSPHERE_TRANSPORT)
                .equalsIgnoreCase(LONG_POLLING_TRANSPORT)) {
            req.setAttribute(RESUME_ON_BROADCAST, Boolean.TRUE);
            m.suspend(-1, false);
        } else {
            m.suspend(-1);
        }
    }

    public void doPost(HttpServletRequest req, HttpServletResponse res)
        throws IOException {

        Broadcaster b = lookupBroadcaster(req.getPathInfo());
        String message = req.getReader().readLine();

        if (message != null && message.indexOf("message") != -1) {
            b.broadcast(message.substring("message=".length()));
        }
    }

    Broadcaster lookupBroadcaster(String pathInfo) {
        String[] decodedPath = pathInfo.split("/");
        Broadcaster b = BroadcasterFactory.getDefault()
              .lookup(decodedPath[decodedPath.length - 1], true);
        return b;
    }
}

When a GET is received, we create a Meteor and use that Meteor to suspend the response, again using the path info. For post, we do the same as with AtmosphereHandler, e.g retrieve the Broadcaster and broadcast the message.

PubSub using Jersey’s Atmosphere Extension

With the Jersey extension, we can either use annotations or the programmatic API. As simple as”

@Path("/pubsub/{topic}")
@Produces("text/html;charset=ISO-8859-1")
public class JQueryPubSub {

    private
    @PathParam("topic")
    Broadcaster topic;

    @GET
    public SuspendResponse subscribe() {
        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

    @POST
    @Broadcast
    public Broadcastable publish(@FormParam("message") String message) {
        return new Broadcastable(message, "", topic);
    }
}

The GET could have been handled using the @Suspend annotation:

    @GET
    @Suspend(listeners = EventsLogger.class, outputComments = true)
    public Broadcastable subscribe(){
        return new Broadcastable(topic);
    }

As you can see, it is quite simpler that with Meteor and AtmosphereHandler.
PubSub.

PubSub using WebSocket only

If you are planning to write pure WebSocket application and don’t plan to support normal HTTP, you can also write your own WebSocket sub protocol. It is quote important to note here that only WebSocket will be supported.

public class WebSocketPubSub implements WebSocketProtocol {

    private AtmosphereResource r;

    @Override
    public HttpServletRequest onMessage(WebSocket webSocket, String message) {
        Broadcaster b = lookupBroadcaster(r.getRequest().getPathInfo());

        if (message != null && message.indexOf("message") != -1) {
            b.broadcast(message.substring("message=".length()));
        }

        //Do not dispatch to another Container like Jersey
        return null;
    }

    @Override
    public void onOpen(WebSocket webSocket) {
        // Accept the handshake by suspending the response.
        r = (AtmosphereResource)
              webSocket.atmosphereResource();

        Broadcaster b = lookupBroadcaster(r.getRequest().getPathInfo());
        r.setBroadcaster(b);
        r.addEventListener(new WebSocketEventListenerAdapter());

        r.suspend(-1);
    }

    @Override
    public void onClose(WebSocket webSocket) {
        webSocket.atmosphereResource().resume();
    }

    Broadcaster lookupBroadcaster(String pathInfo) {
        String[] decodedPath = pathInfo.split("/");
        Broadcaster b = BroadcasterFactory.getDefault().
                 lookup(decodedPath[decodedPath.length - 1], true);
        return b;
    }

The important method here is onOpen (for accepting the handshake) and the onMessage, which is were the messages are received.

Conclusion

It is quite important to pick the best API when writing Atmosphere application as it can save you a lot of time! You can download all samples from here.

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Google Group forum, follow the team or myself and tweet your questions there! .

Categories: Atmosphere, Comet, JQuery, Websocket

Configuring HAProxy for WebSocket

October 6, 2011 1 comment

A lot of peoples (including myself at Wordnik) needed to configure HAProxy in order to make WebSocket working. For my Atmosphere Framework project, I’m using:

$ cat /etc/haproxy/haproxy.cfg
global
    maxconn     4096 # Total Max Connections. This is dependent on ulimit
    nbproc      1

defaults
    mode        http

frontend all 0.0.0.0:80
    timeout client 86400000
    default_backend www_backend
    acl is_websocket hdr(Upgrade) -i WebSocket
    acl is_websocket hdr_beg(Host) -i ws

    use_backend socket_backend if is_websocket

backend www_backend
    balance roundrobin
    option forwardfor # This sets X-Forwarded-For
    timeout server 30000
    timeout connect 4000
    server apiserver 127.0.0.1:8080 weight 1 maxconn 1024 check

backend socket_backend
    balance roundrobin
    option forwardfor # This sets X-Forwarded-For
    timeout queue 5000
    timeout server 86400000
    timeout connect 86400000
    server apiserver targetserver:7777 weight 1 maxconn 1024 check

Thanks to Matthias L. Jugel for sharing … see his use of Atmosphere at twimpact.com.

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Google Group forum, follow the team or myself and tweet your questions there! You can also checkout the code on Github.

Categories: Atmosphere, Comet, Websocket

Atmosphere.next Update: WebSocket, Javascript, full docs and 1.0.0 on the horizon!

September 8, 2011 8 comments

I took this summer off in order to explore what can be done with the Atmosphere Framework. I have to admit I was extremely surprised about the number of emails I did received for supports, new features, venture capital stuff etc. As you may already know, I’ve decided to join Wordnik.com to pursue the work on Atmosphere.

The future of Atmosphere has never been brighter than now…starting mid September, I will be allowed as much as 50% of my time to work on Atmosphere. That’s A LOT, more time than I ever got allowed at Sun, Ning and Sonatype. For the last two years I’ve innovated on my own time, helped growing the community, do some talks, but I had never a chance to spend the time I wanted on the project. Not anymore!!

So soon I will restart contributing (created a lot of great things over the summer) and work on Atmosphere 1.0, which I hope I can do before next year. The roadmap is simple: stabilize, documents (quite needed), merge all the pull requests/donations for the client side, push the Socket.IO supports, etc. Since I haven’t received what I wanted from Oracle on Atmosphere, I will also rewrite some part of the framework completely to get rid of the CDDL/LGPL stuff in favor or a pure APL licensing (more than 75% is APL right now anyway). And of course the project will stay under Github, and Twitter will still be used for communicating news (either atmo_framework or jfarcand). Stay tuned, the future is bright for Atmosphere!

 

Categories: Atmosphere, Comet, Websocket

Quick Tip: Using Apache Shiro with your Atmosphere’s WebSocket/Comet app.

Apache Shiro is a powerful and easy-to-use Java security framework that performs authentication, authorization, cryptography, and session management. When used with The Atmosphere Framework and a Servlet based Container, the security context of the thread that will execute the async operation may not carry the same SecurityContext, causing some unexpected issues.

As an example, let’s say you want to retrieve the user principal after you have suspended your request. Normally all you need to do is:

Subject currentUser = SecurityUtils.getSubject();

This work perfectly well when the request gets executed using the calling thread of the HTTP request, e.g. when the Servlet Container execute your Servlet.service method. Now when using the Atmosphere Framework, you may need to lookup the Subject once the async operation occurs: when a Broadcast (server side events) gets executed or when the suspended connection resume. Under that condition, doing SecurityUtils.getSubject() will NOT return the same value as when the call gets executed using the calling thread and hence can cause unexpected issues. The async Thread used in that case may be provided by the Servlet Container itself or by the Atmosphere’s BroadcasterConfig ExecutorServices. In both case, the security context is NOT the same as the original HTTP request thread, hence you should not call SecurityUtils API. Instead, one solution is to use the request attribute map and store the information required in it. As simple as:

request.setAttribute("subject", SecurityUtils.getSubject());

Then when the request resume or when a broadcast event occurs (inside your AtmosphereHandler#onStateChange method, your BroadcastFilter etc,), you can easily retrieve that information by doing:

Subject subject = request.getAttribute("subject");

The Request object is *always* the original object constructed by the Servlet Container when handling the HTTP request, hence attributes (and session) will always be available. This is a safe way to store Apache Shiro information.

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Google Group forum, follow the team or myself and tweet your questions there! You can also checkout the code on Github.


								
Categories: Atmosphere, Comet, Websocket

REST + WebSocket applications? Why not using the Atmosphere Framework

The Atmosphere Framework easily allow the creation of REST applications … using WebSocket. This time I will describe a super simple example on how to do it.

The Atmosphere Framework supports transparently both WebSocket and Comet transport and brings portability to any Java based application. An application written using Atmosphere can be deployed in any WebServer and Atmosphere will transparently make it work.  Atmosphere is also able to transparently select the best transport to use, e.g. WebSocket or Comet. Now let’s write a very simple REST application with Comet support as we normally write:

@Path("/pubsub/{topic}")
@Produces("text/html;charset=ISO-8859-1")
public class JQueryPubSub {

    private @PathParam("topic") Broadcaster topic;

    @GET
    public SuspendResponse<String> subscribe() {
        return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

    @POST
    @Broadcast
    public Broadcastable publish(@FormParam("message") String message) {
        return new Broadcastable(message, "", topic);
    }
}

Doing

GET /pubsub/something

will invoked the SuspendResponse. To make the exercise simple, all we do there is suspend the connection (e.g. do not return any response, wait for an event). If you want to make this exercise more difficult, you can always implements the ETag trick! Once the connection is suspended, we need to use a second connection in order to post some data

POST /pubsub/something
message=I love Comet

Executing the POST request will result in the invocation of the publish method. The @Broadcast annotation means the FormParam value will be broadcasted to all suspended connections.

WebSocket to rule them all

OK so let’s assume we now deploy your application in a WebServer that supports WebSocket like Jetty or GlassFish. Now Atmosphere will auto detect WebSockets are supported and use it when a WebSocket request is done. Now let’s assume we build the client using the Atmosphere JQuery Plug In and execute the GET request using Chrome (which support Websocket).The Atmosphere Javascript library is able to challenge the remote server and discover if the server and client support WebSocket, and use it.

In that scenario, suspending the connection will tell Atmosphere to execute the WebSocket handshake. Now the POST will be executed on the same connection, and the public method will be invoked this time.  This is not a POST as you see normally with normal HTTP. Since WebSocket is used, only the form param will be send over the wire:

message=I love WebSocket

All of this occurs without any modification of your REST application. All you need to do to enable WebSocket is to “suspend” the connection when a @GET occurs. Transparent, is it :-) You can download the current version of the sample here.

Now what Atmosphere is doing under the hood is wrapping the WebSocket message into an HttpServletRequest so any framework like Jersey, Wicket, etc, works as it is. If you are familiar with Atmosphere, your AtmosphereHandler implementation will get invoked with an instance of HttpServletRequest that contains the WebSocket message, so you can use it as your will normally do using Comet or normal HTTP request.

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Nabble forum, follow the team or myself and tweet your questions there! You can also checkout the code on Github.

Categories: Atmosphere, Comet, JQuery, Websocket

Improving HTTP Long Polling performance using ETag

The Comet technique called “long polling” is the most widely used technique. In this blog I will explain a way optimize this technique using the well know etag HTTP header using the Atmosphere Framework.

There are many ways to optimally implement the long polling technique, and how events get cached on the server side. In most frameworks/applications, events are getting cached to prevent clients from missing them when reconnecting. When the client reconnect, missed events are retrieved using custom techniques like:

  • Use a special header containing the last time the client has connected. Based on that time stamp, the server can retrieve all events that occurred after that time.
  • Use a special header container an event counter, and retrieve the missed events based on that count

A better approach consist of using the well know Etag HTTP header. Wikipedia defines Etag as:

An ETag, or entity tag, is part of HTTP, the protocol for the World Wide Web. It is one of several mechanisms that HTTP provides for cache validation, and which allows a client to make conditional requests. This allows caches to be more efficient, and saves bandwidth, as a web server does not need to send a full response if the content has not changed. .

That’s exactly what we need to optimize our long polling request, e.g use an Etag to:

  1. Decide when to long poll the connection
  2. Decide to long poll the connection even if some events where cached since the last connection. Aggreate the cached events with the new one.
  3. Avoid long polling the connection, return immediately.

Before going into the details, let’s describe how an ETag can be generated from the server side. A simple but very powerful way to generate an Etag is to generate it based on a MD5 Message-Digest Algorithm. That value will be regenerated every time events get cached, and that value will be used as an Etag send back to the client.

The client will use the ETag value and send it back to the server using the “If-None-Match” header. On the server side, the value will be compared with the last generated ETag value. If the client’s ETag match the server one, that means no event occurred since the last time the client was connected. Two actions can be taken:

  • Long poll (suspend) the connection until the ETag gets regenerated, which means a new events occurred on the server side.
  • Return an HTTP status code of 204: The server successfully processed the request, but is not returning any content. Returning a 204 could be used when the number of long polled connections is really high to prevent out of memory errors or to reduce the memory footprint. In that case the client could reconnect later (polling the server).

If the ETag values aren’t matching, that mean events occurred since the last time the client connected. Two actions can be taken:

  • Sent back the cached events to the client. In that case we don’t long poll the connection.
  • long poll the connection (suspend) until the next events occurs. Then combine the cached events with the new one and resume the connection.

Note that every time a new events occurs, a new ETag value needs to be regenerated.

Now how can this be implemented? Using the Atmosphere Framework it is as simple as:

@Produces("application/json")
public SuspendResponse optimalLongPolling(final @Context Request request){
    final String eTagString = server.getETag();
    final Response.ResponseBuilder responseBuilder =
        request.evaluatePreconditions(new EntityTag(eTagString));

    if (responseBuilder != null){
        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(broadcaster)
                .addListener(new AtmosphereEventsListener())
                .outputComments(false)
                .resumeOnBroadcast(true)
                .period(30, TimeUnit.MILLISECONDS)
                .build();
    } else {
        throw new WebApplicationException(
                Response.status(Response.Status.NO_CONTENT).build());
    }
}

Very simple.

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Nabble forum, follow the team or myself and tweet your questions there! You can also checkout the code on Github.

Categories: Atmosphere

Small Survival guide when debugging your Comet/WebSocket application.

Under my Atmosphere’s Project works, I’m getting a lot of questions about how to debug WebSocket/Comet applications. Here is some simple tools and trick  I use.

ngrep.sourceforge.net

A lot of people use wireshark, I much prefer using ngrep for snooping the network packet. As simple as:

sudo ngrep -d en0 -q -W byline port 8080

For WebSocket it helps seeing what’s being sent being the client and the server. Of course you can always use FireBug or Chrome dev tool for debugging inside the browser,  but it is much easier IMO to see the interaction between the browser and the server from a terminal window:

T 127.0.0.1:61265 -> 127.0.0.1:8080 [AP]
GET /atmosphere-jquery-pubsub/pubsub/chat HTTP/1.1.
Upgrade: WebSocket.
Connection: Upgrade.
Host: 127.0.0.1:8080.
Origin: http://127.0.0.1:8080.
Sec-WebSocket-Key1: Z      mZ1  4 9 0W75 50!p13.
Sec-WebSocket-Key2: .mx1674Dy@ 69K  %K027C9:m.
.
.TQ..34.

T 127.0.0.1:8080 -> 127.0.0.1:61265 [AP]
HTTP/1.1 101 WebSocket Protocol Handshake.
Upgrade: WebSocket.
Connection: Upgrade.
Sec-WebSocket-Origin: http://127.0.0.1:8080.
Sec-WebSocket-Location:
    ws://127.0.0.1:8080/atmosphere-jquery-pubsub/pubsub/chat.
.

T 127.0.0.1:8080 -> 127.0.0.1:61265 [AP]
B...%...K....[..

Using curl

If you are using curl to test your Comet application (or Websocket to test the initial handshake). make sure you pass the -N option as without, curl will buffer the server’s response and will not display it until the connection gets dropped by the server:

 curl -N http://127.0.0.1:8080/atmosphere-jquery-pubsub/pubsub/chat

Without the -N option any real time update will not get display live in the console:

curl -N http://127.0.0.1:8080/atmosphere-jquery-pubsub/pubsub/chat
<!--                                  http://github.com/Atmosphere                                      -->
<!-- Welcome to the Atmosphere Framework. To work with
all the browsers when suspending connection, Atmosphere must output some
data to makes WebKit based browser working.-->
<!-- EOD -->

For any questions, use Twitter to reach me!

Categories: Atmosphere, Comet, Websocket

Writing powerful REST Client using the AsyncHttpClient Library and Jersey

The Jersey Client Framework is a powerful library that easily allow the writing of client based REST client.  Recently I’ve implemented a new transport for the framework based on the AsyncHttpClient library. You can now get the best of both worlds: an easy REST Client API powered by a poweful http client library.

Currently the Jersey Client Framework ships with two transport, one based on the infamous URLConnection, and one based on Apache Http Client.  Recently I needed to replace Wink with Jersey in one of my Sonatype project (guess one?) and  at the same time realized the Jersey Client features was quite limited in terms of what you can do and what is exposed to the client. I understand the original design was to abstract the underlying transport, but unfortunately all http library aren’t supporting the same set of features. Now if you are familiar with the Jersey client, creating a resource is as simple as (see here for the complete tutorial):

    Client c = Client.create(); 

Now if you want to customize the client a little, you can do (complete info here):

    ClientConfig cc = new DefaultClientConfig();
    cc.getProperties().put(
        ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true);
    Client c = Client.create(cc); 

Next step is as simple as:

     WebResource r = c.resource(“http://localhost:8080/xyz”);
     ClientResponse response = r.get(ClientResponse.class);
     EntityTag e = response.getEntityTag();
     String entity = response.getEntity(String.class);

Very simple. Now in order to take advantage of AHC (AsyncHttpClient) features, all you need to do is to first get the library from here, and then just do get a reference to the AsyncHttpClient’s Config class (if you aren’t familiar with AHC, take a look at this introduction)

     DefaultAhcConfig config = new DefaultAhcConfig();
     config.getAsyncHttpClientConfigBuilder().setRealm(
           new Realm.RealmBuilder()
               .setScheme(Realm.AuthScheme.SPNEGO)
               .setUsePreemptiveAuth(false)
               .build());
     Client c = Client.create(config);
     WebResource r = c.resource(getUri().build());

You just added Kerberos support to Jersey Client! The trick here consists of getting a reference to the AsyncHttpClientConfig and configure all the cool stuff offered by AHC. As an example, you can add resumable support, Request/Response or IOException Filter by just doing:

     config.getAsyncHttpClientConfigBuilder()
        .addIOExceptionFilter(new ResumableIOExceptionFilter());

For any questions, use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>org.sonatype.spice</groupId>
    <artifactId>jersey-ahc-client</artifactId>
    <version>1.0.0</version>
</dependency>

Categories: Async Http client
Follow

Get every new post delivered to your Inbox.