Archive for November, 2010

Friday’s Trick #5: Per Request Filtering of WebSocket/Comet Server Side Events

November 26, 2010 1 comment

This week I will describe how you can filter, transform and aggregate WebSocket/Comet’s server side events per request using the Atmosphere Framework.

As you may know, Atmosphere already supports global filtering for the set of WebSocket/Comet connections associated with a Broadcaster. Those filters are called BroadcastFilter and are applied every time a server side event (SSE) is broadcast. In that case, all connections associated with the Broadcaster, which include WebSocket, long-polling and http streaming connections, will receive an SSE that may have been transformed by a BroadcastFilter. As an example, in order to support http streaming in Internet Explorer, the following BroadcastFilter needs to be added (in web.xml or programmatically):

public class JavaScriptClientFilter implements BroadcastFilter {
    private final AtomicInteger uniqueScriptToken = new AtomicInteger();

    @Override
    public BroadcastAction filter(Object message) {
        if (message instanceof String) {
            StringBuilder sb = new StringBuilder("<script id=\"atmosphere_")
                    .append(uniqueScriptToken.getAndIncrement())
                    .append("\">")
                    .append("parent.callback")
                    .append("('")
                    .append(message.toString())
                    .append("');</script>");
            message = sb.toString();
        }
        return new BroadcastAction(BroadcastAction.ACTION.CONTINUE, message);
    }
}
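One thing to watch with the filter above: the message is concatenated straight into a single-quoted JavaScript string, so a message containing a quote, a backslash or a newline would break the generated script. Below is a minimal sketch of escaping the message first. The JsStringEscaper helper is hypothetical (it is not part of Atmosphere) and only covers the most common problem characters:

```java
// Hypothetical helper, not part of Atmosphere: escapes a message before it is
// embedded in a single-quoted JavaScript string inside a <script> element.
final class JsStringEscaper {

    private JsStringEscaper() {
    }

    static String escape(String raw) {
        StringBuilder sb = new StringBuilder(raw.length() + 16);
        for (int i = 0; i < raw.length(); i++) {
            char c = raw.charAt(i);
            switch (c) {
                case '\\': sb.append("\\\\"); break; // backslash
                case '\'': sb.append("\\'");  break; // would end the string literal
                case '\n': sb.append("\\n");  break; // raw newlines are illegal in a literal
                case '\r': sb.append("\\r");  break;
                case '<':  sb.append("\\x3C"); break; // avoids a literal "</script>"
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The escaped value can then be appended instead of message.toString().
        System.out.println(escape("it's </script> done"));
    }
}
```

Inside the filter you would append JsStringEscaper.escape(message.toString()) instead of the raw message.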

If we use the Atmosphere JQuery PubSub sample, which is defined as:

@Path("/pubsub/{topic}")
@Produces("text/html;charset=ISO-8859-1")
public class JQueryPubSub {

    private @PathParam("topic") Broadcaster topic;

    @GET
    public SuspendResponse subscribe() {
        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

    @POST
    @Broadcast
    public Broadcastable publish(@FormParam("message") String message) {
        return new Broadcastable(message, "", topic);
    }
}

The BroadcastFilter will be invoked after the publish method returns. Hence, when the browser generates the SSE “hello sent using websocket” to be broadcast to all connections associated with the Broadcaster “topic”, the remote browser will receive the transformed message:

<script id="atmosphere_0">parent.callback('hello sent using websocket');</script> 

If you use the Atmosphere JQuery Plugin, the IE-specific code for supporting http streaming will be executed. But that solution is far from optimal because we process every message regardless of the browser’s user-agent, which is a waste of resources. The JQuery Plugin will have no problem parsing the message, but it would be much better if we could transform the SSE only when IE is used. This is where the PerRequestBroadcastFilter comes into the picture. To implement a per-request transformation, all we need to do is:

public class JavascriptClientFilter implements PerRequestBroadcastFilter {

    private final AtomicInteger uniqueScriptToken = new AtomicInteger();

    @Override
    public BroadcastFilter.BroadcastAction filter
                   (HttpServletRequest request, Object message) {

        // Read the header once; some clients do not send it.
        String userAgent = request.getHeader("User-Agent");
        if (userAgent != null) {
            // The value is lower-cased, so the token must be matched in lower case too.
            if (userAgent.toLowerCase().indexOf("msie") != -1
                    && message instanceof String) {
                StringBuilder sb = new StringBuilder
                    ("<script id=\"atmosphere_")
                    .append(uniqueScriptToken.getAndIncrement())
                    .append("\">")
                    .append("parent.callback")
                    .append("('")
                    .append(message.toString())
                    .append("');</script>");
                message = sb.toString();
            }
        }
        return new BroadcastFilter.BroadcastAction
            (BroadcastFilter.BroadcastAction.ACTION.CONTINUE, message);
    }
}

Now we’re talking! With the above, our SSE is only transformed when the IE browser is used. WebSocket messages can also be transformed using the same technique. If you have played with or read the JQuery PubSub sample, you know it can be used with WebSocket/Comet and the JQueryPubSub code described above. When the browser sends a form param via a WebSocket connection, we need to extract the form data in order to broadcast it back. A simple way to do it is:

public class FormParamFilter implements PerRequestBroadcastFilter{

    @Override
    public BroadcastFilter.BroadcastAction filter
          (HttpServletRequest request, Object message) {

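        // getHeader() returns the header value as a String; getHeaders() returns an
        // Enumeration, which would never be equal to "WebSocket".
        if ("WebSocket".equalsIgnoreCase(request.getHeader("X-Atmosphere-Transport"))) {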
            if ( (message instanceof String) 
               && ((String) message).indexOf("=") != -1) {
                message =  message.toString().split("=")[1];
            }
        }
        return new BroadcastFilter.BroadcastAction
             (BroadcastFilter.BroadcastAction.ACTION.CONTINUE, message);
    }
}

With the above filter we are parsing the form param value only when Websocket is used, in order to extract the SSE we want to broadcast to all other connections. Of course, you can do much more complex operations inside filters!
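The split("=")[1] approach is fine for a demo, but it throws when the value is empty, truncates values that contain an “=” and leaves the value URL-encoded. Here is a slightly more defensive sketch using only the JDK. The FormValueExtractor name is hypothetical, and it assumes the body carries a single name=value pair, as in the sample:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

// Hypothetical helper: extracts and decodes the value of a single
// "name=value" form body such as "message=hello+world".
final class FormValueExtractor {

    private FormValueExtractor() {
    }

    static String extractValue(String body) {
        int idx = body.indexOf('=');
        if (idx < 0) {
            return body; // not form data, broadcast it untouched
        }
        // Everything after the FIRST '=' is the value, even if it contains '='.
        String encoded = body.substring(idx + 1);
        try {
            // Note: decode() throws IllegalArgumentException on malformed %-sequences.
            return URLDecoder.decode(encoded, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(extractValue("message=hello+world%21"));
    }
}
```

In the filter, message = FormValueExtractor.extractValue((String) message) would replace the split call.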

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Nabble forum, follow the team or myself and tweet your questions there! You can also check out the code on Github.

Categories: Atmosphere, Comet, JQuery, Websocket

What’s new with AsyncHttpClient 1.4.0

November 16, 2010 2 comments

AsyncHttpClient 1.4.0 has just been released and contains many new features, bug fixes and significant performance improvements.

Request/ResponseFilter

AsyncHttpClient 1.4.0 supports a new Filter API that can be used for pre-processing a Request and post-processing a Response. The new RequestFilter can be used to decorate the original Request and AsyncHandler instance. As an example, below is a RequestFilter which throttles requests:


public class ThrottleRequestFilter implements RequestFilter {
    private final int maxConnections;
    private final Semaphore available;
    private final int maxWait;

    public ThrottleRequestFilter(int maxConnections) {
        this.maxConnections = maxConnections;
        this.maxWait = Integer.MAX_VALUE;
        available = new Semaphore(maxConnections, true);
    }

    public ThrottleRequestFilter(int maxConnections, int maxWait) {
        this.maxConnections = maxConnections;
        this.maxWait = maxWait;
        available = new Semaphore(maxConnections, true);
    }

    public FilterContext filter(FilterContext ctx) throws FilterException {

        try {
            if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
                throw new FilterException(
                    String.format("No slot available for Request %s "
                            + "with AsyncHandler %s",
                            ctx.getRequest(), ctx.getAsyncHandler()));
            }
        } catch (InterruptedException e) {
            throw new FilterException(
                    String.format("Interrupted Request %s "
                            + "with AsyncHandler %s",
                            ctx.getRequest(), ctx.getAsyncHandler()));
        }

        return new FilterContext(
             new AsyncHandlerWrapper(ctx.getAsyncHandler()), ctx.getRequest());
    }

    // Decorates the user's handler so the slot can be released when the request ends.
    private class AsyncHandlerWrapper<T> implements AsyncHandler<T> {

        private final AsyncHandler<T> asyncHandler;

        public AsyncHandlerWrapper(AsyncHandler<T> asyncHandler) {
            this.asyncHandler = asyncHandler;
        }

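        public void onThrowable(Throwable t) {
            try {
                asyncHandler.onThrowable(t);
            } finally {
                // Release on failure too, otherwise failed requests leak slots.
                available.release();
            }
        }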

        public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart)
                throws Exception {
            return asyncHandler.onBodyPartReceived(bodyPart);
        }

        public STATE onStatusReceived(HttpResponseStatus responseStatus)
              throws Exception {
            return asyncHandler.onStatusReceived(responseStatus);
        }

        public STATE onHeadersReceived(HttpResponseHeaders headers)
              throws Exception {
            return asyncHandler.onHeadersReceived(headers);
        }

        public T onCompleted() throws Exception {
            available.release();
            return asyncHandler.onCompleted();
        }
    }
}

To add a RequestFilter, all you need to do is configure it on the AsyncHttpClientConfig:


        AsyncHttpClientConfig.Builder b =
                 new AsyncHttpClientConfig.Builder();
        b.addRequestFilter(new ThrottleRequestFilter(100));

        AsyncHttpClient c = new AsyncHttpClient(b.build());
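If you want to play with the throttling idea without the library, the core of the filter is just a fair Semaphore with a bounded wait. Here is a minimal, JDK-only sketch; the SlotThrottle class and its method names are made up for illustration and are not part of AsyncHttpClient:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-alone version of the throttling logic: at most
// maxConcurrent callers hold a slot, others wait up to maxWaitMillis.
final class SlotThrottle {

    private final Semaphore available;
    private final long maxWaitMillis;

    SlotThrottle(int maxConcurrent, long maxWaitMillis) {
        this.available = new Semaphore(maxConcurrent, true); // fair: FIFO ordering
        this.maxWaitMillis = maxWaitMillis;
    }

    // Returns true if a slot was obtained within the wait time.
    boolean tryEnter() {
        try {
            return available.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    // Must be called exactly once per successful tryEnter().
    void leave() {
        available.release();
    }

    public static void main(String[] args) {
        SlotThrottle throttle = new SlotThrottle(2, 50);
        System.out.println(throttle.tryEnter()); // true
        System.out.println(throttle.tryEnter()); // true
        System.out.println(throttle.tryEnter()); // false, both slots are taken
        throttle.leave();
        System.out.println(throttle.tryEnter()); // true again
    }
}
```

The important design point is the pairing: every acquired slot has to be released when the request completes or fails, which is why the filter wraps the AsyncHandler.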

You can also intercept and decorate the response before your AsyncHandler gets invoked, and also before the authorization, authentication or redirects get handled by the library. Below is a simple example of how a challenge for authorization can be handled by a ResponseFilter instead of letting the library do it:


AsyncHttpClientConfig.Builder b = new AsyncHttpClientConfig.Builder();
final AtomicBoolean replay = new AtomicBoolean(true);

b.addResponseFilter(new ResponseFilter(){

   public FilterContext filter(FilterContext ctx) throws FilterException {

     if (ctx.getResponseStatus() != null &&
         ctx.getResponseStatus().getStatusCode() == 401
           && replay.getAndSet(false)) {

             Request request =
                   new RequestBuilder(ctx.getRequest())
                        .addHeader("Authorization",
                                           "Basic XAEKDYTHS==").build();
            return new FilterContext(ctx.getAsyncHandler(), request, true);
     }
     return ctx;
}});
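The "Basic XAEKDYTHS==" value above is only a placeholder. As a rough sketch, assuming a modern JDK (java.util.Base64 requires Java 8) and a hypothetical BasicAuthHeader helper, a real Basic Authorization header value can be computed like this:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: builds the value of an HTTP Basic "Authorization" header.
final class BasicAuthHeader {

    private BasicAuthHeader() {
    }

    static String of(String user, String password) {
        // Basic credentials are "user:password", Base64-encoded.
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        System.out.println(of("user", "password")); // Basic dXNlcjpwYXNzd29yZA==
    }
}
```

Keep in mind that Base64 is an encoding, not encryption, so this header should only travel over https.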

TransferListener

A new TransferListener API has been added in order to get the progress status of a file download or upload. AsyncHttpClient always supported that scenario using the AsyncCompletionHandler, but the new API just makes it simpler:


        AsyncHttpClient c = new AsyncHttpClient();
        TransferCompletionHandler tl = new TransferCompletionHandler();
        tl.addTransferListener(new TransferListener() {

            public void onRequestHeadersSent(
                FluentCaseInsensitiveStringsMap headers) {
            }

            public void onResponseHeadersReceived(
                FluentCaseInsensitiveStringsMap headers) {
            }

            public void onBytesReceived(ByteBuffer buffer) {
            }

            public void onBytesSent(ByteBuffer buffer) {
            }

            public void onRequestResponseCompleted() {
            }

            public void onThrowable(Throwable t) {
            }
        });

         Response response = c.preparePut(getTargetUrl())
                              .setBody(largeFile)
                              .execute(tl).get();
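The callbacks above are left empty. As an illustration of what you might put in onBytesSent, here is a small JDK-only sketch that turns byte callbacks into a percentage. UploadProgress is a hypothetical class, and it assumes that the remaining() bytes of the buffer passed to the callback are the bytes that were just transferred:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical progress tracker meant to be driven from a callback
// such as onBytesSent(ByteBuffer).
final class UploadProgress {

    private final long totalBytes;
    private final AtomicLong sent = new AtomicLong();

    UploadProgress(long totalBytes) {
        if (totalBytes <= 0) {
            throw new IllegalArgumentException("totalBytes must be positive");
        }
        this.totalBytes = totalBytes;
    }

    // Assumption: buffer.remaining() is the number of bytes just written.
    // Returns the completed percentage, capped at 100.
    int onBytesSent(ByteBuffer buffer) {
        long soFar = sent.addAndGet(buffer.remaining());
        return (int) Math.min(100, soFar * 100 / totalBytes);
    }

    public static void main(String[] args) {
        UploadProgress progress = new UploadProgress(200);
        System.out.println(progress.onBytesSent(ByteBuffer.allocate(50)) + "%");  // 25%
        System.out.println(progress.onBytesSent(ByteBuffer.allocate(150)) + "%"); // 100%
    }
}
```

The total size would typically come from the length of the file being uploaded.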

Apache HttpClient Support

The Apache Http Client Library is now supported. If you can’t use Netty (which is still the recommended provider), all you need to do is:

   AsyncHttpClient c = new AsyncHttpClient(new ApacheAsyncHttpProvider());

Per Provider Configuration

It is now possible to configure each AsyncHttpProvider with its own provider-specific configuration. For Netty, you can do:


AsyncHttpClientProviderConfig c = new NettyAsyncHttpClientProviderConfig();
c.addProperty(NettyAsyncHttpClientProviderConfig.USE_BLOCKING_IO,"true");

AsyncHttpClientConfig.Builder config = new AsyncHttpClientConfig.Builder();
config.setAsyncHttpClientProviderConfig(c);

AsyncHttpClient client = new AsyncHttpClient(config.build());

Bug fixes and Performance Improvements

A lot of bug fixes and improvements are listed here. We improved support for http to https redirection, realm/proxy authentication, NTLM, OAuth, etc. On the performance side, the library has been heavily tested and we are tracking performance improvements/regressions using that simple load application. So far the Netty provider gives excellent results compared to other Http Client libraries.

For any questions you can use our Google Group, join #asynchttpclient on irc.freenode.net or use Twitter to reach me! You can check out the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.4.0</version>
</dependency>

Categories: Async Http client

Friday’s Tricks #4: Improving Websocket/Comet performance using Delayed/Aggregated Server Side Events

November 12, 2010 1 comment

This week I will explain how you can significantly improve the performance of your WebSocket/Comet application by delaying and aggregating Server Side Events with the Atmosphere Framework.

It is not trivial to broadcast real time server side events using a Comet or WebSocket connection. As an example, if the frequency of your server side event broadcasts is high, like many events per second, it is important to pick the best strategy when it is time to write those events back to the client. The usual mistake an application makes is to send events to the browser one by one. With the long-polling Comet technique, this causes catastrophic performance issues as the browser needs to reconnect after receiving each event. With the http-streaming Comet technique or with WebSocket, the browser doesn’t have to reconnect, but you can still produce catastrophic performance problems on both the client and server side if you send events one by one.

With the Atmosphere Framework, you can aggregate or delay events in order to reduce the number of events you send back to the browser. Let’s explore the first technique, which consists of delaying server side events:

    @Path("/")
    @POST
    @Broadcast(delay=0)
    public Broadcastable buffer(@FormParam("message") String message){
        return broadcast(message);
    }

The code above reflects any form data received from a browser to the set of WebSocket/Comet connections listening to server side events. Instead of immediately sending the form data back to the browser, the code above delays the operation until a second event happens. This is particularly useful if your server side event doesn’t have to be sent in real time. However, a second event may never occur, so the code above is a little dangerous. Instead, let’s use:

    @Path("/")
    @POST
    @Broadcast(delay=5)
    public Broadcastable buffer(@FormParam("message") String message){
        return broadcast(message);
    }

The difference this time is that the server side event will be delayed for 5 seconds at most. If a second event happens before then, the two events will be aggregated and sent as a single event.
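To make the idea concrete outside of Atmosphere, here is a small JDK-only sketch of “collect events, write them once after a delay”. This is not how Atmosphere implements @Broadcast(delay=...); the DelayedBatcher class is hypothetical and only illustrates the batching principle:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: events are buffered and written as one
// payload when flush() is called, typically by a timer.
final class DelayedBatcher {

    private final List<String> pending = new ArrayList<String>();

    synchronized void offer(String event) {
        pending.add(event);
    }

    // Returns everything collected since the last flush as a single
    // payload, or null when there is nothing to send.
    synchronized String flush() {
        if (pending.isEmpty()) {
            return null;
        }
        String joined = String.join("", pending);
        pending.clear();
        return joined;
    }

    public static void main(String[] args) throws InterruptedException {
        DelayedBatcher batcher = new DelayedBatcher();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        batcher.offer("first ");
        batcher.offer("second");
        // One write after a short delay instead of one write per event.
        scheduler.schedule(() -> System.out.println(batcher.flush()), 200, TimeUnit.MILLISECONDS);
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

With long-polling in particular, one aggregated write means one reconnect instead of one reconnect per event.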

Although the above mechanism is useful, you may want to aggregate many server side events and send them as a whole instead of one by one. This is particularly true when you need to construct a complex data structure. You can let the browser cache/aggregate the result (but all the data can potentially be lost if the browser crashes). Instead, you should create the complex data on the server side by aggregating events. Let’s say your application streams a JSON array from an external source. It is quite possible that the JSON data received from the external source isn’t complete, as I’ve demonstrated in my Twitter Feed application, which uses the Atmosphere JQuery Plugin (ya, I do what I recommend to avoid :-)):

function subscribe()
   {
     function callback(response)
     {
       $.atmosphere.log('info', ["response.state: " + response.state]);
       $.atmosphere.log('info', ["response.transport: " + response.transport]);
       if (response.transport != 'polling'
         && response.state != 'connected'
         && response.state != 'closed') {

         if (response.status == 200) {
           var data = response.responseBody;

           try {
                var result =  $.parseJSON(incompleteMessage + data);
                incompleteMessage = "";

                 var i = 0;
                 for (i = result.results.length -1 ; i > -1; i--){
                   $('ul').prepend($('<li>').text("["
                         + response.transport + "] "
                         + result.results[i].from_user + " "
                         + result.results[i].text));
                 }
            } catch (err) {
               // JSON error (JSON message not complete)
               incompleteMessage = data;
            }
 

In that case, it would be much better to aggregate the server side events until the JSON object has been fully constructed. With Atmosphere, all you need to do is to define a BroadcastFilter. Let’s use a simple example by aggregating String events:

public class StringFilterAggregator implements BroadcastFilter {
    private final int maxBufferedString;
    private final AtomicReference<StringBuilder> bufferedMessage
        = new AtomicReference<StringBuilder>(new StringBuilder());

    public StringFilterAggregator() {
        maxBufferedString = 256;
    }

    public StringFilterAggregator(int maxBufferedString) {
        this.maxBufferedString = maxBufferedString;
    }

    public BroadcastAction filter(Object message) {
        if (message instanceof String) {
            bufferedMessage.get().append(message);
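            // Keep buffering (and skip the write) until the limit is reached.
            if (bufferedMessage.get().length() < maxBufferedString) {
                return new BroadcastAction(ACTION.ABORT, message);
            } else {
                // Limit reached: flush the aggregated content as a single event.
                message = bufferedMessage.get().toString();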
                bufferedMessage.get().delete(0, bufferedMessage.get().length());
                return new BroadcastAction(ACTION.CONTINUE, message);
            }
        } else {
            return new BroadcastAction(message);
        }
    }
}

All we need to do is to add our Filter to the Atmosphere’s Broadcaster.

    @Path("/")
    @Broadcast
    @POST
    public Broadcastable aggregate(@Context Broadcaster bc) {
         .....
         bc.getBroadcasterConfig().addFilter(new StringFilterAggregator());

        return new Broadcastable(bc);
    }
} 

So now every time an event gets broadcast, the String will be aggregated until it reaches the limit, and only when the limit is reached will the write operation be executed. That can significantly improve the performance of your application.
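If you want to test the aggregation logic on its own, without a Broadcaster, the same idea can be written with the JDK only. The StringAggregator class below is a hypothetical sketch; it uses a synchronized method because appending to a shared StringBuilder from several threads is not safe, even when the builder is held in an AtomicReference:

```java
// Hypothetical stand-alone aggregator: buffers chunks and hands back the
// whole buffer once it has reached the threshold.
final class StringAggregator {

    private final int threshold;
    private final StringBuilder buffer = new StringBuilder();

    StringAggregator(int threshold) {
        this.threshold = threshold;
    }

    // Returns null while still buffering (the equivalent of ABORT), or the
    // aggregated content once the threshold is reached (CONTINUE).
    synchronized String add(String chunk) {
        buffer.append(chunk);
        if (buffer.length() < threshold) {
            return null;
        }
        String aggregated = buffer.toString();
        buffer.setLength(0); // start a fresh buffer for the next batch
        return aggregated;
    }

    public static void main(String[] args) {
        StringAggregator aggregator = new StringAggregator(5);
        System.out.println(aggregator.add("ab")); // null, still buffering
        System.out.println(aggregator.add("cd")); // null, still buffering
        System.out.println(aggregator.add("ef")); // abcdef
    }
}
```

Note that with a pure size threshold the last chunks can sit in the buffer forever if no further event arrives, so in practice you may want to combine it with a delay.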

For any questions or to download Atmosphere Client and Server Framework, go to our main site, use our Nabble forum, follow the team or myself and tweet your questions there! You can also check out the code on Github.

Categories: Atmosphere, Comet, JQuery, Websocket

Using JQuery, XMPP and Atmosphere to cluster your WebSocket/Comet application

November 8, 2010 3 comments

The Extensible Messaging and Presence Protocol (XMPP) is an open technology for real-time communication, which powers a wide range of applications including instant messaging, presence, multi-party chat, voice and video calls, collaboration, lightweight middleware, content syndication, and generalized routing of XML data. The protocol’s adoption is phenomenal, and this time I will show how easy it is to use Atmosphere’s XMPP Plugin with GTalk to scale and cluster a WebSocket/Comet application.

Warning: If you have read my previous blog entry on Redis and WebSocket, this new entry will look suspiciously similar 🙂 The Atmosphere Framework Plugin API is bloody simple and adding support for any cloud/clustering technology is extremely easy.

Atmosphere supports many communication channels between servers, but today we will focus on our newest addition: XMPP. For the sample I will use GTalk (but it works with any XMPP server) and the XMPP library named Smack. I will show how it is possible to deploy a WebSocket application on any Webserver and use GTalk/XMPP to share events within “the cloud”. As usual I will use the Atmosphere JQuery Plugin on the client side, as the library will always try to use WebSocket and, if that fails, will fall back to Comet techniques. Fortunately, the Plugin will also fall back to Comet if the remote server doesn’t support WebSocket. This is great because when you deploy into a cloud or a cluster, you aren’t guaranteed that all servers will be the same product, same version, etc. The Atmosphere Framework saves you from all those questions. Fewer things to worry about.

OK, so let’s write a simple PubSub application. Without going into the details (read this entry for an introduction to the Atmosphere JQuery Plugin), our client side will look like this (browse the code here):


    function callback(response)
    {
      if (response.status == 200) {
        // display the result.
      }
    }
    /* transport can be : long-polling, streaming or websocket */
    $.atmosphere.subscribe(uri + 'pubsub/' + "A topic",
                           callback,
                           $.atmosphere.request = {transport: 'websocket'});

As simple as it looks, we just invoke the subscribe method with a uri, a callback and the default transport we want to use: WebSocket. The server will understand the request and act appropriately: upgrade for WebSocket, avoid committing the response for Comet. For publishing, you can use any existing XMPP client or use the Atmosphere JQuery Plugin as well:

    $.atmosphere.response.push(uri + 'pubsub/' + "A Topic",
                               $.atmosphere.request = {data: 'Some Message'});

Here the push method re-uses the existing connection (the WebSocket one) to push messages back to the server. If WebSocket is not supported, the Plugin will still work and fall back to Comet. That’s it for the client side.

Now, configuration wise, the Atmosphere XMPP’s plugin can be configured via your web.xml (or using the API as well):

<?xml version="1.0" encoding="ISO-8859-1"?>
<web-app xmlns="http://java.sun.com/xml/ns/j2ee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee
               http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd"
         version="2.4">

    <display-name>XMPPAtmosphere</display-name>

    <servlet>
        <description>AtmosphereServlet</description>
        <servlet-name>AtmosphereServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
        <init-param>
            <param-name>org.atmosphere.useWebSocket</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useNative</param-name>
            <param-value>true</param-value>
        </init-param>

        <init-param>
            <param-name>org.atmosphere.plugin.xmpp.XMPPBroadcaster.authorization</param-name>
            <param-value>me@gmail.com:password</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.plugin.xmpp.XMPPBroadcaster.server</param-name>
            <param-value>http://gmail.com</param-value>
        </init-param>

        <load-on-startup>0</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>AtmosphereServlet</servlet-name>
        <url-pattern>/*</url-pattern>
    </servlet-mapping>
</web-app>

Here you specify a GMail account and which server you want to use to relay your messages.

Now on the server side, let’s use Atmosphere’s Jersey module and the Atmosphere XMPP Plugin. The XMPP Plugin uses the great Smack library internally to connect to the XMPP server. For this demo I use GTalk. (browse the code here)

    private @PathParam("topic") XMPPBroadcaster topic;

    @GET
    public SuspendResponse subscribe() {

        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

    @POST
    @Broadcast
    public Broadcastable publish(@FormParam("message") String message) {
      return new Broadcastable(message, "", topic);
    }

Don’t look for more code, there isn’t more! How it works:

  1. When the JQuery Plugin issues a request like GET /pubsub/”A Topic”, the above resource is created and an instance of XMPPBroadcaster gets injected. The XMPPBroadcaster uses the value of “A Topic” for creating a channel to the GTalk server using your GMail ID (the one configured in web.xml). “A Topic” is also the name of the XMPPBroadcaster, which can always be used to retrieve it from an external component like an EJB, a Java program etc.
  2. Second, the subscribe method gets invoked, and we tell Atmosphere to suspend the response. For WebSocket, it means accepting the upgrade handshake.
  3. Now any message sent using the Atmosphere JQuery Plugin’s $.atmosphere.response.push will transit over XMPP via GTalk. The $.atmosphere.response.push call will invoke the publish method of your resource, which will use the XMPPBroadcaster to propagate the change to all WebSocket/Comet connections via GTalk.
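The three steps above boil down to “look up a broadcaster by topic name, subscribe connections to it, publish to all of them”. Here is a tiny in-memory sketch of that flow using only the JDK. The TopicRegistry class is hypothetical, and unlike the XMPPBroadcaster it only reaches listeners inside a single JVM; the whole point of the XMPP plugin is to relay the message to the other servers as well:

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical single-JVM illustration of named topics with subscribers.
final class TopicRegistry {

    private final ConcurrentMap<String, List<Consumer<String>>> topics =
            new ConcurrentHashMap<>();

    // Step 1 and 2: the topic is created on first use and the "connection"
    // (here a simple callback) is attached to it.
    void subscribe(String topic, Consumer<String> listener) {
        topics.computeIfAbsent(topic, name -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Step 3: every listener of the topic receives the message.
    // Returns the number of listeners that were notified.
    int publish(String topic, String message) {
        List<Consumer<String>> listeners = topics.get(topic);
        if (listeners == null) {
            return 0;
        }
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        TopicRegistry registry = new TopicRegistry();
        registry.subscribe("A Topic", msg -> System.out.println("received: " + msg));
        registry.publish("A Topic", "Some Message");
    }
}
```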

That’s it. The JQuery look and feel is the same as the one described here. Of course you can do much more with XMPP and Atmosphere, but my goal for this blog was to show how easy it is to get started.

As demonstrated, writing an application with Atmosphere is super simple, and deploying it inside a cloud/distributed environment is also simple. The good news here is you don’t have to focus on how GTalk/XMPP works, but on your application. On a side note, if you want to use Redis, ActiveMQ (any JMS impl) or JGroups instead of XMPP, this is as simple as:

    private @PathParam("topic")
    RedisBroadcaster topic; // Or JMSBroadcaster or JGroupsBroadcaster

    @GET
    public SuspendResponse subscribe() {

        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

You just need to inject the Broadcaster for the technology you want to use.

The complete binary/source code can be downloaded here. For any questions or to download Atmosphere Client and Server Framework, go to our main site and use our Nabble forum, or follow the team and myself and tweet your questions there!