
Archive for October, 2010

Scaling your Websocket/Comet Real-time application using Redis Pub/Sub

October 25, 2010

Scaling WebSocket/Comet applications is not simple. Why? As with Comet, a WebSocket connection on server A cannot receive or share events with another WebSocket connection on server B. Just think about an application deployed in the cloud: the same server doesn’t serve all connections, so you need some communication between all the servers. You can write your own custom mechanism and end up with kilometers of code, or use the Atmosphere Framework to do it for you.

Atmosphere supports many communication channels between servers, but today we will focus on our newest addition: Redis. Using the Redis pub/sub API, we will show how to deploy a WebSocket application “into the cloud”. On the client side I will use the Atmosphere JQuery Plugin, because the plugin always tries WebSocket first and, if that fails, falls back to Comet techniques. The plugin also falls back to Comet if the remote server doesn’t support WebSocket. This is great because when you deploy into a cloud like EC2, you aren’t guaranteed that all servers will run the same product, the same version, etc. Fewer things to worry about.
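The fallback described above comes down to picking the first transport in a preference list that both the browser and the server support. Here is a minimal Java sketch of that idea. It is not the plugin’s actual JavaScript logic. The `Transport` enum and `TransportNegotiator.negotiate` are hypothetical, and the websocket → streaming → long-polling order is an assumption.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the transports mentioned in this post.
enum Transport { WEBSOCKET, STREAMING, LONG_POLLING }

final class TransportNegotiator {

    // Assumed preference order: WebSocket first, then the Comet techniques.
    static final List<Transport> PREFERENCE =
            List.of(Transport.WEBSOCKET, Transport.STREAMING, Transport.LONG_POLLING);

    // Returns the first preferred transport that both sides support.
    static Transport negotiate(Set<Transport> browser, Set<Transport> server) {
        for (Transport t : PREFERENCE) {
            if (browser.contains(t) && server.contains(t)) {
                return t;
            }
        }
        throw new IllegalStateException("no common transport");
    }

    private TransportNegotiator() { }
}
```

For example, a WebSocket-capable browser talking to a Comet-only server ends up on streaming. The plugin handles this case transparently when your cloud servers aren’t all the same product or version.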

OK, so let’s write a simple PubSub application. Without going into the details (read this entry for an introduction to the Atmosphere JQuery Plugin), our client side looks like this (browse the code here):


    function callback(response)
    {
      if (response.status == 200) {
        // display the result.
      }
    }
    /* transport can be: long-polling, streaming or websocket */
    $.atmosphere.subscribe(uri + 'pubsub/' + "A Topic",
                           callback,
                           $.atmosphere.request = {transport: 'websocket'});

As simple as it looks, we just invoke the subscribe method with a URI, a callback and the default transport we want to use: WebSocket. The server will understand the request and act appropriately. For WebSocket it performs the upgrade, and for Comet it avoids committing the response. For publishing, you can use any existing Redis client or the Atmosphere JQuery Plugin:

    $.atmosphere.response.push(uri + 'pubsub/' + "A Topic",
                               $.atmosphere.request = {data: 'Some Message'});

Here the push method reuses the existing connection (the WebSocket one) to push messages back to the server. If WebSocket is not supported, the plugin will still work and fall back to Comet. That’s it for the client side.

Now on the server side, let’s use Atmosphere’s Jersey module and the Atmosphere Redis Plugin. The Redis Plugin uses the great Jedis client internally to connect to Redis (browse the code here):

    private @PathParam("topic") RedisBroadcaster topic;

    @GET
    public SuspendResponse subscribe() {

        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

    @POST
    @Broadcast
    public Broadcastable publish(@FormParam("message") String message) {
      return new Broadcastable(message, "", topic);
    }

Don’t look for more code; there isn’t any! Here is how it works:

  1. When the JQuery Plugin issues a request like GET /pubsub/“A Topic”, the above resource is created and an instance of RedisBroadcaster gets injected. The RedisBroadcaster uses the value “A Topic” to subscribe to the matching Redis pub/sub channel. “A Topic” is also the name of the RedisBroadcaster, which can always be used to retrieve it.
  2. The subscribe method then gets invoked, and we tell Atmosphere to suspend the response. For WebSocket, this means accepting the upgrade handshake.
  3. Now any message sent using either a Redis client (like redis-cli) or Atmosphere JQuery’s $.atmosphere.response.push will transit through Redis’s pub/sub API. $.atmosphere.response.push invokes the publish method of your resource, which uses the RedisBroadcaster to propagate the change to all WebSocket/Comet connections via Jedis/Redis.
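To make the three steps concrete, here is a minimal in-memory sketch of the same flow. Two “servers” each keep their own suspended connections, and a shared channel map stands in for Redis pub/sub, relaying every published message to all of them. `PubSubBus`, `Node` and the `List<String>` standing in for browser connections are hypothetical simplifications, not Atmosphere or Jedis classes.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Stand-in for Redis pub/sub: channel name -> subscribers.
final class PubSubBus {
    private final Map<String, List<Consumer<String>>> channels = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> listener) {
        channels.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Like Redis PUBLISH, returns how many subscribers received the message.
    int publish(String channel, String message) {
        List<Consumer<String>> subs = channels.getOrDefault(channel, List.of());
        subs.forEach(s -> s.accept(message));
        return subs.size();
    }
}

// One application server holding its own suspended connections for a topic.
final class Node {
    final List<String> delivered = new CopyOnWriteArrayList<>(); // what this node's browsers saw
    private final PubSubBus bus;
    private final String topic;

    Node(PubSubBus bus, String topic) {
        this.bus = bus;
        this.topic = topic;
        // Step 1: subscribe to the channel named after the topic.
        bus.subscribe(topic, delivered::add);
    }

    // Step 3: publishing does not write to local connections directly.
    // The message goes to the bus and comes back to every node, including this one.
    void publish(String message) {
        bus.publish(topic, message);
    }
}
```

This also shows the round trip mentioned later in the post: even the publishing node only sees its own message after it has gone through the shared channel.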

If you are new to Redis, the following explains how to set it up (taken from this great blog on Comet+Atmosphere+Akka):

./redis-cli
redis> subscribe atmosphere
Reading messages... (press Ctrl+c to quit)
1. "subscribe"
2. "atmosphere"
3. (integer) 1

redis-cli is a simple Redis console client. subscribe atmosphere starts waiting, in the foreground, for messages published to the channel atmosphere. The reply confirms that the client is subscribed to the channel. Now let’s post something to the channel from a second terminal, because the first redis-cli is busy waiting for messages:

./redis-cli
redis> publish atmosphere Hello!
(integer) 1
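Under the hood, redis-cli sends each command using the Redis serialization protocol (RESP), as an array of bulk strings. The sketch below encodes the publish command from the session above and parses the integer reply. `RespEncoder` is a hypothetical helper written for illustration; a real client such as Jedis does this for you. The `(integer) 1` shown by redis-cli arrives on the wire as `:1\r\n`, which is the number of clients that received the message.

```java
import java.nio.charset.StandardCharsets;

final class RespEncoder {

    // Encodes a command as a RESP array of bulk strings:
    // *<count>\r\n, then $<byte length>\r\n<data>\r\n for each argument.
    static String encode(String... args) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(args.length).append("\r\n");
        for (String arg : args) {
            // The length prefix counts bytes, not characters.
            int byteLength = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n").append(arg).append("\r\n");
        }
        return sb.toString();
    }

    // Parses an integer reply such as ":1\r\n" (displayed as "(integer) 1").
    static long parseIntegerReply(String reply) {
        if (!reply.startsWith(":") || !reply.endsWith("\r\n")) {
            throw new IllegalArgumentException("not an integer reply: " + reply);
        }
        return Long.parseLong(reply.substring(1, reply.length() - 2));
    }

    private RespEncoder() { }
}
```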

If you have deployed the sample described in this blog and subscribed to the atmosphere topic (i.e. /pubsub/atmosphere), the Hello! message will be pushed back to your browser. You may have noticed that every time we publish a message, it travels from your application to the Redis server and back to your application (take a look at the RedisBroadcaster internals if you are curious). You may not need this round trip if you use a single server. Instead, you can use the RedisFilter with the normal Broadcaster:

    private @PathParam("topic") Broadcaster topic;

    @GET
    public SuspendResponse subscribe() {

        topic.getBroadcasterConfig().addFilter(new RedisFilter());

        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

With the RedisFilter, every time you publish a message the normal Atmosphere Broadcaster first delivers it locally (using a simple in-memory queue). It then publishes it to Redis, without re-publishing the copy it gets back from Redis. In some applications, delivering through the in-memory queue first can be a useful optimization.
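The difference from the RedisBroadcaster round trip can be sketched with an origin tag. Each node delivers to its own connections immediately, forwards the message to the shared channel, and ignores the copy that comes back from itself. This is a minimal in-memory illustration. It assumes echoes are recognized by a node id attached to each message. `LocalFirstNode` and `Envelope` are hypothetical, and the real RedisFilter may detect its own messages differently.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// A message tagged with the id of the node that published it (hypothetical).
final class Envelope {
    final String origin;
    final String body;

    Envelope(String origin, String body) {
        this.origin = origin;
        this.body = body;
    }
}

final class LocalFirstNode {
    final String id;
    final List<String> delivered = new CopyOnWriteArrayList<>(); // what this node's browsers saw
    private final List<Consumer<Envelope>> cluster; // every node's receive hook, standing in for Redis

    LocalFirstNode(String id, List<Consumer<Envelope>> cluster) {
        this.id = id;
        this.cluster = cluster;
        cluster.add(this::onRemote);
    }

    void publish(String body) {
        delivered.add(body);                // 1. deliver locally, no round trip
        Envelope e = new Envelope(id, body);
        cluster.forEach(c -> c.accept(e));  // 2. forward to the shared channel
    }

    private void onRemote(Envelope e) {
        if (e.origin.equals(id)) {
            return;                         // 3. our own echo: already delivered locally
        }
        delivered.add(e.body);
    }
}
```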

As demonstrated, writing an application with Atmosphere is super simple, and deploying it inside a cloud/distributed environment is also simple. The good news is that you don’t have to focus on how Redis works, only on your application. On a side note, if you want to use ActiveMQ (or any JMS implementation) or JGroups instead of Redis, it is as simple as:

    private @PathParam("topic") JMSBroadcaster topic;

    @GET
    public SuspendResponse subscribe() {

        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

or, for JGroups:

    private @PathParam("topic") JGroupsBroadcaster topic;

    @GET
    public SuspendResponse subscribe() {

        return new SuspendResponse.SuspendResponseBuilder()
                .broadcaster(topic)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
    }

You just need to inject the broadcaster for the technology you want to use.
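The resource code doesn’t change because only the injected type differs. The resource talks to a common broadcaster abstraction and never to Redis, JMS or JGroups directly. Here is a minimal sketch of that idea (the strategy pattern). `ClusterTransport`, `InMemoryTransport`, `TopicResource` and the name-keyed `Backends.REGISTRY` are hypothetical stand-ins for the Redis, JMS and JGroups broadcasters, not Atmosphere classes. The in-memory backend is single-threaded and for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// The only contract the resource depends on (hypothetical).
interface ClusterTransport {
    String name();
    void publish(String topic, String message);
    List<String> messages(String topic);
}

// Single-process stand-in; real backends would be Redis, JMS or JGroups.
final class InMemoryTransport implements ClusterTransport {
    private final String name;
    private final Map<String, List<String>> topics = new HashMap<>();

    InMemoryTransport(String name) { this.name = name; }

    public String name() { return name; }

    public void publish(String topic, String message) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    public List<String> messages(String topic) {
        return topics.getOrDefault(topic, List.of());
    }
}

// The "resource": identical code whatever backend is injected.
final class TopicResource {
    private final ClusterTransport transport;

    TopicResource(ClusterTransport transport) { this.transport = transport; }

    String publish(String topic, String message) {
        transport.publish(topic, message);
        return transport.name() + ":" + message;
    }
}

final class Backends {
    // Choosing the technology is a single lookup; nothing else changes.
    static final Map<String, Supplier<ClusterTransport>> REGISTRY = Map.of(
            "redis", () -> new InMemoryTransport("redis"),
            "jms", () -> new InMemoryTransport("jms"),
            "jgroups", () -> new InMemoryTransport("jgroups"));

    private Backends() { }
}
```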

The complete binary/source code can be downloaded here. For any questions or to download the Atmosphere Client and Server Framework, go to our main site and use our Nabble forum, or follow the team and me on Twitter and tweet your questions there!

Async Http Client 1.2.0 released

Read the official announcement, including the change log:

http://codemeself.blogspot.com/2010/10/async-http-client-120.html

Thanks to everybody who contributed to this amazing release.

For any questions you can use our Google Group, the #asynchttpclient channel on irc.freenode.net, or Twitter to reach me! You can check out the code on GitHub, download the jars from Maven Central, or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.2.0</version>
</dependency>


Writing Websocket applications using Apache Wicket

Writing an Apache Wicket application is quite simple. Adding WebSocket support to a Wicket application is even simpler! How? By combining the power of the Atmosphere Framework with Wicket, you can quickly transform any existing Wicket application into a powerful HTML5 application. You think it’s impossible because your browser or server doesn’t support WebSocket? Not a problem with the Atmosphere Framework: both the client and server components can emulate WebSocket using Comet. What does that mean? If you deploy on a web server that supports WebSocket, Atmosphere will use it; if not, it will use Comet. Your Wicket application can therefore be deployed anywhere and used with any browser, whether or not it supports HTML5 WebSocket.

Let’s create an extremely simple Wicket application: a clock server. The browser will connect to the server and receive updates of the current time. Nothing complicated, but the goal is to demonstrate how easy it is. Our web page will look like this:

The Server Side

First, let’s define our web.xml (all sources are available here):


<?xml version="1.0" encoding="ISO-8859-1"?>
<web-app xmlns="http://java.sun.com/xml/ns/j2ee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd"
         version="2.4">

    <display-name>Wicket+Atmosphere</display-name>

    <servlet>
        <description>MeteorServlet</description>
        <servlet-name>MeteorServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.MeteorServlet</servlet-class>
        <init-param>
            <param-name>org.atmosphere.filter</param-name>
            <param-value>org.apache.wicket.protocol.http.WicketFilter</param-value>
        </init-param>
        <init-param>
            <param-name>applicationClassName</param-name>
            <param-value>org.atmosphere.samples.wicket.WicketPushApplication</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useWebSocket</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.useNative</param-name>
            <param-value>true</param-value>
        </init-param>
        <init-param>
            <param-name>filterMappingUrlPattern</param-name>
            <param-value>/*</param-value>
        </init-param>
        <load-on-startup>0</load-on-startup>
    </servlet>

    <servlet-mapping>
        <servlet-name>MeteorServlet</servlet-name>
        <url-pattern>/*</url-pattern>
    </servlet-mapping>
</web-app>

We configure Atmosphere by telling the framework to use the MeteorServlet. We also tell the servlet to run Wicket and to make all of Atmosphere’s objects available for injection from Wicket. Since we want to support WebSocket, we enable it there too. Finally, we tell Atmosphere to use the native Comet implementation when possible, instead of the Servlet 3.0 Async API (which is not available in many servers right now anyway). For people already using Atmosphere, note that we aren’t using the usual AtmosphereServlet here, because the MeteorServlet makes it easier for an existing Wicket application to use Meteor.

I assume you are familiar with Wicket. If not, take a look at their HelloWorld sample. First, our HomePage just consists of a Label and a Panel:

The ClockPanel looks like:


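import java.util.Date;

import org.apache.wicket.markup.html.basic.Label;
import org.apache.wicket.markup.html.link.BookmarkablePageLink;
import org.apache.wicket.markup.html.panel.Panel;
import org.apache.wicket.model.AbstractReadOnlyModel;

public class ClockPanel extends Panel {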

    public ClockPanel(String id) {
        super(id);
        add(new BookmarkablePageLink<PushPage>("cometStart", PushPage.class));
        add(new Label("clock", new AbstractReadOnlyModel<String>() {
            @Override
            public String getObject() {
                return new Date().toString();
            }
        }));
    }
}

Atmosphere comes into play inside our Wicket WebPage implementation, called PushPage:


public class PushPage extends WebPage implements AtmosphereResourceEventListener {

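    // static: Wicket creates a new PushPage for each request, so the
    // "already scheduled" flag must be shared across instances
    private static final AtomicBoolean scheduleStarted = new AtomicBoolean(false);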

    public PushPage() {
        HttpServletRequest req = getWebRequestCycle()
               .getWebRequest().getHttpServletRequest();
        Meteor meteor = Meteor.build(req);
        if (!scheduleStarted.getAndSet(true)) {
            meteor.schedule(new Callable<String>() {
                public String call() {
                    String s = new Date().toString();
                    return s;
                }
            }, 1); // One second
        }
        meteor.addListener(this);

        // Depending on the connection
        String transport = req.getHeader("X-Atmosphere-Transport");
        meteor.suspend(-1, !(transport != null
              && transport.equalsIgnoreCase("long-polling")));
    }

    public void onBroadcast(AtmosphereResourceEvent
           <HttpServletRequest, HttpServletResponse> event) {

        String transport = event.getResource()
                .getRequest().getHeader("X-Atmosphere-Transport");
        if (transport != null && transport.equalsIgnoreCase("long-polling")) {
            Meteor meteor = Meteor.lookup(event.getResource().getRequest());

            meteor.removeListener(this);
            meteor.resume();
        }
    }

    ...
}

This is as simple as:

  1. Get a Meteor instance from the current WebRequest.
  2. If we haven’t yet scheduled a task using Atmosphere’s Broadcaster, schedule it. For our application, we just schedule a Callable that returns the current date.
  3. Since we want to support all asynchronous techniques, we add an Atmosphere listener to get notified every time the clock is updated. This is required to support the long-polling technique.
  4. Finally, we suspend the current response. Under the hood, Atmosphere will perform the WebSocket upgrade, or avoid committing the response if Comet is required.
  5. If the long-polling technique is used, the response will be resumed after every broadcast.
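The schedule-once and resume-on-broadcast logic in the steps above can be sketched without Atmosphere. A `ScheduledExecutorService` drives the periodic clock, and each connection records whether it is long-polling. `ClockBroadcaster` and its nested `Connection` are hypothetical. In the real sample, Meteor and Atmosphere’s Broadcaster do this work. `tick()` is exposed so the broadcast can be triggered directly instead of waiting a second.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class ClockBroadcaster {

    // One suspended browser connection (hypothetical).
    static final class Connection {
        final boolean longPolling;
        final List<String> received = new CopyOnWriteArrayList<>();

        Connection(boolean longPolling) { this.longPolling = longPolling; }
    }

    private final AtomicBoolean scheduleStarted = new AtomicBoolean(false);
    private final List<Connection> suspended = new CopyOnWriteArrayList<>();
    private final Supplier<String> clock;

    ClockBroadcaster(Supplier<String> clock) { this.clock = clock; }

    // Step 2: only the first caller schedules the periodic task.
    boolean startOnce(ScheduledExecutorService executor) {
        if (scheduleStarted.getAndSet(true)) {
            return false;
        }
        executor.scheduleAtFixedRate(this::tick, 1, 1, TimeUnit.SECONDS);
        return true;
    }

    // Step 4: keep the connection open until something is broadcast.
    void suspend(Connection c) { suspended.add(c); }

    // Broadcast the current time to every suspended connection.
    void tick() {
        String now = clock.get();
        for (Connection c : suspended) { // snapshot iterator, so removal is safe
            c.received.add(now);
            if (c.longPolling) {
                suspended.remove(c);     // Step 5: resume; the browser reconnects
            }
        }
    }

    int suspendedCount() { return suspended.size(); }
}
```

Streaming and WebSocket connections stay in the list and get every tick. A long-polling connection gets one tick and is dropped until the browser issues its next request.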

So when a browser sends a request, we suspend the underlying connection. That connection is then used every time a broadcast happens, which is every second, so the client receives an update every second. As an application developer, note that you DON’T HAVE TO LEARN ANYTHING WEBSOCKET/COMET related. The framework does it for you, so you can focus on the application logic instead of re-inventing the wheel over and over.

The Client Side

Since we are using Wicket, the client side just consists of our ClockPanel.html. We don’t want to think about WebSocket details (Does the browser support it? How do we use the API? Does the server support it?), so let’s use the Atmosphere JQuery Plugin to do the work for us:


<html
        xmlns="http://www.w3.org/1999/xhtml"
        xmlns:wicket="http://wicket.apache.org/.../wicket-xhtml1.4-strict.dtd"
        xml:lang="en"
        lang="en">
<wicket:head>
    <script src="http://github.com/Atmosphere/.../jquery-1.4.2.js"
            type="text/javascript"></script>
    <script src="http://github.com/Atmosphere/.../jquery.atmosphere.js"
            type="text/javascript"></script>
    <script>
       $(document).ready(function() {

            function callback(response) {
                if (response.status == 200) {
                    document.getElementById('clock').innerHTML = response.data;
                }
            }

            // You can set websocket, streaming or long-polling here.
            $.atmosphere.subscribe(document.getElementById('cometStart').href,
                    callback,
                    $.atmosphere.request = {transport: 'websocket'});
        });
    </script>
</wicket:head>
<body>
<wicket:panel>
    <a wicket:id="cometStart" id="cometStart"></a>
    <div wicket:id="clock" id="clock">99:99:99</div>
     <div>
        <iframe id="cometFrame" name="cometFrame" width="0" height="0"
               border="0" style="border-width: 0px"/>
    </div>
</wicket:panel>
</body>
</html>

Here we use Atmosphere to send the request and display the asynchronous response from the server (the callback function). For demonstration purposes, we set the default transport to WebSocket, but this is optional. The plugin first tries to use the browser’s WebSocket support. If WebSocket is available, it uses it to make a request to the server. If the server rejects the request because it doesn’t support WebSocket, the plugin falls back to Comet. For our demo it falls back to long-polling, but you could change it to streaming and it would still work. For this demo I use an iframe to display the clock update.

Wow. Without being a Wicket expert, I found this demo quite easy to build. Let me know what you think! You can download the complete application from here (rename it to atmosphere-wicket-clock.war). You can also check out the code on GitHub. The sample was built in collaboration with Andrey Belyaev.

For any questions or to download the Atmosphere Client and Server Framework, go to our main site and use our Nabble forum (no subscription needed), or follow the team or me on Twitter and tweet your questions there!

Real Time Twitter Search via Websocket or Comet using the Atmosphere Framework

Twitter currently supports a streaming API, described as:

The Twitter Streaming API allows high-throughput near-realtime access to various subsets of public and protected Twitter data.

Unfortunately, no such API is available for real-time search. Thanks to the freshly released Atmosphere Framework 0.6.2, let’s create one in fewer than 25 lines. As a bonus, let’s have the Atmosphere JQuery Plugin select the best transport for communicating with this API: WebSocket, HTTP streaming or long-polling.

The Server Side

What we need to accomplish:

  1. Provide a URI that can be used to send search requests (hashtag).
  2. Based on a keyword/hashtag, connect to and poll the Twitter search API every second (or more) to collect the JSON result. Since we want to support thousands of clients, let’s make sure we never block waiting for the result and stay fully asynchronous instead.
  3. Broadcast/push the JSON object back to our set of suspended responses. By suspended here I mean a connection that uses long-polling, HTTP streaming or WebSocket. As you might be aware, Atmosphere hides such details from your application, so you can focus on the application itself instead of how the transport works.
  4. Provide a URI for stopping the real-time search.

Now let’s build it. If you can’t wait, you can read the entire code here and here. For our application, let’s use the atmosphere-annotations and atmosphere-jersey modules:

@Path("/search/{tagid}")
@Singleton
public class TwitterFeed {

    private final AsyncHttpClient asyncClient = new AsyncHttpClient();
    private final ConcurrentHashMap<String, Future<?>> futures
                 = new ConcurrentHashMap<String, Future<?>>();

    @GET
    public SuspendResponse<String>
             search(final @PathParam("tagid") Broadcaster feed,
                    final @PathParam("tagid") String tagid) {

Put simply, we want our class to be invoked when a request takes the form /search/{hashtag}, where the hashtag can be anything you want to search on. Next we define our first method by asking Jersey to inject a Broadcaster and the hashtag (tagid) from the URL. For those of you who aren’t familiar with Atmosphere concepts, a Broadcaster is an object that can be used to broadcast/push events back to the browser. A Broadcaster contains the list of connections that have been suspended, independently of the transport used: long-polling, streaming or WebSocket. Hence a Broadcaster can be used to broadcast real-time events. For our application, we will create one Broadcaster per hashtag. Next, we configure our Broadcaster to poll the Twitter Search API by simply doing:

if (feed.getAtmosphereResources().size() == 0) {
    Future<?> future = feed.scheduleFixedBroadcast(new Callable<String>() {
        // refresh_url returned by the previous query (empty until the first response)
        private final AtomicReference<String> refreshUrl
                = new AtomicReference<String>("");

        public String call() throws Exception {
            String query;
            if (!refreshUrl.get().isEmpty()) {
                // Only ask for results newer than the previous batch
                query = refreshUrl.get();
            } else {
                query = "?q=" + URLEncoder.encode(tagid, "UTF-8");
            }
            // Non-blocking: the handler runs once the whole response has arrived
            asyncClient.prepareGet("http://search.twitter.com/search.json" + query)
                    .execute(new AsyncCompletionHandler<Integer>() {
                        @Override
                        public Integer onCompleted(Response response) throws Exception {
                            String s = response.getResponseBody();
                            JSONObject json = new JSONObject(s);
                            refreshUrl.set(json.getString("refresh_url"));
                            feed.broadcast(s).get();
                            return response.getStatusCode();
                        }
                    });
            return "OK";
        }
    }, 1, TimeUnit.SECONDS);
    futures.put(tagid, future);
}

First, we query our Broadcaster (feed) to see whether a connection has already asked for this real-time search. If none has, we invoke Broadcaster.scheduleFixedBroadcast(..) with a Callable, which will be executed every second. Inside the Callable we use my other active open source project, AsyncHttpClient. It lets the Callable run asynchronously: we send the request but don’t block waiting for the response. AsyncHttpClient calls back the AsyncCompletionHandler once the Twitter API has sent us the entire response. We could have streamed the response, but to keep the sample simple we use an AsyncCompletionHandler that buffers the entire JSON response. From the JSON object we get the refresh_url value. We use it the next time we query the Twitter Search API, so that we receive only the new results instead of the entire set. Next, we just need to tell Atmosphere to suspend the connection and use this Broadcaster:


return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(feed)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
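The query-building part of the Callable above is easy to get wrong, so here it is isolated as a sketch. The first poll sends `?q=` plus the URL-encoded hashtag, and every later poll reuses the `refresh_url` returned by the previous response. `SearchQuery` is hypothetical, and the JSON parsing and HTTP call are left out. Encoding matters because a raw `#` would start a URL fragment and never reach the server.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

final class SearchQuery {
    private static final String BASE = "http://search.twitter.com/search.json";

    private final String tag;
    // Empty until the first response has been parsed.
    private final AtomicReference<String> refreshUrl = new AtomicReference<>("");

    SearchQuery(String tag) { this.tag = tag; }

    // URL for the next poll.
    String nextUrl() {
        String refresh = refreshUrl.get();
        if (!refresh.isEmpty()) {
            return BASE + refresh; // only results newer than the last batch
        }
        // '#' would otherwise start a URL fragment, so encode the tag.
        return BASE + "?q=" + URLEncoder.encode(tag, StandardCharsets.UTF_8);
    }

    // Called from the completion handler with the response's refresh_url value.
    void onResponse(String refreshUrlFromJson) {
        refreshUrl.set(refreshUrlFromJson);
    }
}
```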

We broadcast the result as-is, so our client can read the response as JSON. We also store the returned Future, so we can stop the real-time broadcast later:

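    @GET
    @Path("stop")
    public String stopSearch(
            final @PathParam("tagid") Broadcaster feed,
            final @PathParam("tagid") String tagid) {

        // Resume all connections associated with the hashtag
        feed.resumeAll();
        // remove() instead of get(): no NullPointerException if no search is running
        Future<?> future = futures.remove(tagid);
        if (future != null) {
            future.cancel(true);
        }
        return "DONE";
    }
}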

To stop a real-time search, we just issue /search/{hashtag}/stop. That’s all we need to do on the server side. Our server-side application can now receive WebSocket, long-polling or HTTP streaming requests.
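The start/stop bookkeeping can be sketched with the JDK alone. There is one scheduled task per hashtag, stored in a `ConcurrentHashMap`. A task is started only if absent, and stopped with `remove`, so a second stop (or a stop for an unknown hashtag) is harmless. `SearchScheduler` is hypothetical; in the post, `scheduleFixedBroadcast` and `resumeAll` play these roles. Using `computeIfAbsent` also closes the window in which two first requests for the same hashtag could both see zero connections and schedule two pollers.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SearchScheduler {
    private final ScheduledExecutorService executor;
    private final ConcurrentHashMap<String, Future<?>> futures = new ConcurrentHashMap<>();

    SearchScheduler(ScheduledExecutorService executor) { this.executor = executor; }

    // Starts polling for a hashtag unless it is already running; returns true if started.
    boolean start(String tag, Runnable poll) {
        boolean[] started = {false};
        // ConcurrentHashMap.computeIfAbsent runs the function at most once per absent key.
        futures.computeIfAbsent(tag, t -> {
            started[0] = true;
            return executor.scheduleAtFixedRate(poll, 0, 1, TimeUnit.SECONDS);
        });
        return started[0];
    }

    // Mirrors /search/{hashtag}/stop; safe for unknown or already-stopped tags.
    String stop(String tag) {
        Future<?> future = futures.remove(tag);
        if (future == null) {
            return "NOT RUNNING";
        }
        future.cancel(true);
        return "DONE";
    }

    boolean isRunning(String tag) { return futures.containsKey(tag); }
}
```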

The client side

On the client side, you can use any existing JavaScript library that supports WebSocket or Comet to query the real-time API. It is as simple as:

/search/{hashtag}         to subscribe to real-time updates (try it with WebSocket!)

/search/{hashtag}/stop    to stop the real-time updates

But the easiest way is to use Atmosphere’s JQuery Plugin (of course :-)), which supports WebSocket and Comet. More importantly, the plugin can detect the best transport based on what the client and the server support. For example, if the application is deployed in Tomcat and you use Chrome, you can let the plugin find the best transport or specify the one you want to use. This is as simple as:


   $.atmosphere.subscribe(document.location.toString() + 'search/' + hashtag,
              callback,
              $.atmosphere.request = {transport: 'websocket'});

Above, we invoke the subscribe method with the URL containing the hashtag, a callback and some request properties. The plugin invokes the callback as soon as the real-time search starts on the server. The callback looks like:


var incompleteMessage = "";

function callback(response) {
    // Skip connection lifecycle notifications; only handle pushed data
    if (response.transport != 'polling' &&
            response.state != 'connected' &&
            response.state != 'closed') {

        if (response.status == 200) {
            var data = response.responseBody;

            try {
                // Prepend whatever was left over from earlier partial chunks
                var result = $.parseJSON(incompleteMessage + data);
                incompleteMessage = "";

                for (var i = result.results.length - 1; i > -1; i--) {
                    $('ul').prepend($('<li></li>').text("["
                        + response.transport + "] "
                        + result.results[i].from_user + " "
                        + result.results[i].text));
                }
            } catch (err) {
                // Not a complete JSON object yet: keep all of it for the next chunk
                incompleteMessage += data;
            }
        }
    }
}

The critical piece of code above is parseJSON(incompleteMessage + data). The size of the data sent back by the server may vary between servers, so we may not get the entire JSON object in one invocation of the callback. We therefore wrap the call in a try/catch (parseJSON fails on a partial object). We also make sure that the next time the callback is invoked, the new data is joined to the data received previously. This only happens if you search for a popular hashtag and get a large number of results (try #Nordiques !!!).
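The try/catch approach works. The same buffering can also be done by tracking brace depth, which avoids using exceptions for control flow. Below is a sketch of that swapped-in technique, written in Java for consistency with the server code rather than in the page’s JavaScript. `JsonChunkBuffer` is hypothetical. It assumes each pushed message is a single top-level JSON object, ignores anything outside an object (whitespace, padding), and does not count braces that appear inside string values.

```java
import java.util.ArrayList;
import java.util.List;

final class JsonChunkBuffer {
    private final StringBuilder pending = new StringBuilder();
    private int depth = 0;            // current {...} nesting level
    private boolean inString = false; // inside a "..." literal
    private boolean escaped = false;  // previous char was a backslash inside a string

    // Feeds one chunk; returns every top-level JSON object completed by it.
    List<String> append(String chunk) {
        List<String> complete = new ArrayList<>();
        for (int i = 0; i < chunk.length(); i++) {
            char ch = chunk.charAt(i);
            if (depth == 0 && ch != '{') {
                continue;             // skip anything between objects
            }
            pending.append(ch);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (ch == '\\') {
                    escaped = true;
                } else if (ch == '"') {
                    inString = false;
                }
            } else if (ch == '"') {
                inString = true;
            } else if (ch == '{') {
                depth++;
            } else if (ch == '}') {
                depth--;
                if (depth == 0) {     // a whole object has arrived
                    complete.add(pending.toString());
                    pending.setLength(0);
                }
            }
        }
        return complete;
    }
}
```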

That’s it for the client side. You can download the sample here and deploy it in any web server that supports Comet, WebSocket, both, or neither!! The interface is simple and demonstrates the transport auto-detection.

Any cooler, better-designed interface is welcome 🙂 Finally, you can get more information about this sample by reading my JavaOne talk.

For any questions or to download the Atmosphere Client and Server Framework, go to our main site and use our Nabble forum (no subscription needed), or follow the team or me on Twitter and tweet your questions there! You can also check out the code on GitHub.