Archive

Archive for the ‘Async Http client’ Category

Writing Portable WebSockets Application using Java

Writing Portable WebSockets’ applications using the Java Language can be challenging. Since there is no API standardization yet, several “native” API exists (Play!, Vert.x, Kaazing), you can easily get locked with their API, changes, single vendor etc. The same “problem” happens with well known WebServer: Jetty, Tomcat or GlassFish all have native API making your application not portable. Worse, the same problem happens on the client side: Grizzly, Netty, JWebSocket all have their own API. Not better on the browser side as well: Firefox went with its own API instead of the recommended one (MozWebSocket, fixed in Firefox 12).

So, already giving up? The solution is simple: use the AHC library(client) and the Atmosphere Framework (browser and server). The AHC library adds a thin layer on top of existing WebSockets’ client libraries, using the Netty Framework as default. The Atmosphere Framework allow portable WebSockets across Java based WebServers.

Part I — Writing the Server Component

The Atmosphere Framework works on top of existing native WebSocket implementation and currently supports Jetty, Tomcat, GlassFish and the Netty Framework (the NettoSphere). The Atmosphere Framework allows writing pure WebSockets applications as well as applications supporting the HTML 5 Server Side Events, Long-Polling or HTTP Streaming. Today this blog will only focus on writing WebSockets-only applications. So, no fallback on another transport in case WebSockets aren’t supported, but it is important to note that Atmosphere transparently support that transport’s fallback.

The portable API and easiest way to write pure WebSockets with Atmosphere is called WebSocketHandler, and can be defined as:

public abstract class WebSocketHandler implements WebSocketProtocol {

    public void onByteMessage(WebSocket webSocket, 
                              byte[] data, int offset, int length) {
    }

    public void onTextMessage(WebSocket webSocket, String data) {
    }

    public void onOpen(WebSocket webSocket) {
    }

    public void onClose(WebSocket webSocket) {
    }

    @Override
    public void onError(WebSocket webSocket, 
                        WebSocketProcessor.WebSocketException t) {
    }

For example, let’s say we want to write a simple echo server that broadcast received messages to all connected WebSockets, all we need to do is to extends that class with:

    @Override
    public void onTextMessage(WebSocket webSocket, String message) {
        MetaBroadcaster.getDefault().broadcastTo("/*", messages);
    }

All this code is doing is to broadcast the received messages to all connected client using the MetaBroadcaster utility classes. Since, by default, all connected WebSockets are registered at ‘/*’, no extra code is required. Of course we could have wrote something more complicated (like writing our own WebSocket Protocol), but that’s not the goal of this blog.

Part II — Writing the Browser Component

For the Browser component, let’s use the Atmosphere JQuery Plugin, which supports all browsers and could fallback to another transport in case WebSocket aren’t supported. In the easiest form, all we need to do to receive WebSockets messages is:

   var socket = $.atmosphere;
   var req = new $.atmosphere.AtmosphereRequest();
   req.url = document.location.toString() + '/echo';
   req.transport = "websocket';

   req.onOpen = function(response) {
       alert("WebSocket opened");
   }

   req.onTransportFailure = function(request) {
       alert("This browser or the remote Server doesn't support WebSocket");
   }

   req.onMessage(response) {
      var message = response.responseBody;
      alert("WebSocket Message received: " + message);
   }

   var subSocket = socket.subscribe(request);

Of course a real application will not use the alert call. An interesting callback here is the onTransportFailure, which is called in case the browser or the server isn’t supporting WebSockets. For sending WebSocket’s message:

    subSocket.push("Hello World");

Part III — Writing a Java Client

Now let’s write a portable Java client using the AHC library. As simple as

    AsyncHttpClient c = new AsyncHttpClient();
    WebSocket w = c.prepareGet("ws://127.0.0.1:8080")
                   .execute(new WebSocketUpgradeHandler.Builder().build())
                  .get();
    w.addWebSocketListener(new WebSocketTextListener() {

       public void onMessage(String message) {
         System.out.println("Message Received: " + message);
       }

       public void onOpen(WebSocket websocket) {
         System.out.println("WebSocket Opened");
       }
   }).sendTextMessage("Hello World");

By default the AHC library use the Netty Framework for WebSockets support, but other framework (like the Grizzly Framework) can easily be replaced.

For a more complex, portable WebSockets application, take a look at the Atmosphere Chat sample, which transparently support WebSocket, Server Sides Events, Long-Polling and Streaming. If you are planning to use WebSockets and Java, I strongly recommend you look at Atmosphere instead of using private native API and get stuck on a server/framework forever. For more information, ping me on Twitter or follow the Atmosphere Framework!

Transparently adding WebSockets to your application using SwaggerSocket

Today at Wordnik we announced the immediate availability of SwaggerSocket, a REST over WebSockets protocol that build on of Atmosphere and AsyncHttpClient. In this blog I will give some details about how the protocol works and how we implemented it.

First let’s gives some details about how SwaggerSocket works. SwaggerSocket build on top of the Atmosphere WebSocketProtocol API. The Atmosphere Runtime dispatchs WebSocket events to WebSocketProtocol API, allowing the manipulation of WebSockets events. It is important to note that Atmosphere Runtime takes care of all the WebServer WebSocket details (remember there is no standard on the server side with WebSocket). That why writing a WebSocket application with Atmosphere is portable and can be run on top of Netty, Jetty, GlassFish and Tomcat WebSocket implementation. By default, Atmosphere uses the SimpleHttpProtocol for dispatching WebSocket events to framework or application.

The SimpleHttpProtocol is “simple”: when a WebSocket message is received, it is dispatched as a POST to applications and frameworks. As an example, the Jersey Framework works without any modification inside Atmosphere and transparently work with WebSocket. But you can’t dynamically pass information like form/query params and headers unless you add those inside the WebSocket message itself, by writing your own “sub protocol”. Worse, why waiting for a response to arrive before sending another one? Why not sending an array of requests and get responses as they come, asynchronously? This is where SwaggerSocket can saves our life!

The SwaggerSocket Protocol is exactly that, a Protocol on top of WebSocket. The First version of the Protocol is using JSON to pass information between the client and server. As an example, a WebSocket message looks like:

    "requests" : [
        {
            "uuid" : 0,
            "method" : "POST",
            "path" : "/foo",
            "headers" : [
                {
                    "name" : "Content-Type",
                    "value" : "application/json"
                }
            ],
            "queryStrings" : [
                {
                    "name" : "foo2",
                    "value" : "bar2"
                }
            ],
            "messageBody" : "SwaggerSocket Protocol is cool"
        }
    ]

A Response looks like

    "responses" : [
        {
            "uuid" : 0,
            "status" : "status",
            "path" : "/foo",
            "headers" : [
                {
                    "name" : "name",
                    "value" : "value"
                }
            ],
            "messageBody" : "You are right! SwaggerSocket is coool!"
        }
    ]

It is important to note that any existing applications will WORKS AS IT IS, without any modifications server side. That means your Wicket/GWT/Vaadin/Resis/Hazelcast/Jersey/Spring/Guice/RichFaces/etc. can take advantage of the protocol right now. As an example, a simple Jersey resource:

@Path("/helloworld")
class HelloWorld {

  @Path("/sayHello")
  @GET
  def get(): String = {
    "Swagger Socket Hello World"
  }
}

will run unmodified. For the client, you have choice:

  1. Open a WebSocket and use the swaggersocket-protocol library to create requests and responses
  2. Use the swaggersocket.js library
  3. Use the SwaggerSocket Scala Library, which build on top of AHC (AsyncHttpClient) WebSocket

I strongly recommends the use of 2 and 3, but would like to see contribution for 1 :-). Our goal is to integrate SwaggerSocket with Swagger and ships other client language as well.

With Javascript, all you need to do is:

        $(document).ready(function() {
  new jQuery.swaggersocket.SwaggerSocket()
     .open(document.location.toString() + "ss",
          function(swaggerSocket, r) {
              if (r.getStatus() == 200) {
              var ss = new jQuery.swaggersocket.SwaggerSocketListener();
              ss.onResponse = function(r) {
               // Write the response
              };

              var request = new jQuery.swaggersocket.Request()
                       .method("GET")
                       .path('/helloworld')
                        listener(ss);

               swaggerSocket.send(request);
          } 
   ....

All you need to do is to create a SwaggerSocket, invoke the open method and pass a function that will be invoked when the initial handshake as succeeded. Then you can call swaggerSocket.sendto send a single or an array of requests. You can do the same using the SwaggerSocket Scala Library:

  val ss = SwaggerSocket().open("http://127.0.0.1:8080/helloworld")
  ss.send(new Request.Builder()
   .path("/sayHello")
   .build(), new SwaggerSocketListener() {
  	override def message(r: Request, s: Response) {
        // print the response using response.getData()
     }
   })

The fun just begin! We will soon add support for other transport like Server Side Events, Long-Polling, HTTP Streaming and JSONP so you can deploy your SwaggerSocket application in your current production infrastructure which most probably isn’t supporting WebSocket yet. But you don’t have to wait for WebSocket, Atmosphere always use the best available one and the framework makes sure to switch to WebSockets when available.

I strongly encourage you to take a look at our demos, specially the Twitter one that bring real time search to Twitter, as fast as what Google Search recently started to support.

For any questions or to download SwaggerSocket, go to our main site, use our Google Group forumfollow the team or myself and tweet your questions there!

Writing WebSocket Clients using AsyncHttpClient

December 21, 2011 3 comments

The AsyncHttpClient version newly released 1.7.0 now supports WebSocket. Both Netty and Grizzly provider supports the latest version of the specification.

Like HTTP support with AHC, WebSoket is quite simple. Starting with AHC 1.7, a new interface called UpgradeHandler is available to the client so any kind of protocol can be used. You can event replace the actual WebSocket implementation with your in case you don’t like mine :-)

public interface UpgradeHandler<T> {

    /**
     * If the HTTP Upgrade succeed (response's status code equals 101), 
     * the {@link AsyncHttpProvider} will invoke that
     * method
     *
     * @param t an Upgradable entity
     */
    void onSuccess(T t);

    /**
     * If the upgrade fail.
     * @param t a {@link Throwable}
     */
    void onFailure(Throwable t);

For WebSocket, I wrote a simple one called WebSocketUpgradeHandler which extends the usual AsyncHandler (always required with AHC) and UpgradeHandler. The interesting part is the WebSocketUpgradeHandler.Builder, which allow you to add listeners and WebSocket properties:

        /**
         * Add a {@link WebSocketListener} that
         * will be added to the {@link WebSocket}
         *
         * @param listener a {@link WebSocketListener}
         * @return this
         */
        public Builder addWebSocketListener(WebSocketListener listener) {
            l.add(listener);
            return this;
        }

        /**
         * Remove a {@link WebSocketListener}
         *
         * @param listener a {@link WebSocketListener}
         * @return this
         */
        public Builder removeWebSocketListener(WebSocketListener listener) {
            l.remove(listener);
            return this;
        }

        /**
         * Set the WebSocket protocol.
         *
         * @param protocol the WebSocket protocol.
         * @return this
         */
        public Builder setProtocol(String protocol) {
            this.protocol = protocol;
            return this;
        }

        /**
         * Set the max size of the WebSocket byte message that will be sent.
         *
         * @param maxByteSize max size of the WebSocket byte message
         * @return this
         */
        public Builder setMaxByteSize(long maxByteSize) {
            this.maxByteSize = maxByteSize;
            return this;
        }

        /**
         * Set the max size of the WebSocket text message that will be sent.
         *
         * @param maxTextSize max size of the WebSocket byte message
         * @return this
         */
        public Builder setMaxTextSize(long maxTextSize) {
            this.maxTextSize = maxTextSize;
            return this;
        }

        /**
         * Build a {@link WebSocketUpgradeHandler}
         * @return a {@link WebSocketUpgradeHandler}
         */
        public WebSocketUpgradeHandler build() {
            return new WebSocketUpgradeHandler(this);
        }

You can add several type of listeners like WebSocketTextListener, WebSocketByteListener, etc. To create a WebSocket, you just need to do:

        AsyncHttpClient c = new AsyncHttpClient(); 
           // or new AsyncHttpClient(new GrizzlyAsyncHttpprovider(config))

        WebSocket websocket = c.prepareGet("ws://something:port)
                .execute(
                  new WebSocketUpgradeHandler.Builder().addWebSocketListener(
                     new WebSocketByteListener() {

                    @Override
                    public void onOpen(WebSocket websocket) {
                    }

                    @Override
                    public void onClose(WebSocket websocket) {
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onMessage(byte[] message) {
                    }

                    @Override
                    public void onFragment(byte[] fragment, boolean last) {
                    }
                }).build()).get();

The AHC Future returned is a WebSocket, and it is returned as soon as the handshake is successful. The WebSocket API is also simple:

public interface WebSocket {

    /**
     * Sen a byte message.
     * @param message a byte message
     * @return this
     */
    WebSocket sendMessage(byte[] message);

    /**
     * Send a text message
     * @param message a text message
     * @return this.
     */
    WebSocket sendTextMessage(String message);

    /**
     * Add a {@link WebSocketListener}
     * @param l a {@link WebSocketListener}
     * @return this
     */
    WebSocket addMessageListener(WebSocketListener l);

    /**
     * Close the WebSocket.
     */
    void close();
}

Conclusion, writing WebSocket is super simple!!! Here is a fully asynchronous echo client

        WebSocket websocket = c.prepareGet("ws://localhost:80")
                .execute(
                  new WebSocketUpgradeHandler.Builder()
                    .addWebSocketListener(new WebSocketTextListener() {

                    @Override
                    public void onMessage(String message) {
                        System.out.println(message);
                    }

                    @Override
                    public void onFragment(String fragment, boolean last) {
                    }

                    @Override
                    public void onOpen(WebSocket websocket) {
                        websocket
                          .sendTextMessage("ECHO")
                          .sendTextMessage("ECHO");
                    }

                    @Override
                    public void onClose(WebSocket websocket) {
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();

                    }
                }).build()).get();

For any questions you can use our Google Group, irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github as well!

Writing powerful REST Client using the AsyncHttpClient Library and Jersey

The Jersey Client Framework is a powerful library that easily allow the writing of client based REST client.  Recently I’ve implemented a new transport for the framework based on the AsyncHttpClient library. You can now get the best of both worlds: an easy REST Client API powered by a poweful http client library.

Currently the Jersey Client Framework ships with two transport, one based on the infamous URLConnection, and one based on Apache Http Client.  Recently I needed to replace Wink with Jersey in one of my Sonatype project (guess one?) and  at the same time realized the Jersey Client features was quite limited in terms of what you can do and what is exposed to the client. I understand the original design was to abstract the underlying transport, but unfortunately all http library aren’t supporting the same set of features. Now if you are familiar with the Jersey client, creating a resource is as simple as (see here for the complete tutorial):

    Client c = Client.create(); 

Now if you want to customize the client a little, you can do (complete info here):

    ClientConfig cc = new DefaultClientConfig();
    cc.getProperties().put(
        ClientConfig.PROPERTY_FOLLOW_REDIRECTS, true);
    Client c = Client.create(cc); 

Next step is as simple as:

     WebResource r = c.resource(“http://localhost:8080/xyz”);
     ClientResponse response = r.get(ClientResponse.class);
     EntityTag e = response.getEntityTag();
     String entity = response.getEntity(String.class);

Very simple. Now in order to take advantage of AHC (AsyncHttpClient) features, all you need to do is to first get the library from here, and then just do get a reference to the AsyncHttpClient’s Config class (if you aren’t familiar with AHC, take a look at this introduction)

     DefaultAhcConfig config = new DefaultAhcConfig();
     config.getAsyncHttpClientConfigBuilder().setRealm(
           new Realm.RealmBuilder()
               .setScheme(Realm.AuthScheme.SPNEGO)
               .setUsePreemptiveAuth(false)
               .build());
     Client c = Client.create(config);
     WebResource r = c.resource(getUri().build());

You just added Kerberos support to Jersey Client! The trick here consists of getting a reference to the AsyncHttpClientConfig and configure all the cool stuff offered by AHC. As an example, you can add resumable support, Request/Response or IOException Filter by just doing:

     config.getAsyncHttpClientConfigBuilder()
        .addIOExceptionFilter(new ResumableIOExceptionFilter());

For any questions, use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>org.sonatype.spice</groupId>
    <artifactId>jersey-ahc-client</artifactId>
    <version>1.0.0</version>
</dependency>

Categories: Async Http client

AsyncHttpClient 1.5.0 Released

January 31, 2011 2 comments

We are pleased to announce the immediate availability of the Asynchronous Http Client (AHC) library version 1.5.0.

This version includes:

For any questions you can use our Google Group, irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.5.0</version>
</dependency>

Categories: Async Http client

Going Asynchronous using AsyncHttpClient: For Dummies

January 12, 2011 10 comments

Concluding on my “Going Asynchronous using AsyncHttpClient (Basic and Complex)” blog series, this week I will explains how to use the new SimpleAsynHttpClient API to write powerful async client applications.

The SimpleAsynchHttpClient is an implementation on top of the AsyncHttpClient and its builders class like AsyncHttpClientConfigBuilder, RealmBuilder etc. With SimpleAsynchHttpClient, all you need to do is to create a single builder, and configure your request from that builder:

 SimpleAsyncHttpClient client = new SimpleAsyncHttpClient.Builder()
                .setUrl("http://..")
                .setProxyHost("http://...")
                .addHeader("foo","bar")
                .setRealmPrincipal("me")
                .setRealmPassword("pwd")
                .build();

Once you have configured your SimpleAsyncHttpClient, you can invoke your HTTP method:

 client.get();
   or
 client.head();

Now the biggest difference with the AsyncHttpClient API is the support for BodyConsumer and BodyGenerator. The BodyGenerator API allows an application to customize how bytes are sent. This is quite useful when you need to stream large fie and you don’t want to load it in memory. As an example, the library contains a FileBodyGenerator defined as

public class FileBodyGenerator implements BodyGenerator {

    private final File file;

    public FileBodyGenerator(File file) {
        this.file = file;
    }

    public RandomAccessBody createBody() throws IOException {
        return new FileBody(file);
    }

    protected static class FileBody
            implements RandomAccessBody {

        private final RandomAccessFile file;

        private final FileChannel channel;

        private final long length;

        public FileBody(File file)
                throws IOException {
            this.file = new RandomAccessFile(file, "r");
            channel = this.file.getChannel();
            length = this.file.length();
        }

        public long getContentLength() {
            return length;
        }

        public long read(ByteBuffer buffer)
                throws IOException {
            return channel.read(buffer);
        }

        public long transferTo(long position,
                               long count, WritableByteChannel target)
                throws IOException {
            return channel.transferTo(position, count, target);
        }

        public void close()
                throws IOException {
            file.close();
        }
    }
}

The library ships with InpuStreamBodyGenerator, ByteArrayBodyGenerator and the one above. Those BodyGenerator never buffers bytes in memory, reducing the probability to fill the heap and get an OOM. For the response, the SimpleAsyncHttpClient support a new API called BodyConsumer, which is simply defined as

public interface BodyConsumer {

    void consume(ByteBuffer byteBuffer) throws IOException;

    void close() throws IOException;

}

Simple callback that are invoked as soon as the response’s bytes are available. As an example, the library ships with

public class AppendableBodyConsumer implements BodyConsumer {

    private final Appendable appendable;
    private final String encoding;

    public AppendableBodyConsumer(Appendable appendable, String encoding) {
        this.appendable = appendable;
        this.encoding = encoding;
    }

    public AppendableBodyConsumer(Appendable appendable) {
        this.appendable = appendable;
        this.encoding = "UTF-8";
    }

    public void consume(ByteBuffer byteBuffer) throws IOException {
        appendable.append(new String(byteBuffer.array(), encoding));
    }

    public void close() throws IOException {
        if (Closeable.class.isAssignableFrom(appendable.getClass())) {
            Closeable.class.cast(appendable).close();
        }
    }
}

The library ships with ByteBufferBodyConsumer, FileBodyConsumer and OutputStreamBodyConsumer. Hence normally you shouldn’t have to write BodyConsumer and BodyGenerator…just use the one available! Simply do:

 SimpleAsyncHttpClient client = new SimpleAsyncHttpClient.Builder()
                .setIdleConnectionInPoolTimeoutInMs(100)
                .setUrl("http://.....)
                .setHeader("Content-Type", "text/html")
                .build();

  Future future = client.post(
                new InputStreamBodyGenerator(myInputStream),
                new OutputStreamBodyConsumer(myOutputStream));

  Response response = future.get();

Finally, if you need to re-use your instance of SimpleAsyncHttpClient, you an use the Request API directly like when you work with AsyncHttpClient:

 SimpleAsyncHttpClient client = new SimpleAsyncHttpClient.Builder()
                .setIdleConnectionInPoolTimeoutInMs(100)
                .setUrl("http://.....)
                .setHeader("Content-Type", "text/html")
                .build();

  Future future = client.post(
                new InputStreamBodyGenerator(myInputStream),
                new OutputStreamBodyConsumer(myOutputStream));

  Response response = future.get();

  client.get(new RequestBuilder().setUrl("http://...").build(),
             new OutputStreamBodyConsumer(myOutputStream));
  response = future.get();  

The SimpleAsyncHttpClient will be included in 1.5.0, which is targeted to ship weeks of January 18. For any questions you can use our Google Group, irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.5.0-SNAPSHOT</version>
</dependency>

Categories: Async Http client

Going Asynchronous using AsyncHttpClient: The Complex

The Async Http Client library purpose is to allow Java applications to easily execute HTTP requests and asynchronously process the HTTP responses. In this second part on the topic, I will describe more complex operations that can be done with the AsyncHttpClient like resumable download, zero in memory bytes copy, oAuth calculation, optimal transfer listener and performance tricks.

Resumable Donwload

The AsyncHttpClient supports resumable download in two differents scenarios:

  • IOException: If an IOException occurs (for whatever reason), you can configure the library to restart the download automatically without having to restart the download from the beginning.
  • JVM crashes: If your application or the JVM goes down during a file download, the library can also restart the download automatically when the same download is requested.

You can configure the AsyncHttpClient Library to survive IOException using the IOException Filter:

  AsyncHttpClient c = new AsyncHttpClient(
          new AsyncHttpClientConfig.Builder()
            .addIOExceptionFilter(
               new ResumableIOExceptionFilter()).build());

  ResumableAsyncHandler a =
      new ResumableAsyncHandler(
         new ResumableRandomAccessFileListener());

  a.setResumableListener(
       new ResumableRandomAccessFileListener(
           new RandomAccessFile( "file.avi", "rw" ) ) );

 Response r = c.prepareGet("http://host:port/file.avi")
       .execute(a).get();

If you need something more high level and configurable, you can use a ResumableAsyncHandler, and or implement a ResumableProcessor:

 AsyncHttpClient c = new AsyncHttpClient();
 ResumableAsyncHandler a =
     new ResumableAsyncHandler( new PropertiesBasedResumableProcessor() );
 a.setResumableListener(
     new ResumableRandomAccessFileListener(
        new RandomAccessFile( "file.avi", "rw" ) ) );

 Response r = c.prepareGet( "http://localhost:8081/file.AVI" )
        .execute( a ).get();

You can also simply use a ResumableListener (or use the ResumableRandomAccessFileListener, which does what’s described below):

public interface ResumableListener {

    public void onBytesReceived(ByteBuffer byteBuffer) throws IOException;

    public void onAllBytesReceived();

    public long length();
}

As simple as:

 AsyncHttpClient c = new AsyncHttpClient();
 final RandomAccessFile file = new RandomAccessFile( "file.avi", "rw" );        
 ResumableAsyncHandler a = new ResumableAsyncHandler();
 a.setResumableListener( new ResumableListener() {

    public void onBytesReceived(ByteBuffer byteBuffer) throws IOException {
       file.seek( file.length() );
       file.write( byteBuffer.array() );
    }

    public void onAllBytesReceived() {
       file.close();
    }

    public long length() {
       return file.length();
    }
} );
Response r = c.prepareGet( "http://localhost:8081/file.AVI" )
        .execute( a ).get();

Make it simple: TransferListener

In some scenario an application may need to manipulate the received bytes in more than one place, e.g. saves the bytes on disk but also accumulate it for checksum checking later. In that case, instead of using an AsyncHandler and mixes logic inside an AsyncHandler.onBodyPartReceived, it is recommended to use the TransferListener simple API:

public interface TransferListener {

    public void onRequestHeadersSent
       (FluentCaseInsensitiveStringsMap headers);

    public void onResponseHeadersReceived
        (FluentCaseInsensitiveStringsMap headers);

    public void onBytesReceived(ByteBuffer buffer)
         throws IOException;

    public void onBytesSent(ByteBuffer buffer);

    public void onRequestResponseCompleted();

    public void onThrowable(Throwable t);
}

All you need to do in that case is to create a TransferCompletionHandler and add as many TransferListener as you need:

 AsyncHttpClient client = new AsyncHttpClient();
 TransferCompletionHandler tl = new TransferCompletionHandler();
 tl.addTransferListener(new TransferListener(){...});

 Response response = httpClient.prepareGet("http://...").execute(tl).get();

Zero Bytes Copy

When uploading or downloading bytes, it is important to try to avoid buffering bytes in memory.

Upload

On the upload side, the mechanism is enabled by default when setting the Request’s body to a File:

 AsyncHttpClient client = new AsyncHttpClient();

 File file = new File("file.avi");
 Future f = client.preparePut("http://localhost").setBody(file).execute();

If you can’t use a File, the recommended way (as described in part I) is to use a BodyGenerator. It is strongly recommended to avoid using InputStream as the library will unfortunately buffer the entire content in memory in order to set the content-lenght, which can cause out of memory error.

Download

On the download side, you can use the HttpResponseBodyPart.writeTo to avoid loading bytes in memory and unnecessary copy:

 AsyncHttpClient client = new AsyncHttpClient();

 File tmp = new File("zeroCopy.txt");
 final FileOutputStream stream = new FileOutputStream(tmp);
 Future f = client.prepareGet("http://localhost/largefile.avi")
       .execute(new AsyncHandler() {
     public void onThrowable(Throwable t) {
     }

     public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart)
         throws Exception {
        bodyPart.writeTo(stream);
        return STATE.CONTINUE;
     }

     { .... }
});
Response resp = f.get();

Limiting the number of connections to improve raw performance

By default the library uses a connection pool and re-use connections as needed. It is important to not let the connection pool grow too large as it takes resources in memory. One way consist of setting the maximum number of connection per host or in total:

 AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder()
     .setMaximumConnectionsPerHost(10)
     .setMaximumConnectionsTotal(100)
     .build();
 AsyncHttpClient c = new AsyncHttpClient(config);

There is no magic number, so you will need to try it and decide which one gives the best result.

Using OAuth

You can use the library to pull data from any OAuth site (like Twitter). This is as simple as:

 private static final String CONSUMER_KEY = "dpf43f3p2l4k3l03";
 private static final String CONSUMER_SECRET = "kd94hf93k423kf44";
 public static final String TOKEN_KEY = "nnch734d00sl2jdk";
 public static final String TOKEN_SECRET = "pfkkdhi9sl3r4s00";
 public static final String NONCE = "kllo9940pd9333jh";
 final static long TIMESTAMP = 1191242096;

 public void oAuth() {
   ConsumerKey consumer =
     new ConsumerKey(CONSUMER_KEY, CONSUMER_SECRET);
   RequestToken user =
     new RequestToken(TOKEN_KEY, TOKEN_SECRET);
   OAuthSignatureCalculator calc =
      new OAuthSignatureCalculator(consumer, user);
   AsyncHttpClient client = new AsyncHttpClient();

   Response response = client.prepareGet("http://...")
      .setSignatureCalculator(calc).execute().get();

What’s Next

In the next part of this series I will explain more complex operations that can be done with the AsyncHttpClient library like:

  • Supporting the WebSocket protocol.
  • Concurrent use of AsyncHandler

For any questions you can use our Google Group, irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.4.1</version>
</dependency>

Categories: Async Http client

Going Asynchronous using AsyncHttpClient: The Basic

December 21, 2010 12 comments

The Async Http Client library purpose is to allow Java applications to easily execute HTTP requests and asynchronously process the HTTP responses. In this blog I will explain how to use the library and what features are supported.

Executing request synchronously or asynchronously.

The first thing to decide when using the library is if your application can handle asynchronous response or not. If not, the library has been designed using the Future API, hence you can always execute synchronous call by blocking on the Future.get() method:

   AsyncHttpClient client = new AsyncHttpClient();
   Response response = client.prepareGet(("http://sonatype.com")
     .execute().get();

The above means the request will block until the full Response has been received. It also made your application’s blocking, waiting for the response to comes back. This could be potentially an issue to block for every request, specially when doing POST or PUT operations where you don’t necessarily need to wait for the response. A simple way consists of not calling the Future.get()

AsyncHttpClient client = new AsyncHttpClient();
Response response =
   client.preparePut(("http://sonatype.com/myFile.avi").execute();

A better way than above would consist of using an AsyncHandler. The AynchHandler API is fairly simple and just consist of 5 methods to implements:

public interface AsyncHandler<T>  {
    void onThrowable(Throwable t);

    STATE onBodyPartReceived(HttpResponseBodyPart bodyPart)
      throws Exception;

    STATE onStatusReceived(HttpResponseStatus responseStatus)
      throws Exception;

    STATE onHeadersReceived(HttpResponseHeaders headers)
      throws Exception;

    T onCompleted() throws Exception;
}

The method’s order of invocation when the response start arriving consist of:

  1. onStatusReceived: The status line has been processed.
  2. onHeadersReceived: All response’s headers has been processed.
  3. onBodyPartReceived: A body parts has been received. This method can be invoked many time depending of the response’s bytes body.
  4. onCompleted: Invoked when the full response has been read, or if the processing get aborted (more on this below)
  5. onThrowable: Invoked if something wrong happened inside the previous methods or when an I/O exception occurs.

Note that for all methods onXXXReceived, the return value is an enum which can take the value of CONTINUE or ABORT. Returning CONTINUE tells the library to continue processing the response, where ABORT means stop processing the response and automatically invoke the onCompleted(). This is particularly helpful if your application just need to looks for the response’s status or headers, without the need to process the entire response’s body. An implementation would looks like (T can be anything):

AsyncHttpClient client = new AsyncHttpClient();
client.prepareGet("http://sonatype.com")
  .execute(new AsyncHandler<T>() {

     void onThrowable(Throwable t) {
     }
     public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart)
       throws Exception{
        return STATE.CONTINUE;
     }
     public STATE onStatusReceived(HttpResponseStatus responseStatus)
       throws Exception {
        return STATE.CONTINUE;
     }
     public STATE onHeadersReceived(HttpResponseHeaders headers)
       throws Exception {
        return STATE.CONTINUE;
     }
     T onCompleted() throws Exception {
       return T;
   }
});

Creating a Request object

The AsynHttpClient uses the builder pattern when it is time to create Request object. The simplest way consist of:

    RequestBuilder builder = new RequestBuilder("PUT");
    Request request = builder..setUrl("http://")
     .addHeader("name", "value")
     .setBody(new File("myUpload.avi"))
     .build();
    AsyncHttpClient client = new AsyncHttpClient();
    client.execute(request, new AsyncHandler&lt;...&gt;() {
         .....
    } );

If you need to work with File, the library supports the zero copy in memory concept, e.g the File can be uploaded or downloaded without loading its associated bytes in memory, preventing out of memory errors in case you need to upload or download many large files. Although the library support the following:

    Request request = builder..setUrl("http://")
      .addHeader("name", "value")
      .setBody(myInputStream))
      .build();

it is discouraged to use InputStream as the library will need to buffer bytes in memory in order to determine the length of the stream, and instead highly recommended to either use a File or the BodyGenerator API to avoid loading unnecessary bytes in memory:

 public interface BodyGenerator {
    Body createBody() throws IOException;
}

where a Body is defined as:

public interface Body {
    long getContentLength();
    long read(ByteBuffer buffer)
      throws IOException;
    void close() throws IOException;
}

This way the library will never read unnecessary bytes in memory, which could significantly improve the performance your application.

The RequestBuilder can also be used to create per Request configuration, like setting a Proxy or request timeout:

    PerRequestConfig requestConfig = new PerRequestConfig();
    requestConfig.setRequestTimeoutInMs(5 * 1000);
    requestConfig.setProxy(new ProxyServer(...));
    Future responseFuture =
     client.prepareGet("http://").setPerRequestConfig(requestConfig)
        .execute();

Creating a Response object

The AsyncHandler is typed, e.g you can return any object from the AsyncHandler.onCompleted(). One useful object of the library is the Response object and it’s associate builder. You can incrementally create a Response object using the ResponseBuilder.accumulate() method:

MyAsyncHandler<Response> asyncHandler = new MyAsyncHanfler<Response>() {
  private final Response.ResponseBuilder builder =
          new Response.ResponseBuilder();

  public STATE onBodyPartReceived(final HttpResponseBodyPart content)
    throws Exception {
      builder.accumulate(content);
      return STATE.CONTINUE;
  }

  public STATE onStatusReceived(final HttpResponseStatus status)
     throws Exception {
      builder.accumulate(status);
      return STATE.CONTINUE;
  }

  public STATE onHeadersReceived(final HttpResponseHeaders headers)
     throws Exception {
      builder.accumulate(headers);
      return STATE.CONTINUE;
  }

  public Response onCompleted() throws Exception {
      return builder.build();
  }
}

Response response = client.prepareGet("http://sonatype.com")
     .execute(asyncHandler).get();

One thing to consider when creating a Response object is the size of the response body. By default, a Response object will accumulate all response’s bytes in memory, and that could potentially create an out of memory error. If you are planning to use the API for downloading large files, it is not recommended to accumulate bytes in memory and instead flush the bytes on disk as soon as they are available. Note that you can still use the Response object, except you don’t accumulate the response’s bytes as demonstrated below:

MyAsyncHandler<Response> asyncHandler = new MyAsyncHanfler<Response>() {
   private final Response.ResponseBuilder builder =
      new Response.ResponseBuilder();

   public STATE onBodyPartReceived(final HttpResponseBodyPart content)
     throws Exception {
       content.write(myOutputStream);
       return STATE.CONTINUE;
   }

   public STATE onStatusReceived(final HttpResponseStatus status)
     throws Exception {
       builder.accumulate(status);
       return STATE.CONTINUE;
   }

   public STATE onHeadersReceived(final HttpResponseHeaders headers)
      throws Exception {
       builder.accumulate(headers);
       return STATE.CONTINUE;
   }

   public Response onCompleted() throws Exception {
       return builder.build();
   }
}

Response response = client.prepareGet("http://sonatype.com")
   .execute(asyncHandler).get();

Note that in the above scenario invoking Response.getResponseBodyAsStream() or getResponseBody() will return an IllegalStateException because the body wasn’t accumulated by the Response object.

Configuring the AsyncHttpClient: Compression, Connection Pool, Proxy, Times out, Thread Pools, Security, etc.

You can configure the AsyncHttpClient class using the AsyncHttpClientConfig’s Builder:

    Builder builder = new AsyncHttpClientConfig.Builder();
    builder.setCompressionEnabled(true)
        .setAllowPoolingConnection(true)
        .setRequestTimesout(30000)
        .build();
    AsyncHttpClient client = new AsyncHttpClient(builder.build());

You can set the ExecutorServices as well if you don’t want to use the default, which is a cached threads pool:

    Builder builder = new AsyncHttpClientConfig.Builder();
    builder.setExecutorService(myOwnThreadPool);
    AsyncHttpClient client = new AsyncHttpClient(builder.build());

You can also configure the connection pool the library is using and implement your own polling strategy:

    Builder builder = new AsyncHttpClientConfig.Builder();
    builder.setConnectionsPool(new ConnectionsPoo<U,V>() {
          public boolean offer(U uri, V connection) {...}
          public V poll(U uri)  {...}
          public boolean removeAll(V connection)  {...}
          public boolean canCacheConnection()  {...}
          public void destroy()  {...}
     });
    AsyncHttpClient client = new AsyncHttpClient(builder.build());

It is recommended to use the default connections pool for performance reason, but you are always free to design a better one.
You can also set the SSL information, Filters, etc. Those topics will be covered inside their own section.

Configuring SSL

Configuring the library to support SSL is simple. By default you don’t have to configure anything if you don’t need to use your own certificates etc.

   AsyncHttpClient client = new AsyncHttpClient();
   Response response = client.prepareGet(("https://sonatype.com")
     .execute().get();

The library will detect it’s an SSL request and appropriately locate the key store, trust store etc. If you need to configure those objects, all you need to do is to create an SSLContext and set it using the AsyncHttpClient’s Builder as showed below:

  InputStream keyStoreStream = ....
  char[] keyStorePassword = "changeit".toCharArray();
  KeyStore ks = KeyStore.getInstance("JKS");
  ks.load(keyStoreStream, keyStorePassword);

  char[] certificatePassword = "changeit".toCharArray();
  KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
  kmf.init(ks, certificatePassword);

  KeyManager[] keyManagers = kmf.getKeyManagers();
  TrustManager[] trustManagers = new TrustManager[]{DUMMY_TRUST_MANAGER};
  SecureRandom secureRandom = new SecureRandom();

  SSLContext sslContext = SSLContext.getInstance("TLS");
  sslContext.init(keyManagers, trustManagers, secureRandom);
  Builder builder = new AsyncHttpClientConfig.Builder();
  builder.setSSLContext(myOwnThreadPool);
  AsyncHttpClient client = new AsyncHttpClient(builder.build());

Using Filters

The library supports three types of Filter who can intercept, transform, decorate and replay transactions: Request, Response and IOException.

Request Filter

Request Filters are useful if you need to manipulate the Request or AsyncHandler object before the request is made. As an example, you can throttle requests using the following RequestFilter implementation:

public class ThrottleRequestFilter implements RequestFilter {
    private final int maxConnections;
    private final Semaphore available;
    private final int maxWait;

    public ThrottleRequestFilter(int maxConnections) {
        this.maxConnections = maxConnections;
        this.maxWait = Integer.MAX_VALUE;
        available = new Semaphore(maxConnections, true);
    }

    public ThrottleRequestFilter(int maxConnections, int maxWait) {
        this.maxConnections = maxConnections;
        this.maxWait = maxWait;
        available = new Semaphore(maxConnections, true);
    }

    public FilterContext filter(FilterContext ctx) throws FilterException {

        try {
            if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
                throw new FilterException(
                    String.format("No slot available for Request %s "
                            "with AsyncHandler %s",
                            ctx.getRequest(), ctx.getAsyncHandler()));
            };
        } catch (InterruptedException e) {
            throw new FilterException(
                    String.format("Interrupted Request %s" +
                         "with AsyncHandler %s",
                            ctx.getRequest(), ctx.getAsyncHandler()));
        }

        return new FilterContext(
             new AsyncHandlerWrapper(ctx.getAsyncHandler()), ctx.getRequest());
    }

    private class AsyncHandlerWrapper implements AsyncHandler<T> {

        private final AsyncHandler asyncHandler;

        public AsyncHandlerWrapper(AsyncHandler asyncHandler) {
            this.asyncHandler = asyncHandler;
        }

        public void onThrowable(Throwable t) {
            asyncHandler.onThrowable(t);
        }

        public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart)
                throws Exception {
            return asyncHandler.onBodyPartReceived(bodyPart);
        }

        public STATE onStatusReceived(HttpResponseStatus responseStatus)
              throws Exception {
            return asyncHandler.onStatusReceived(responseStatus);
        }

        public STATE onHeadersReceived(HttpResponseHeaders headers)
              throws Exception {
            return asyncHandler.onHeadersReceived(headers);
        }

        public T onCompleted() throws Exception {
            available.release();
            return asyncHandler.onCompleted();
        }
    }

In the above, we decorate the original AsyncHandler and use semaphore to throttle requests. To add RequestFilter, all you need to do is to configure it on the AsyncHttpClientConfig:

 AsyncHttpClientConfig.Builder b =
                 new AsyncHttpClientConfig.Builder();
 b.addRequestFilter(new ThrottleRequestFilter(100));
 AsyncHttpClient c = new AsyncHttpClient(b.build());

Response Filter

Like with Request, you can also filter the Response’s bytes before an AsyncHandler gets called. Response Filters are always invoked before the library executes the logic for authentication, proxy challenging, redirection etc. That means an application can takes control of those operations at any moment using a Response Filter. As an example, the following Response Filter redirect request from google.ca to google.com in case .ca is not responding:

 AsyncHttpClientConfig.Builder b = new AsyncHttpClientConfig.Builder();
 b.addResponseFilter(new ResponseFilter() {

   public FilterContext filter(FilterContext ctx) throws FilterException {

      if ( ctx.getResponseStatus().getStatusCode() == 503 ) {
          return new FilterContext.FilterContextBuilder(ctx)
                       .request(new RequestBuilder("GET")
                       .setUrl("http://google.com").build())
                       .build();
                 }
            }});

 AsyncHttpClient c = new AsyncHttpClient(b.build());

IOException Filter

The AsyncHttpClient library support IOExceptionFilter that can be used to replay a request in case server a server goes down or unresponsive, a network outage occurs, or nay kind of I/O abnormal situation. In those cases, the library will catch the IOException and delegate the IOException handling to the Filter. As an example, the following filter will resume an interrupted download instead of restarting downloading the file from the beginning:

    AsyncHttpClient c = new AsyncHttpClient(
          new AsyncHttpClientConfig.Builder()
            .addIOExceptionFilter(
              new ResumableIOExceptionFilter()).build());

    Response r = c.prepareGet("http://host:port/LargeFile.avi")
       .execute(new AsyncHandler(){...}).get();

The IOExceptionFilter is defined as

public class ResumableIOExceptionFilter implements IOExceptionFilter {
    public FilterContext filter(FilterContext ctx) throws FilterException {
        if (ctx.getIOException() != null ) {
            Request request = new RequestBuilder(ctx.getRequest())
                .setRangeOffset(file.length());
            return new FilterContext.FilterContextBuilder(ctx)
               .request(request)
               .replayRequest(true)
               .build();
        }
        return ctx;
    }
}

In the above we just catch any IOException and replay the request using the Range header to tell the remote server to restart sending bytes at that position. This way we don’t need to re download the entire file.

Uploading file: Progress Listener

When uploading bytes, an application might need to take some action depending on where the upload status is. The AsyncHttpClient library support a special AsyncHandler called ProgressAsyncHandler that can be used to track the upload operation:

public interface ProgressAsyncHandler<T> extends AsyncHandler<T> {
    STATE onHeaderWriteCompleted();
    STATE onContentWriteCompleted();
    STATE onContentWriteProgress(long amount, long current, long total);
}

The methods are called in the following order:

  • onHeaderWriteCompleted: invoked when the headers has been flushed to the remote server
  • onContentWriteProgress: as soon as some response’s body bytes are written. Might be invoked many times.
  • onContentWriteCompleted: invoked when the response has been sent or aborted.

Like with AsyncHandler, you can always always abort the processing at any moment in the upload process

Configuring Authentication: BASIC, DIGEST or NTLM

Configuring authentication with AsyncHttpClient is simple. You can configure it at the Request level using the RealmBuilder:

    AsyncHttpClient client = new AsyncHttpClient();
    Realm realm = new Realm.RealmBuilder()
               .setPrincipal(user)
               .setPassword(admin)
               .setUsePreemptiveAuth(true)
               .setScheme(AuthScheme.BASIC)
               .build();
    client.prepareGet("http://...").setRealm(realm).execute();

You can also set the realm at the AsyncHttpClientConfig level:

    Builder builder = new AsyncHttpClientConfig.Builder();
    Realm realm = new Realm.RealmBuilder()
               .setPrincipal(user)
               .setPassword(admin)
               .setUsePreemptiveAuth(true)
               .setScheme(AuthScheme.BASIC)
               .build();
    builder.setRealm(realm).build();
    AsyncHttpClient client = new AsyncHttpClient(builder.build());

The authentication type supported are BASIC, DIGEST and NTLM. You can also customize your own authentication mechanism by using the Response Filter.

Configuring a Proxy

The AsyncHttpClient library supports proxy, proxy authentication and proxy tunneling. Just need to create a ProxyServer instance:

AsyncHttpClient client = new AsyncHttpClient();
        Future<Response> f = client
                .prepareGet("http://....)
                .setProxyServer(new ProxyServer("127.0.0.1", 8080))
                .execute();

If you need to use an SSL tunnel, all you need to do is:

 ProxyServer ps =
          new ProxyServer(ProxyServer.Protocol.HTTPS, "127.0.0.1", 8080);
 AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
 RequestBuilder rb = new RequestBuilder("GET")
             .setProxyServer(ps)
             .setUrl("https://twitpic.com:443");

  Future responseFuture = asyncHttpClient
               .executeRequest(rb.build(), new AsyncCompletionHandlerBase() {
    @Override
    public void onThrowable(Throwable t) {}

    @Override
    public Response onCompleted(Response response) throws Exception {
      return response;
    }});

   Response r = responseFuture.get();

You can also set the authentication token on the ProxyServer instance

 ProxyServer ps = new ProxyServer(ProxyServer.Protocol.HTTPS,
                                  "127.0.0.1",
                                  8080,
                                  "admin",
                                  "password");
  AsyncHttpClient asyncHttpClient = new AsyncHttpClient();
  RequestBuilder rb = new RequestBuilder("GET")
        .setProxyServer(ps).setUrl("https://twitpic.com:443");

  Future responseFuture = asyncHttpClient
               .executeRequest(rb.build(), new AsyncCompletionHandlerBase() {
    @Override
    public void onThrowable(Throwable t) {}

    @Override
    public Response onCompleted(Response response) throws Exception {
      return response;
    }});

  Response r = responseFuture.get();

You can also set the ProxyServer at the AsyncHttpClientConfig level. In that case, all request will share the same proxy information.

Switching Provider

By default, the AsyncHttpClient is using the powerful Netty’s framework as the HTTP processor. There might be environment where you can’t use Netty. Fortunately, the AsyncHttpClient library supports two other http runtime: the JDKAsyncHttpProvider, which build around the  URLConnection, and ApacheAsyncHttpProvider which build on top of the Apache HttpClient. To change provider, all you need to do is:

 AsyncHttpClient client = new AsyncHttpClient(
    new ApacheAsyncHttpProvider(new AsyncHttpClientConfig.Builder().build()));

Same for the JDK:

 AsyncHttpclient client = new AsyncHttpClient(
    new JDKAsyncHttpProvider(new AsyncHttpClientConfig.Builder().build()));

Also every AsyncHttpClientProvider can be configured with their native functionality. As an example, you can switch the NettyAsyncHttpProvider to use blocking I/O instead of NIO:

 NettyAsyncHttpProviderConfig config = new NettyAsyncHttpProviderConfig();
 config.setProperty(NettyAsyncHttpProviderConfig.USE_BLOCKING_IO, "true");

 AsyncHttpClientConfig c = 
    new AsyncHttpClientConfig()
      .setAsyncHttpClientProviderConfig(config).build();

 AsyncHttpClient client = new AsyncHttpClient(
    new NettyAsyncHttpProvider(config));

Using the WebDav protocol

The AsyncHttpClient has build in support for the WebDav protocol. The API can be used the same way normal HTTP request are made, and everything discussed in this blog works with WebDAV as well:

 AsyncHttpClient c = new AsyncHttpClient();
 Request mkcolRequest = new RequestBuilder("MKCOL")
      .setUrl("http://host:port/folder1").build();
 Response response = c.executeRequest(mkcolRequest).get();

or

  AsyncHttpClient c = new AsyncHttpClient();
  Request propFindRequest = new RequestBuilder("PROPFIND")
     .setUrl("http://host:port).build();
  Response response = c.executeRequest(propFindRequest, new AsyncHandler(){...}).get();

Using the ready to go AsyncHandler

The framework is shipping with ready to go AsyncHandler:

  • The AsyncCompletionHandlerBase is an implementation of the AsyncCompletionHandler&lt;Response&gt;. That means Future.get() will always return a instance of Response.
  • The WebDavCompletionHandlerBase can be used to parse the XML response of a PROPFIND request. As an example, response.getStatusCode() will return 200 instead of 207 if you don’t use that AsyncHandler.
  • The ResumableAsyncHandler can be used to survive interrupted download, either produced by an IOException or because the JVM went down. This handler will be explained in details in the next blog about AsyncHttpClient.

What’s Next

In the next part of this article (here)  I will explain more complex operations that can be done with the AsyncHttpClient library like:

  • Configuring resumable download using the ResumableAsyncHandler, ResumableProcessor and ResumableListener
  • How to efficiently uses the TransferListener
  • How to efficiently uses the zero bytes copy mechanism
  • How to configure the AsyncHttpClient for performance
  • How to manage 100-Continue server response
  • OAuth build in support
  • Supporting the WebSocket protocol.

For any questions you can use our Google Group, irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.4.1</version>
</dependency>

What’s new with AsyncHttpClient 1.4.0

November 16, 2010 2 comments

AsyncHttpClient 1.4.0 is has just been released and contains many new features , bugs fixes and significant performance improvement.

Request/ResponseFilter

AsyncHttpClient 1.4.0 supports new Filter API that can be used for pre-processing Request and post-processing Response. The new RequestFilter can be used to decorate the original Request and AsyncHandler instance As an example, below is a RequestFilter which throttle requests:


public class ThrottleRequestFilter implements RequestFilter {
    private final int maxConnections;
    private final Semaphore available;
    private final int maxWait;

    public ThrottleRequestFilter(int maxConnections) {
        this.maxConnections = maxConnections;
        this.maxWait = Integer.MAX_VALUE;
        available = new Semaphore(maxConnections, true);
    }

    public ThrottleRequestFilter(int maxConnections, int maxWait) {
        this.maxConnections = maxConnections;
        this.maxWait = maxWait;
        available = new Semaphore(maxConnections, true);
    }

    public FilterContext filter(FilterContext ctx) throws FilterException {

        try {
            if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
                throw new FilterException(
                    String.format("No slot available for Request %s "
                            "with AsyncHandler %s",
                            ctx.getRequest(), ctx.getAsyncHandler()));
            };
        } catch (InterruptedException e) {
            throw new FilterException(
                    String.format("Interrupted Request %s" +
                         "with AsyncHandler %s",
                            ctx.getRequest(), ctx.getAsyncHandler()));
        }

        return new FilterContext(
             new AsyncHandlerWrapper(ctx.getAsyncHandler()), ctx.getRequest());
    }

    private class AsyncHandlerWrapper implements AsyncHandler {

        private final AsyncHandler asyncHandler;

        public AsyncHandlerWrapper(AsyncHandler asyncHandler) {
            this.asyncHandler = asyncHandler;
        }

        public void onThrowable(Throwable t) {
            asyncHandler.onThrowable(t);
        }

        public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart)
                throws Exception {
            return asyncHandler.onBodyPartReceived(bodyPart);
        }

        public STATE onStatusReceived(HttpResponseStatus responseStatus)
              throws Exception {
            return asyncHandler.onStatusReceived(responseStatus);
        }

        public STATE onHeadersReceived(HttpResponseHeaders headers)
              throws Exception {
            return asyncHandler.onHeadersReceived(headers);
        }

        public T onCompleted() throws Exception {
            available.release();
            return asyncHandler.onCompleted();
        }
    }

To add RequestFilter, all you need to do is to configure it on the AsyncHttpClientConfig:


        AsyncHttpClientConfig.Builder b =
                 new AsyncHttpClientConfig.Builder();
        b.addRequestFilter(new ThrottleRequestFilter(100));

        AsyncHttpClient c = new AsyncHttpClient(b.build());

You can also intercept and decorate the response before your AsyncHandler gets invoked, and also before the authorization, authentication or redirects get handled by the library. Below is a simple example of how a challenge for authorization can be handled by a ResponseFilter instead of letting the library doing it:


AsyncHttpClientConfig.Builder b = new AsyncHttpClientConfig.Builder();
final AtomicBoolean replay = new AtomicBoolean(true);

b.addResponseFilter(new ResponseFilter(){

   public FilterContext filter(FilterContext ctx) throws FilterException {

     if (ctx.getResponseStatus() != null &&
         ctx.getResponseStatus().getStatusCode() == 401
           && replay.getAndSet(false)) {

             Request request =
                   new RequestBuilder(ctx.getRequest())
                        .addHeader("Authorization",
                                           "Basic XAEKDYTHS==").build();
            return new FilterContext(ctx.getAsyncHandler(), request, true);
     }
     return ctx;
}});

TransferListener

A new TransferListener API has been added in order to get progress status of downloading or uploading file. AsyncHttpClient always supported that scenario using the AsyncCompletionHandler, but the new API just make it simpler:


        AsyncHttpClient c = new AsyncHttpClient();
        TransferCompletionHandler tl = new TransferCompletionHandler();
        tl.addTransferListener(new TransferListener() {

            public void onRequestHeadersSent(
                FluentCaseInsensitiveStringsMap headers) {
            }

            public void onResponseHeadersReceived(
                FluentCaseInsensitiveStringsMap headers) {
            }

            public void onBytesReceived(ByteBuffer buffer) {
            }

            public void onBytesSent(ByteBuffer buffer) {
            }

            public void onRequestResponseCompleted() {
            }

            public void onThrowable(Throwable t) {
            }
        });

         Response response = c.preparePut(getTargetUrl())
                              .setBody(largeFile)
                              .execute(tl).get();

Apache HttpClient Support

The Apache Http Client Library is now supported. If you can’t use Netty (which is still the recommended provider), all you need to do is:

   AsyncHttpClient c = new AsyncHttpClient(new ApacheAsyncHttpProvider());

Per Provider Configuration

It is now possible to configure AsyncHttpProvider with their respective configuration. For Netty, you can do:


AsyncHttpClientProviderConfig c = new NettyAsyncHttpClientProviderConfig();
c.addProperty(NettyAsyncHttpClientProviderConfig.USE_BLOCKING_IO,"true");

AsyncHttpClientConfig.Builder config = new AsyncHttpClientConfig.Builder();
builder.setAsyncHttpClientProviderConfig(c);

AsyncHttpClient client = new AsyncHttpClient(config.build());

Bug fixes and Performance Improvements

A lot of bugs and improvements are listed here. We did improve support for http to https redirection, realm/proxy authentication support, NTLM support, OAuth support etc.  On the performance side, the library has been heavily tested and we are tracking performance improvement/regression using that simple load application. So far the Netty provider gives excellent results comparing to other Http Client library.

For any questions you can use our Google Group, on irc.freenode.net #asynchttpclient or use Twitter to reach me! You can checkout the code on Github, download the jars from Maven Central or use Maven:

<dependency>
    <groupId>com.ning</groupId>
    <artifactId>async-http-client</artifactId>
    <version>1.4.0</version>
</dependency>

Categories: Async Http client

Real Time Twitter Search via Websocket or Comet using the Atmosphere Framework

Currently Twitter support a streaming API described as:

The Twitter Streaming API allows high-throughput near-realtime access to various subsets of public and protected Twitter data.

Unfortunately, no such API are available when it is time to execute real time search. Thanks to the freshly released Atmosphere Framework 0.6.2, let’s create such API in less than 25 lines. As an extra, let’s the Atmosphere JQuery Plugin selects the best transport for communicating with this API: Websocket, http-streaming or long-polling.

The Server Side

What we need to accomplish consist of:

  1. Provide an URI that can be used to send search requests (hashtag).
  2. Based on a keyword/hashtag, connect and poll the Twitter search API every second (or more) to collect the JSON result. Since we want to support thousand of clients, let’s make sure never block waiting for the result, and instead be fully asynchronous.
  3. Broadcast/Push back the JSON object to our set of suspended responses. By suspended here I means a connection which use long-polling, http-streaming or Websocket. As you might be aware, Atmosphere hides such details to your application so you don’t have to focus on how the transport works, but on your application itself.
  4. Provide a URI for stopping the real time search.

Now let’s build it. If you can’t wait, you can read the entire code here and here. For our application, let’s use the atmosphere-annotations and atmosphere-jersey modules:

@Path("/search/{tagid}")
@Singleton
public class TwitterFeed {

    private final AsyncHttpClient asyncClient = new AsyncHttpClient();
    private final ConcurrentHashMap<String, Future<?>> futures
                 = new ConcurrentHashMap<String, Future<?>>();

    @GET
    public SuspendResponse<String>
             search(final @PathParam("tagid") Broadcaster feed
                    final @PathParam("tagid") String tagid) {

Simply here we want our class to be invoked when request take the form of /search/{hashtag} and the hashtag can be anything you want to search on. Next we define out first method by asking Jersey’s to inject a Broadcaster and the hashtag (tagid) from the URL. For those of you are aren’t familiar with Atmosphere concept, a Broadcaster is an object that can be used to broadcast/push back events back to the browser. A Broadcaster contains the list of connections that has been suspended  independently of the transport used: long-polling, streaming or Websocket. Hence a Broadcaster can be used to broadcast realtime events. For our current application, we will create one Broadcaster per hashtag. Next is to configure our Broadcaster to poll the Twitter Search API by simply doing:

if (feed.getAtmosphereResources().size() == 0) {
    Future<?> future = feed.scheduleFixedBroadcast(new Callable<String>() {
        private final AtomicReference<String> refreshUrl
               = new AtomicReference<String>("");
           public String call() throws Exception {
              String query = null;
              if (!refreshUrl.get().isEmpty()) {
                  query = refreshUrl.get();
              } else {
                  query = "?q=" + tagid;
              }
              asyncClient.prepareGet(
                  "http://search.twitter.com/search.json"  + query)
                    .execute(new AsyncCompletionHandler <Integer>()) {
                          @Override
                          public Object onCompleted(Response response) throws Exception {
                            String s = response.getResponseBody();
                            JSONObject json = new JSONObject(s);
                            refreshUrl.set(json.getString("refresh_url"));
                            feed.broadcast(s).get();
                            return response.getStatusCode();
                          }
                    });
                    return "OK";
                }
            }, 1, TimeUnit.SECONDS);
            futures.put(tagid, future);
        }

First, we query our Broadcaster (feed) to see if there is already a connection who asked for the real time search. If none, then we invoke the Broasdcaster.scheduleFixedBroadcast(..) with a Callable. That Callable will be executed every second. Inside the callable we use my other active open source project AsynHttpClient, which allow the callable to be executed asynchronously, e.g we send the request but we don’t block waiting for the response. The AsyncHttpClient will take care of calling back the AsyncCompletionHandler once the Twitter API has send us the entire response. We could have streamed the response, but to make the sample simple we just use an AsyncCompletionHandler that buffer the entire JSON response. From the JSON object we get the refresh_url value which we will used next time we query the Twitter Search AP in order to receive only the new results instead of the entire set. Next we just need to tell Atmosphere to suspend the connection and use this Broadcaster:


return new SuspendResponse.SuspendResponseBuilder<String>()
                .broadcaster(feed)
                .outputComments(true)
                .addListener(new EventsLogger())
                .build();

Finally we just broadcast the result as it is so our client can use JSON to read the response. We do store the Future returned so later we can stop the real time Broadcast:

    @GET
    @Path("stop")
    public String stopSearch(
           final @PathParam("tagid") Broadcaster feed,
           final @PathParam("tagid") String tagid) {                 

            // Resume all connections associated with an hashtag
            feed.resumeAll();
            futures.get(tagid).cancel(true);
            return "DONE";
        }
    }

To stop a real time search, we just issue /search/{#hashtag}/stop. That’s all we need to do on the server side. Our server side application is now able to receive Websocket, long-polling or http-streaming requests.

The client side

On the client side, you can use any existing javascript library supporting websocket or comet to query the real time API. It is as simple as

/search/#hasttag                for subscribing to real time update (try it with WebSocket!)

/search/#hashtag/stop    to stop the real time update

But the easiest way is to use the Atmosphere’s JQuery Plugin (offfff course :-)), which support Websocket and Comet. More important, the Plugin is able to detect the best transport to use based on what the client and the server supports. As an example, if the application is deployed in Tomcat and you use Chrome, you can let the Plugin find the best transport or specify the one you want to use it.  This is as simple as:


   $.atmosphere.subscribe(document.location.toString() + 'search/' + hashtag              
              callback,
              $.atmosphere.request = {transport: 'Websocket'});

What we do above is to invoke the subscribe method by passing the URL containing the hashtag, a callback and some request properties. The Plugin will invoke the callback as soon as the real time search starts on the server. The callback looks like:


function callback(response) {
   if (response.transport != 'polling' &&
           response.state != 'connected' &&
           response.state != 'closed') {

           if (response.status == 200) {
               var data = response.responseBody;

               try {
                    var result =  $.parseJSON(incompleteMessage + data);  
                    incompleteMessage = "";

                     var i = 0;
                     for (i = result.results.length -1 ; i > -1; i--){
                          $('ul').prepend($('<li></li>').text("["
                               + response.transport + "] "
                               + result.results[i].from_user + " "
                               + result.results[i].text));
                   }
              } catch (err) {
                    incompleteMessage = data;
              }

The critical piece of code above is the parseJSON(incompleteMessage + data);. The size of the the data send back by the server may vary between servers so we may not get the entire JSON object in one invocation of the callback, so we need to wrap that call inside a try/catch (since parseJSON will fail)  and make sure next time the callback gets invoked we append the previously received data. That scenario will happens only if you search for a popular hashtag and you get a large number of response (try #Nordiques !!!).

That’s it for the client side. You can download the sample here and deploy it in any webserver supporting comet and or websocket or none of them!! The interface is simple and demonstrate the transport’s auto detection.

Any cool and better designed interface welcomed :-) Finally, you can get more information about this sample by reading my JavaOne talk.

For any questions or to download Atmosphere Client and Server Framework, go to our main site and use our Nabble forum (no subscription needed), or follow the team or myself and tweet your questions there! You can also checkout the code on Github.

Follow

Get every new post delivered to your Inbox.

Join 50 other followers