Real Time Twitter Search via Websocket or Comet using the Atmosphere Framework

Twitter currently supports a streaming API, described as:

The Twitter Streaming API allows high-throughput near-realtime access to various subsets of public and protected Twitter data.

Unfortunately, no such API is available when it comes to executing real-time searches. Thanks to the freshly released Atmosphere Framework 0.6.2, let’s create such an API in less than 25 lines. As an extra, let’s have the Atmosphere JQuery Plugin select the best transport for communicating with this API: Websocket, http-streaming or long-polling.

The Server Side

What we need to accomplish consists of:

  1. Provide a URI that can be used to send search requests (hashtag).
  2. Based on a keyword/hashtag, connect to and poll the Twitter search API every second (or at a longer interval) to collect the JSON result. Since we want to support thousands of clients, let’s make sure we never block waiting for the result, and instead stay fully asynchronous.
  3. Broadcast/push the JSON object back to our set of suspended responses. By suspended I mean a connection which uses long-polling, http-streaming or Websocket. As you might be aware, Atmosphere hides such details from your application, so you don’t have to focus on how the transport works, only on your application itself.
  4. Provide a URI for stopping the real-time search.

Now let’s build it. If you can’t wait, you can read the entire code here and here. For our application, let’s use the atmosphere-annotations and atmosphere-jersey modules:

@Path("/search/{tagid}")
@Singleton
public class TwitterFeed {

    private final AsyncHttpClient asyncClient = new AsyncHttpClient();
    private final ConcurrentHashMap<String, Future<?>> futures
                 = new ConcurrentHashMap<String, Future<?>>();

    @GET
    public SuspendResponse<String>
             search(final @PathParam("tagid") Broadcaster feed,
                    final @PathParam("tagid") String tagid) {

Here we simply want our class to be invoked when a request takes the form /search/{hashtag}, where the hashtag can be anything you want to search on. Next we define our first method by asking Jersey to inject a Broadcaster and the hashtag (tagid) from the URL. For those of you who aren’t familiar with Atmosphere concepts, a Broadcaster is an object that can be used to broadcast/push events back to the browser. A Broadcaster contains the list of connections that have been suspended, independently of the transport used: long-polling, streaming or Websocket. Hence a Broadcaster can be used to broadcast real-time events. For our current application, we will create one Broadcaster per hashtag. The next step is to configure our Broadcaster to poll the Twitter Search API by simply doing:

if (feed.getAtmosphereResources().size() == 0) {
    Future<?> future = feed.scheduleFixedBroadcast(new Callable<String>() {
        // Holds the refresh_url returned by the previous search, if any.
        private final AtomicReference<String> refreshUrl
                = new AtomicReference<String>("");

        public String call() throws Exception {
            String query = null;
            if (!refreshUrl.get().isEmpty()) {
                query = refreshUrl.get();
            } else {
                query = "?q=" + tagid;
            }
            // Send the request without blocking; the handler runs on completion.
            asyncClient.prepareGet(
                    "http://search.twitter.com/search.json" + query)
                .execute(new AsyncCompletionHandler<Integer>() {
                    @Override
                    public Integer onCompleted(Response response) throws Exception {
                        String s = response.getResponseBody();
                        JSONObject json = new JSONObject(s);
                        refreshUrl.set(json.getString("refresh_url"));
                        feed.broadcast(s).get();
                        return response.getStatusCode();
                    }
                });
            return "OK";
        }
    }, 1, TimeUnit.SECONDS);
    futures.put(tagid, future);
}
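
As a rough mental model of the Broadcaster used above, here is a minimal sketch in plain Java with no Atmosphere dependency. SimpleBroadcaster and BroadcasterRegistry are hypothetical names invented for this illustration; Atmosphere’s real Broadcaster also deals with the transports, the connection lifecycle and threading, none of which is modelled here.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the Broadcaster concept, NOT Atmosphere's API.
class SimpleBroadcaster {
    // Each subscriber models one suspended connection, whatever its transport.
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> connection) {
        subscribers.add(connection);
    }

    int size() {
        return subscribers.size();
    }

    // Push the same message to every suspended connection.
    void broadcast(String message) {
        for (Consumer<String> connection : subscribers) {
            connection.accept(message);
        }
    }
}

// Hypothetical registry that keeps one broadcaster per hashtag.
class BroadcasterRegistry {
    private static final ConcurrentHashMap<String, SimpleBroadcaster> FEEDS =
            new ConcurrentHashMap<>();

    // Returns the broadcaster for a hashtag, creating it on first use.
    static SimpleBroadcaster lookup(String tag) {
        return FEEDS.computeIfAbsent(tag, t -> new SimpleBroadcaster());
    }
}
```

In this model, checking size() == 0 before scheduling the poll plays the same role as the getAtmosphereResources().size() == 0 test above: only the first subscriber for a hashtag starts the search.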

First, we query our Broadcaster (feed) to see if there is already a connection that asked for the real-time search. If there is none, we invoke Broadcaster.scheduleFixedBroadcast(..) with a Callable. That Callable will be executed every second. Inside the Callable we use my other active open source project, AsyncHttpClient, which allows the Callable to execute asynchronously, i.e. we send the request but we don’t block waiting for the response. The AsyncHttpClient will take care of calling back the AsyncCompletionHandler once the Twitter API has sent us the entire response. We could have streamed the response, but to keep the sample simple we just use an AsyncCompletionHandler that buffers the entire JSON response. From the JSON object we get the refresh_url value, which we will use the next time we query the Twitter Search API in order to receive only the new results instead of the entire set. Next we just need to tell Atmosphere to suspend the connection and use this Broadcaster:


return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(feed)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();
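
Stepping back to the polling step for a moment: the non-blocking request and the refresh_url bookkeeping can be isolated from Atmosphere and AsyncHttpClient. The sketch below uses only the JDK. IncrementalPoller, SearchResult and the client function are assumptions made for this illustration; the client stands in for whatever asynchronous HTTP call you use, and SearchResult stands in for the parsed JSON.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical result type standing in for the parsed JSON response.
class SearchResult {
    final String body;        // raw JSON to push to the browsers
    final String refreshUrl;  // query string that returns only newer results

    SearchResult(String body, String refreshUrl) {
        this.body = body;
        this.refreshUrl = refreshUrl;
    }
}

// Hypothetical poller: one instance per hashtag.
class IncrementalPoller {
    private final AtomicReference<String> refreshUrl = new AtomicReference<>("");
    private final String tag;
    // Assumed asynchronous HTTP call: takes a query string, completes later.
    private final Function<String, CompletableFuture<SearchResult>> client;
    // Where results go, for example a broadcast to suspended connections.
    private final Consumer<String> sink;

    IncrementalPoller(String tag,
                      Function<String, CompletableFuture<SearchResult>> client,
                      Consumer<String> sink) {
        this.tag = tag;
        this.client = client;
        this.sink = sink;
    }

    // Fires one request and returns at once; the callback runs on completion.
    CompletableFuture<Void> pollOnce() {
        String saved = refreshUrl.get();
        String query = saved.isEmpty() ? "?q=" + tag : saved;
        return client.apply(query).thenAccept(result -> {
            refreshUrl.set(result.refreshUrl); // next poll asks only for newer results
            sink.accept(result.body);
        });
    }
}
```

The first call to pollOnce() queries with "?q=" plus the tag; every later call reuses the refresh URL returned by the previous response, which is the same idea as the AtomicReference in the Callable above.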

Finally we just broadcast the result as is, so our client can use JSON to read the response. We store the returned Future so that we can stop the real-time broadcast later:

    @GET
    @Path("stop")
    public String stopSearch(
            final @PathParam("tagid") Broadcaster feed,
            final @PathParam("tagid") String tagid) {

        // Resume all connections associated with a hashtag
        feed.resumeAll();
        // Cancel the scheduled poll, if one was started for this hashtag
        Future<?> future = futures.remove(tagid);
        if (future != null) {
            future.cancel(true);
        }
        return "DONE";
    }
}

To stop a real time search, we just issue /search/{#hashtag}/stop. That’s all we need to do on the server side. Our server side application is now able to receive Websocket, long-polling or http-streaming requests.
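
The start/stop bookkeeping described above (one scheduled task per hashtag, kept in a map so it can be cancelled later) can be sketched with the JDK scheduler alone. SearchTasks is a hypothetical class written for this illustration; in the sample itself the scheduling is done by the Broadcaster, not by an executor you create yourself.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical registry of one periodic task per hashtag.
class SearchTasks {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentHashMap<String, ScheduledFuture<?>> futures =
            new ConcurrentHashMap<>();

    // Starts polling for a tag unless a task already exists; returns true if started.
    boolean start(String tag, Runnable poll, long periodMillis) {
        boolean[] started = {false};
        futures.computeIfAbsent(tag, t -> {
            started[0] = true;
            return scheduler.scheduleAtFixedRate(
                    poll, 0, periodMillis, TimeUnit.MILLISECONDS);
        });
        return started[0];
    }

    // Cancels the task for a tag; returns false when nothing was running.
    boolean stop(String tag) {
        ScheduledFuture<?> future = futures.remove(tag);
        return future != null && future.cancel(true);
    }

    // Stops the scheduler thread so the JVM can exit.
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Removing the entry from the map before cancelling means a later search on the same hashtag starts a fresh task, and a second stop request becomes a harmless no-op.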

The client side

On the client side, you can use any existing JavaScript library supporting Websocket or Comet to query the real-time API. It is as simple as

/search/#hashtag                for subscribing to real time update (try it with WebSocket!)

/search/#hashtag/stop    to stop the real time update

But the easiest way is to use Atmosphere’s JQuery Plugin (offfff course :-)), which supports Websocket and Comet. More importantly, the Plugin is able to detect the best transport to use based on what the client and the server support. As an example, if the application is deployed in Tomcat and you use Chrome, you can let the Plugin find the best transport or specify the one you want to use. This is as simple as:


   $.atmosphere.subscribe(document.location.toString() + 'search/' + hashtag,
              callback,
              $.atmosphere.request = {transport: 'Websocket'});

What we do above is to invoke the subscribe method by passing the URL containing the hashtag, a callback and some request properties. The Plugin will invoke the callback as soon as the real time search starts on the server. The callback looks like:


// Holds a partial JSON message until the rest of it arrives.
var incompleteMessage = "";

function callback(response) {
    if (response.transport != 'polling' &&
            response.state != 'connected' &&
            response.state != 'closed') {

        if (response.status == 200) {
            var data = response.responseBody;

            try {
                var result = $.parseJSON(incompleteMessage + data);
                incompleteMessage = "";

                for (var i = result.results.length - 1; i > -1; i--) {
                    $('ul').prepend($('<li></li>').text("["
                            + response.transport + "] "
                            + result.results[i].from_user + " "
                            + result.results[i].text));
                }
            } catch (err) {
                // Parsing failed: keep everything received so far for the next call.
                incompleteMessage += data;
            }
        }
    }
}

The critical piece of code above is the parseJSON(incompleteMessage + data); call. The size of the data sent back by the server may vary between servers, so we may not get the entire JSON object in one invocation of the callback. We therefore wrap that call inside a try/catch (since parseJSON will fail on a partial object) and make sure that the next time the callback gets invoked we prepend the previously received data. That scenario only happens if you search for a popular hashtag and get a large number of responses (try #Nordiques !!!).
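
The same buffering idea can be sketched in Java, for example for a Java client reading the stream. Note the swapped-in technique: the JDK ships no JSON parser, so instead of the try/catch around a parse call this sketch counts braces to decide when a top-level object is complete. JsonChunkBuffer is a hypothetical helper written for this illustration, not part of Atmosphere, and it only handles top-level JSON objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: accumulates chunks until a complete top-level
// JSON object is available. Uses brace counting, not a real JSON parser.
class JsonChunkBuffer {
    private final StringBuilder pending = new StringBuilder();

    // Appends a chunk and returns every complete top-level object now available.
    List<String> append(String chunk) {
        pending.append(chunk);
        List<String> complete = new ArrayList<>();
        int depth = 0;          // current nesting level of { }
        int start = -1;         // index where the current top-level object began
        int consumed = 0;       // end of the last complete object
        boolean inString = false;
        boolean escaped = false;

        for (int i = 0; i < pending.length(); i++) {
            char c = pending.charAt(i);
            if (inString) {
                // Braces inside string literals must not be counted.
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
            } else if (c == '{') {
                if (depth++ == 0) {
                    start = i;
                }
            } else if (c == '}' && depth > 0) {
                if (--depth == 0) {
                    complete.add(pending.substring(start, i + 1));
                    consumed = i + 1;
                }
            }
        }
        // Keep only the unfinished tail for the next call.
        pending.delete(0, consumed);
        return complete;
    }
}
```

Each call returns an empty list while the object is still partial, which corresponds to the catch branch of the JavaScript callback; the caller simply waits for the next chunk.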

That’s it for the client side. You can download the sample here and deploy it in any webserver supporting Comet and/or Websocket, or neither of them!! The interface is simple and demonstrates the transport’s auto-detection.
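
One way to picture the auto-detection is as picking the first transport, in order of preference, that both sides support. The sketch below is only an illustration of that idea under stated assumptions: TransportSelector, the transport names and the final fallback to long-polling are all hypothetical, and this is not how the Plugin is actually implemented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of transport negotiation, NOT the Plugin's code.
class TransportSelector {
    // Preference order as listed in the article: Websocket first, then Comet.
    static final List<String> PREFERENCE =
            Arrays.asList("websocket", "streaming", "long-polling");

    // Returns the most preferred transport supported by both client and server.
    static String select(Set<String> client, Set<String> server) {
        for (String transport : PREFERENCE) {
            if (client.contains(transport) && server.contains(transport)) {
                return transport;
            }
        }
        // Assumption: long-polling only needs plain HTTP, so use it as a last resort.
        return "long-polling";
    }
}
```

With a client that supports all three transports and a server that supports only streaming and long-polling, select returns "streaming".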

Any cool and better designed interface welcomed :-) Finally, you can get more information about this sample by reading my JavaOne talk.

For any questions or to download Atmosphere Client and Server Framework, go to our main site and use our Nabble forum (no subscription needed), or follow the team or myself and tweet your questions there! You can also check out the code on Github.

  1. October 4, 2010 at 3:32 am

    wonderful!
    Thank you for sharing …

  2. October 8, 2010 at 7:24 am

    Hey,

    I’m playing around with your examples and they are working like a charm. At the moment I try to create a similar project, but with Guice+Jersey+Atmosphere and I didn’t get it up and running.
    Do you have an Example to share? :) Or are interested in writing one? I already started.

  3. October 8, 2010 at 8:49 pm

    Good hint. I just looked for Atmosphere+Jersey and not Guice. :)

    I merge your twitter-live (atmo+jersey) with guice-chat (atmo+guice) and mixed it up with my own Guice Extension for Automatic Binding of Modules, Bean-Binding and Configuration.
    I’ll release the example under http://github.com/manzke/ gab-web-example.

    Bye and Thanks for the Hint!
    Daniel

  4. October 8, 2010 at 9:01 pm

    Hey I like it. Once completed I’ve no issue adding it to Atmosphere sample distribution!

    Thanks!

    — Jeanfrancois

    • October 8, 2010 at 9:05 pm

      Alright. I’ll make a Guice+Jersey+Atmosphere+GAB-Example and inform than ready to publish! :)

      (GAB = Automatic Binding for Guice :))

      Bye,
      Daniel
