
Archive for November, 2008

Writing a Twitter like application using Grizzly Comet part 1: The Servlet

November 30, 2008

Twitter is becoming more and more popular, so I decided to write my own Twitter using Grizzly Comet. The result is amazing: 150 lines of Java code and a grizzly transformed into a bird!


Grizzly Comet is a framework built on top of Grizzly. With Grizzly Comet, you can create powerful asynchronous applications. Grizzly Comet supports:

  • Asynchronous read and write: read or write without blocking while waiting for an image to be fully uploaded or written. Instead, let the framework notify you when an async event is ready. This is particularly useful when your application needs to upload files.
  • Long polling: open a connection from the browser and wait for the server to push data only when it is available. That means the connection stays suspended until an event happens on the server side.
  • Streaming: same as long polling, but the connection is never resumed. When an event happens, the connection stays suspended “forever”.
  • Grouping of suspended connections: you can group suspended connections and manipulate a group instead of a single connection. That makes building a Grizzly Comet application quite simple.
  • Filtering/aggregating/throttling events per connection or group. Under high load, it might be faster to aggregate events instead of sending them one by one. Events can also be filtered or throttled before they get written to a suspended connection. When pushing events to a group, it is always important to make sure the push operation is not a bottleneck. Grizzly Comet ships with such mechanisms, significantly improving the performance of your asynchronous application.
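The post does not show Grizzly Comet's own filter or aggregation API. Purely as an illustration of what "aggregating events per connection" means, here is a hypothetical, JDK-only sketch. Events are buffered and written as one batch once `batchSize` of them have accumulated. `EventAggregator`, `push` and `flush` are made-up names, not Grizzly classes, and the code assumes Java 8+.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not Grizzly's API: buffer events for one connection
// (or group) and write them as a single batch once batchSize have queued up.
class EventAggregator {
    private final int batchSize;
    private final Consumer<List<String>> writer; // e.g. writes to a suspended response
    private final List<String> pending = new ArrayList<>();

    EventAggregator(int batchSize, Consumer<List<String>> writer) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.writer = writer;
    }

    // Queue one event; write the whole batch when it is full.
    synchronized void push(String event) {
        pending.add(event);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    // Write whatever is pending as one batch (no-op when nothing is queued).
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        writer.accept(new ArrayList<>(pending));
        pending.clear();
    }
}
```

With a batch size of 2, pushing "a", "b" and "c" produces one write of [a, b]. The trailing "c" waits for the next batch or for an explicit `flush()`, typically driven by a timer in a real server.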

OK, enough gibberish! Let’s demonstrate the power of the framework by rewriting Twitter as a fully real-time, asynchronous application: no need to refresh the page anymore. In this first part, I will explain the server side. The client will come next, but get ready: I’m far from a JavaScript expert :-). But first, let’s see what the end result will look like, all of this using a single Servlet!

MainScreen.png

Now let’s dive deep into the monster’s Comet API.

CometEngine: The CometEngine is the entry point to the framework. The first step when writing a Grizzly Comet application is to create a ‘group’ or ‘topic’ object that can be used to suspend, share, filter, throttle, aggregate and resume connections:

    /**
     * Create a {@link CometContext}
     * @param id - The topic associated with the {@link CometContext}
     * @return {@link CometContext}
     */
    private CometContext createCometContext(String id){
        CometEngine cometEngine = CometEngine.getEngine();
        CometContext ctx = cometEngine.register(id);
        ctx.setExpirationDelay(FIVE_MINUTES_TIMEOUT);
        return ctx;
    }

In the code above, we grab the static instance of CometEngine and call register(id) to create a CometContext, an object representing the suspended connections associated with a topic. Note that a CometContext can be created from anywhere, e.g. an EJB or a POJO. This is quite important if you are planning to push events from non-web components.

CometContext: The most important object of the Grizzly Comet framework. A CometContext represents a group of suspended connections. From a CometContext you can push events, define your own filtering/aggregating/throttling mechanism, and suspend and resume connections:

                // Create a CometContext based on this session id.
                twitterContext = 
                        createCometContext(sessionId);
                
                // Create and register a CometHandler.
                ReflectorCometHandler handler = new ReflectorCometHandler
                        (true,startingMessage,endingMessage);
                
                handler.attach(response.getWriter());
                twitterContext.addCometHandler(handler);
                
                // Keep a reference to us so we can be updated directly.
                twitterContext.addAttribute("twitterHandler", handler);                

In the code above, we first create a CometContext, then add a ReflectorCometHandler (more on this below) and store an attribute. Invoking addCometHandler(handler) automatically suspends the connection.

CometHandler: This interface represents your suspended connection. Implementing a CometHandler allows a web application to handle the lifecycle of a suspended connection. Events are pushed to a CometHandler as soon as they occur: when the connection gets suspended (onInitialize), when server events are pushed (onEvent), when your application decides to resume the connection (onTerminate), and when the client closes a suspended connection or the connection stays idle longer than the expiration delay (onInterrupt). The most important method is onEvent, where you usually define what to do with the event, e.g. store it, write it or discard it. As a very simple example (the one used by this Twitter application), Grizzly Comet ships with the ReflectorCometHandler, which does nothing except write every message it gets:

    /**
     * Write {@link CometEvent#attachment} and resume the connection if 
     * {@link ReflectorCometHandler#useStreaming} is false
     * @param event
     * @throws java.io.IOException
     */
    public synchronized void onEvent(CometEvent event) throws IOException {
        try {
            if (event.getType() != CometEvent.READ) {
                printWriter.println(event.attachment());
                printWriter.flush();
                
                if (!useStreaming){
                    event.getCometContext().resumeCometHandler(this);
                }
            }
        } catch (Throwable t) {
            throw new IOException(t.getMessage());
        }
    }

But how does a CometHandler get invoked? A CometHandler is invoked when an application calls CometContext.notify(message). As an example, let’s look at how a chat application works. First, users (browsers) enter a chat room and wait for messages. As soon as they enter the chat room, the Grizzly Comet application invokes CometContext.addCometHandler(). That means those connections are suspended, waiting for events. If a user enters a message, the way to share (or push) it with the other users is to invoke CometContext.notify(“Salut”). CometHandler.onEvent() is then called automatically, and if the ReflectorCometHandler described above is used, the message is written directly to each suspended connection. In effect, that operation pushes the message back to the clients. You can always filter messages before they reach a CometHandler (details in part III).

So, four simple steps:

  • CometEngine.register(“topic|group”) to create the group (a CometContext)
  • new ReflectorCometHandler(...), or your own CometHandler implementation, to prepare for suspending the request/response
  • CometContext.addCometHandler(handler) to suspend the connection
  • CometContext.notify(message) to push a message to the group, or CometContext.resumeCometHandler(handler) to resume the connection, in short to commit the response
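The four steps can be made concrete without a servlet container. Below is a hypothetical, in-memory model that assumes Java 8+. `MiniEngine`, `MiniContext`, `MiniHandler` and `RecordingHandler` mimic CometEngine, CometContext, CometHandler and ReflectorCometHandler, but they are illustrative names, not Grizzly code. A StringBuilder stands in for each suspended response.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model of the four steps (NOT Grizzly code).

// Stand-in for a CometHandler: one per suspended connection.
interface MiniHandler {
    void onEvent(Object message); // an event was pushed to the group
    void onTerminate();           // the application resumed the connection
}

// Stand-in for a CometContext: a group of suspended connections.
class MiniContext {
    private final List<MiniHandler> handlers = new CopyOnWriteArrayList<>();

    // Step 3: adding a handler is what "suspends" its connection.
    void addHandler(MiniHandler handler) {
        handlers.add(handler);
    }

    // Step 4a: push a message to every suspended connection in the group.
    void notify(Object message) {
        for (MiniHandler handler : handlers) {
            handler.onEvent(message);
        }
    }

    // Step 4b: resume (commit) one connection and drop it from the group.
    void resume(MiniHandler handler) {
        if (handlers.remove(handler)) {
            handler.onTerminate();
        }
    }

    int size() {
        return handlers.size();
    }
}

// Stand-in for CometEngine. Step 1: one context per topic.
final class MiniEngine {
    private static final Map<String, MiniContext> TOPICS = new ConcurrentHashMap<>();

    private MiniEngine() { }

    static MiniContext register(String topic) {
        return TOPICS.computeIfAbsent(topic, t -> new MiniContext());
    }
}

// Step 2: a reflector-style handler that records what it would have written.
class RecordingHandler implements MiniHandler {
    final StringBuilder written = new StringBuilder();
    boolean resumed;

    @Override
    public void onEvent(Object message) {
        written.append(message).append('\n');
    }

    @Override
    public void onTerminate() {
        resumed = true;
    }
}
```

In the chat example, two handlers registered on the "chat" context both receive "Salut" after a single `notify`. Resuming one of them removes it from the group without affecting the other.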

And now, Twitter Twitter Twitter

Let’s first define what the application will do:

  • First, the user signs in.
    Screenshot: Welcome.png

  • Next, the user can micro blog, i.e. add updates and push them to the people (followers) registered to receive them.
    Screenshot: MainScreen-JF.png

  • A user can follow another user’s micro blog updates by entering that user’s name in the ‘follow’ field.
    Screenshot: AlexeyFollowingJan.png

  • To be smarter than Twitter, we will let users get updates from the people they follow directly on the screen.
    Screenshot: JanUpdates.png

  • To make the application cooler than the original Twitter.com, we will let users move their updates around the screen via the jMaki Comet extension.
    Screenshot: Moving.png

So let’s go step by step through building a Twitter-like application. We will use a single TwitterServlet, with the basics defined as:

    /**
     * Grab an instance of {@link ServletContext}
     * @param config
     * @throws javax.servlet.ServletException
     */
    @Override
    public void init(ServletConfig config) throws ServletException {
        super.init(config);
        servletContext = config.getServletContext();
    }

    /**
     * Same as {@link TwitterServlet#doPost}
     * 
     * @param request
     * @param response
     * @throws javax.servlet.ServletException
     * @throws java.io.IOException
     */
    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        doPost(request, response);
    }

All the logic happens inside doPost.

First, signing in to Twitter:

I will talk in more detail about how the client works in part II, so let’s just describe the basics. Below is the client code that allows a user to sign in:

   login: function() {
      var name = $F('login-name');
      if (name.length === 0) {
         $('system-message').style.color = 'red';
         $('login-name').focus();
         return;
      }
      $('system-message').style.color = '#2d2b3d';
      $('system-message').innerHTML = '';

      $('login-button').disabled = true;
      $('login-form').style.display = 'none';
      $('message-form').style.display = '';
      $('follower').style.display = '';

      // encodeURIComponent (unlike encodeURI) also escapes '&' and '='
      var query =
         'action=login' +
         '&name=' + encodeURIComponent(name);
      new Ajax.Request(app.url, {
         postBody: query,
         onSuccess: function() {
            $('message').focus();
         }
      });
   },

On the server side, this is as simple as:

    /**
     * Based on the {@link HttpServletRequest#getParameter} action value, decide
     * if the connection needs to be suspended (when the user logs in) or if the
     * {@link CometContext} needs to be updated (by the user or by its followers).
     * 
     * There is one {@link CometContext} per suspended connection, representing
     * the user account. When user B requests to follow user A, the {@link CometHandler}
     * associated with user B's {@link CometContext} is also added to user A's
     * {@link CometContext}. Hence when user A pushes a message ({@link CometContext#notify}),
     * all {@link CometHandler}s get the {@link CometEvent}, which means user B
     * will be updated when user A updates his micro blog.
     * 
     * The suspended connection on the client side is multiplexed, i.e.
     * messages sent by the server are not only for a single component, but
     * shared amongst several components. The client side includes a message board
     * that is updated by notifying only the owner of the {@link CometContext}. This
     * is achieved by calling {@link CometContext#notify} with that owner's
     * {@link CometHandler} as the target.
     * 
     * @param request
     * @param response
     * @throws javax.servlet.ServletException
     * @throws java.io.IOException
     */
    @Override
    public void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        String action = request.getParameter("action");
        HttpSession session = request.getSession();
        String sessionId = session.getId();
        CometContext twitterContext = (CometContext) session.getAttribute(sessionId);
        if (action != null) {

In short, we grab the action value from the request and look up a CometContext. The CometContext here represents the user’s micro blog (or updates), and all CometHandlers added to this “user” CometContext receive updates whenever the user posts. Of course, on the first request the CometContext is null, as there is no suspended connection yet. On the client/browser side, the first action sent is ‘start’ (when the page is loaded), and this is where we suspend the connection:

            } else if ("start".equals(action)) {
                String message = "{ message : 'Welcome'}";              
                response.setContentType("text/html");
                String callback = request.getParameter("callback");
                if (callback == null) {
                    callback = "alert";
                }
                            
                response.getWriter().println("<script id='comet_" + counter++ + "'>" 
                        + "window.parent." + callback + "(" + message + ");</script>");

                // Create a CometContext based on this session id.
                twitterContext = 
                        createCometContext(sessionId);
                
                // Create and register a CometHandler.
                ReflectorCometHandler handler = new ReflectorCometHandler
                        (true,startingMessage,endingMessage);
                
                handler.attach(response.getWriter());
                twitterContext.addCometHandler(handler);
                
                // Keep a reference to us so we can be updated directly.
                twitterContext.addAttribute("twitterHandler", handler);
                
                session.setAttribute("handler", handler);
                session.setAttribute(sessionId, twitterContext);
                return;

The most important code above is where we create the CometContext (based on the session id) and then a ReflectorCometHandler, which gets invoked when it is time to write updates made to the CometContext back to the browser. Next, we suspend the connection/response by invoking addCometHandler. We use the session to store both the CometContext and the CometHandler representing the user’s suspended connection. Then, as soon as the user signs in:

            /*
             * Notify the submitter, via its CometHandler, that it has just logged in.
             */
            if ("login".equals(action)) {
                response.setContentType("text/plain");
                response.setHeader("Cache-Control", "private");
                response.setHeader("Pragma", "no-cache");
                response.setCharacterEncoding("UTF-8");
                               
                String name = request.getParameter("name");              
                if (name == null) {
                    logger.severe("Name cannot be null");
                    return;
                }
                
                session.setAttribute("name", name);              
                CometHandler ch = (CometHandler)session.getAttribute("handler");
                twitterContext.notify(BEGIN_SCRIPT_TAG
                        + toJsonp("Welcome back", name) 
                        + END_SCRIPT_TAG, CometEvent.NOTIFY, ch);
                
                // Store the CometContext associated with this user so
                // we can retrieve it for supporting follower.
                servletContext.setAttribute(name, twitterContext);

Using the user’s name, we push our first message back to this user over the suspended connection. As noted above, we push the message using CometContext.notify, which in turn invokes ReflectorCometHandler.onEvent(), which writes the message back to the client. Now the user is ready to micro blog.
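The post does not show `toJsonp`, `BEGIN_SCRIPT_TAG` or `END_SCRIPT_TAG`. As an assumption only, here is one possible shape for such a helper. It produces the `window.parent.callback(...)` script pattern used by the ‘start’ action and escapes values, so a name containing a quote or `</script>` cannot break out of the script element. It also rejects callback names that are not plain identifiers. `Jsonp`, `jsString` and `scriptTag` are hypothetical names.

```java
// Hypothetical helper: one possible shape for the post's toJsonp() and
// BEGIN_SCRIPT_TAG/END_SCRIPT_TAG, which are not shown in the article.
final class Jsonp {
    private Jsonp() { }

    // Quote a value as a single-quoted JavaScript string literal that is also
    // safe inside an HTML <script> element.
    static String jsString(String value) {
        StringBuilder out = new StringBuilder("'");
        for (char c : value.toCharArray()) {
            switch (c) {
                case '\\': out.append("\\\\"); break;
                case '\'': out.append("\\'"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '<':  out.append("\\x3c"); break; // no literal "</script>"
                default:   out.append(c);
            }
        }
        return out.append('\'').toString();
    }

    // Builds <script>window.parent.CALLBACK({ message: '...', name: '...' });</script>
    static String scriptTag(String callback, String message, String name) {
        // Only accept plain (possibly dotted) JavaScript identifiers as callback.
        if (!callback.matches("[A-Za-z_$][A-Za-z0-9_$.]*")) {
            throw new IllegalArgumentException("bad callback: " + callback);
        }
        return "<script>window.parent." + callback
                + "({ message: " + jsString(message)
                + ", name: " + jsString(name) + " });</script>";
    }
}
```

Validating `callback` matters because the ‘start’ and ‘post’ actions read it straight from the request before writing it into a script tag.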

Time to micro blog using Grizzly!!!!!

As soon as the user updates his status, the client sends a ‘post’ action, which the servlet handles as follows:

            } else if ("post".equals(action)) {
                String message = request.getParameter("message");
                String callback = request.getParameter("callback");
                
                if (message == null) {
                    logger.severe("Message cannot be null");
                    return;
                }
                
                if (callback == null) {
                    callback = "alert";
                }

                if (twitterContext != null){
                    // Notify other registered CometHandler.
                    twitterContext.notify("<script id='comet_" + counter++ + "'>" 
                            + "window.parent." + callback + "(" + message + ");</script>");
                }
                response.getWriter().println("ok");
                return;

This looks way too simple, right? For any update, we just need to get the CometContext and invoke notify on it, and BINGO, all our followers are updated in REAL TIME! What is a follower? A follower is another user who wants to be updated whenever a given user enters a new micro blog entry. Followers register themselves by entering the name of the user they want to follow:

            } else if ("following".equals(action)) {
                response.setContentType("text/html");
                String message = request.getParameter("message");
                String name = (String)session.getAttribute("name");
                
                // Retrieve the CometContext of the user to follow.
                CometContext followerContext 
                        = (CometContext) servletContext.getAttribute(message);

                CometHandler ch = (CometHandler)session.getAttribute("handler");
                if (followerContext == null){
                  twitterContext.notify(BEGIN_SCRIPT_TAG
                        + toJsonp("Invalid Twitter user ", message) 
                        + END_SCRIPT_TAG, CometEvent.NOTIFY, ch);
                  return;
                }

                followerContext.addCometHandler(ch, true);
                
                twitterContext.notify(BEGIN_SCRIPT_TAG
                        + toJsonp("You are now following ", message) 
                        + END_SCRIPT_TAG, CometEvent.NOTIFY, ch);
                
                CometHandler twitterHandler = 
                        (CometHandler)followerContext.getAttribute("twitterHandler");
                followerContext.notify(BEGIN_SCRIPT_TAG
                        + toJsonp(name, " is now following " + message)
                        + END_SCRIPT_TAG, CometEvent.NOTIFY, twitterHandler);               
                return;
            }

‘name’ is the user who wants to follow another user, and ‘message’ holds the name of the user to follow. First, we get the CometContext representing the user we want to follow (followerContext) and add our CometHandler to it. That means every time followerContext is updated, we also get the update via our CometHandler. Too cool! Next, we push a message to ourselves, via our own CometContext, confirming that we are now following a new user. Finally, we push a message to the followed user, via followerContext, telling him that we are now following him.
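The follow trick boils down to adding one user's handler to another user's context. Here is a hypothetical, in-memory model of it, not Grizzly code. Each user owns a "context" (the list of connections that receive their posts), and a `List<String>` stands in for each user's suspended browser connection. `MiniTwitter`, `login`, `follow` and `post` are illustrative names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory model of the "following" logic (not Grizzly code).
// Each user owns a context: the list of connections that receive their posts.
class MiniTwitter {
    private final Map<String, List<List<String>>> contexts = new ConcurrentHashMap<>();
    private final Map<String, List<String>> connections = new ConcurrentHashMap<>();

    // "start" + "login": create the user's context containing their own connection.
    List<String> login(String user) {
        List<String> connection = new CopyOnWriteArrayList<>();
        List<List<String>> context = new CopyOnWriteArrayList<>();
        context.add(connection);
        connections.put(user, connection);
        contexts.put(user, context);
        return connection;
    }

    // "following": add the follower's connection to the followed user's context.
    // Returns false for an unknown user (the "Invalid Twitter user" case).
    boolean follow(String follower, String followed) {
        List<List<String>> context = contexts.get(followed);
        List<String> connection = connections.get(follower);
        if (context == null || connection == null) {
            return false;
        }
        context.add(connection);
        return true;
    }

    // "post": notify every connection registered in the author's context.
    void post(String author, String message) {
        List<List<String>> context = contexts.get(author);
        if (context == null) {
            return;
        }
        for (List<String> connection : context) {
            connection.add(author + ": " + message);
        }
    }
}
```

After alexey follows jan, a post by jan lands on both connections, while a post by alexey only reaches alexey. Following is one-directional, just like in the servlet.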

THAT’S IT!!!

Yes, those are the only steps you have to write in order to build a Twitter-like application using Grizzly Comet. One thing to remember is that everything happens through a CometContext. From that object, you can notify/filter/aggregate/etc. a set of suspended connections, each represented by a CometHandler. In this example I’m using the ReflectorCometHandler, but you can write your own by implementing the CometHandler interface. Ready to try it? There are two possibilities. For development, I recommend using the embedded Grizzly Comet server and simply running:

% java -jar grizzly-comet-webserver-1.9.0.jar -p 8080 -a \
./grizzly-twitter.war com.sun.grizzly.samples.twitter.TwitterServlet

For production, or if you need to use Eclipse or NetBeans, download GlassFish v3 and deploy the grizzly-twitter.war application like any other web application. The application and source can be downloaded here. If you want to improve it or contribute, you are welcome to join the Grizzly community!

OK, next time I will explain in detail how the client works (at least I will try :-)). Please post your questions on users@grizzly.dev.java.net so the Grizzly community can help and respond 🙂

Categories: Uncategorized

Tricks and Tips with AIO part 1: The frightening thread pool

OK, it is now time to start our NIO.2 (Asynchronous I/O) expedition with the thread pool. Boo, deadlocks are watching you! If you are new to NIO, I recommend you take a look at my series on NIO.1 (1, 2, 3, 4, 5, 6) before jumping into NIO.2.


One of the nice things you can do with AIO is configure the thread pool the kernel will use to invoke a completion handler. A completion handler is a handler for consuming the result of an asynchronous I/O operation, like accepting a remote connection or reading/writing some bytes. An asynchronous channel (with NIO.1 we had SelectableChannel) allows a completion handler to be specified to consume the result of an asynchronous operation. The interface defines three callbacks:

  • completed(…): invoked when the I/O operation completes successfully.
  • failed(…): invoked if the I/O operation fails (e.g. when the remote client closes the connection).
  • cancelled(…): invoked when the I/O operation is cancelled by invoking the cancel method.

Below is an example (I will talk about it in much more detail in part II) of how you can open a port and listen for requests:

// Open a port
final AsynchronousServerSocketChannel listener =
    AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));

// Accept connections
listener.accept(null, new CompletionHandler<AsynchronousSocketChannel,Void>() {
    public void completed(AsynchronousSocketChannel channel, Void attachment) {...}
    public void cancelled(Void attachment) {...}
    public void failed(Throwable exc, Void attachment) {...}
});

Now every time a connection is made to the port, the completed method will be invoked by a kernel thread. Do you see the difference with NIO.1? To achieve the same kind of operation with NIO.1 you would have to listen for requests by doing:

selector.select(timeout);
...
while (readyKeys.hasNext()) {
  SelectionKey key = readyKeys.next();
  readyKeys.remove();
  if (key.isAcceptable()) {
    // Do something that doesn't block,
    // because if it blocks, no more connections can be
    // accepted, as selector.select(..)
    // cannot be executed
  }
}

With AIO, the kernel spawns the thread for us. Where is this thread coming from? That is the topic of this Tricks and Tips with AIO.

By default, applications that do not create their own asynchronous channel group use the default group, which has an associated thread pool that is created automatically. What? The kernel will create a thread pool and manage it for me? That might be well suited to simple applications, but for complex applications like the Grizzly framework, relying on an ‘external’ thread pool is unthinkable, as most of the time the application embedding Grizzly configures its own thread pool and passes it to Grizzly. Another reason is that Grizzly has its own WorkerThread implementation that contains information about transactions (like ByteBuffers, attributes, etc.). At the very least, the monster needs to be able to set the ThreadFactory!
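In the final JDK 7+ API, which differs from the 2008 preview used in this post, the fixed-size factory method is `AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)`, and `CompletionHandler` only declares `completed` and `failed`. Passing your own ThreadFactory is enough to get Grizzly-style worker threads into the group. Here is a minimal sketch, assuming Java 8+. `WorkerThread`, `WorkerThreadFactory` and `CustomFactoryDemo` are hypothetical names (not Grizzly's classes), and the per-thread ByteBuffer stands in for the transaction state mentioned above. The demo accepts one loopback connection and reports the buffer size of the thread that ran `completed`.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical Grizzly-style worker thread carrying per-thread state
// (here only a reusable ByteBuffer). Not Grizzly's actual WorkerThread.
class WorkerThread extends Thread {
    final ByteBuffer buffer;

    WorkerThread(Runnable task, String name, int bufferSize) {
        super(task, name);
        this.buffer = ByteBuffer.allocate(bufferSize);
        setDaemon(true);
    }
}

// Creates WorkerThreads; handed to the channel group instead of the default.
class WorkerThreadFactory implements ThreadFactory {
    private final AtomicInteger count = new AtomicInteger();
    private final int bufferSize;

    WorkerThreadFactory(int bufferSize) { this.bufferSize = bufferSize; }

    @Override
    public Thread newThread(Runnable r) {
        return new WorkerThread(r, "worker-" + count.incrementAndGet(), bufferSize);
    }
}

class CustomFactoryDemo {
    // Accepts one loopback connection; returns the buffer capacity of the thread
    // that ran completed(), or -1 if that thread was not a WorkerThread.
    static int acceptOnce(int bufferSize) {
        AsynchronousChannelGroup group = null;
        try {
            group = AsynchronousChannelGroup.withFixedThreadPool(2, new WorkerThreadFactory(bufferSize));
            try (AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group)
                         .bind(new InetSocketAddress("127.0.0.1", 0));
                 AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group)) {
                CompletableFuture<Integer> seen = new CompletableFuture<>();
                listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                    @Override
                    public void completed(AsynchronousSocketChannel channel, Void attachment) {
                        Thread t = Thread.currentThread();
                        seen.complete(t instanceof WorkerThread ? ((WorkerThread) t).buffer.capacity() : -1);
                        try { channel.close(); } catch (IOException ignored) { }
                    }

                    @Override
                    public void failed(Throwable exc, Void attachment) {
                        seen.completeExceptionally(exc);
                    }
                });
                client.connect(listener.getLocalAddress()).get(5, TimeUnit.SECONDS);
                return seen.get(5, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            if (group != null) {
                try { group.shutdownNow(); } catch (IOException ignored) { }
            }
        }
    }
}
```

Because every handler runs on a thread created by the factory, code inside `completed` can safely cast the current thread and reuse its buffer.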

Note that I’m not saying using the kernel’s thread pool is wrong, but for Grizzly, I prefer having full control of the thread pool. So what’s my solution? There are two solutions, so here we go:

Fixed number of Threads (FixedThreadPool)

An asynchronous channel group associated with a fixed thread pool of size N creates N threads that wait for already processed I/O events. The kernel dispatches events directly to those threads, and each thread first completes the I/O operation (like filling a ByteBuffer during a read operation). Once ready, the thread is reused to directly invoke the completion handler that consumes the result. When the completion handler terminates normally, the thread returns to the thread pool and waits for the next event. If the completion handler terminates due to an uncaught error or runtime exception, no thread is lost either: the thread is allowed to terminate, and a replacement is submitted to the pool. The reason the thread is allowed to terminate is so that the thread’s (or thread group’s) uncaught exception handler is executed.

So far so good? ….NOT. The first issue you must be aware of when using a fixed thread pool is that if all threads “deadlock” inside a completion handler, your entire application can hang until one thread becomes free to execute again. Hence it is critically important that the completion handler’s methods complete in a timely manner, so as to avoid keeping the invoking thread from dispatching to other completion handlers. If all completion handlers are blocked, any new event will be queued until one thread is ‘delivered’ from the lock. That can cause a really bad situation, can’t it? As an example, using a Future to wait for a read operation to complete can lock your entire application:

        Future result = ((AsynchronousSocketChannel)channel).read
                   (byteBuffer,...,myCompletionHandler);

        try{
            count = result.get(30, TimeUnit.SECONDS);
        } catch (Throwable ex){
            throw new EOFException(ex.getMessage());
        }

As with OP_WRITE, I’m pretty sure nobody will ever write code like that, right? Well, some applications need to block until all the bytes have arrived (a Servlet container is a good example), and if you don’t pay attention, your server might hang. Not convinced? Another example could be:


        channel.write(bb, 30, TimeUnit.SECONDS, db_pool,
                new CompletionHandler<Integer, DataBasePool>() {

            public void completed(Integer byteWritten, DataBasePool attachment) {
                // Wait for a JDBC connection: this blocks the kernel's thread!
                MyDBConnection db_con = attachment.get();
            }

            public void failed(Throwable exc, DataBasePool attachment) {
            }

            public void cancelled(DataBasePool attachment) {
            }
        });

Again, all threads may deadlock waiting for a database connection, and your application might stop working, as the kernel has no thread available to dispatch events and complete I/O operations.
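The hang does not need AIO to reproduce. As a plain-JDK illustration (an analogy, not the channel group's actual code), a fixed pool of one thread stands in for a group whose threads are all blocked. The only thread waits on a Future for work that needs a second thread from the same pool, so the wait can only end with a timeout. `StarvationDemo` is a made-up name, and the code assumes Java 8+.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK analogy of a blocked fixed thread pool: a task on the only pool
// thread waits for another task that needs a thread from that same pool.
class StarvationDemo {
    // Returns true when the nested wait times out, i.e. the pool is starved.
    static boolean outerTimesOut() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Boolean> outer = pool.submit(() -> {
                // Stands in for "block on a Future inside a completion handler".
                Future<Integer> inner = pool.submit(() -> 42);
                try {
                    inner.get(200, TimeUnit.MILLISECONDS);
                    return false; // only possible if another pool thread were free
                } catch (TimeoutException e) {
                    return true;  // the inner task never got a thread
                }
            });
            return outer.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Without the timeout, the outer task would wait forever. That is exactly the situation of a fixed group whose threads all block on a read Future or a database connection.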

Grrr, what’s our solution? The first solution consists of carefully avoiding blocking operations inside a completion handler, meaning any thread executing a kernel event must never block on anything. I suspect this is simple to achieve if you write an application from scratch and want it to be fully asynchronous. Still, be careful and make sure you create enough threads. How do you do that? Here is an example from Grizzly:


ThreadPoolExecutorServicePipeline executor = new ThreadPoolExecutorServicePipeline
      (corePoolThreads,maxThreads,8192,30,TimeUnit.SECONDS);
AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup
       .withFixedThreadPool(executor,maxThreads);
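The Grizzly snippet above takes care of sizing the pool. The other half of the first solution is to never block inside the handler itself. That can be sketched with plain JDK executors: the handler hands the blocking step to a separate worker pool and returns at once. This is an illustration under stated assumptions. `ioPool` plays the role of the group's fixed pool, an empty `BlockingQueue` plays the role of the exhausted database pool from the previous example, and `OffloadDemo` is a hypothetical name. The code assumes Java 8+.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch (plain JDK, no AIO): "ioPool" stands in for the group's
// fixed pool that runs completion handlers, "dbPool" for an empty JDBC pool.
class OffloadDemo {
    // Returns true if the single I/O thread is still free after the first
    // "handler" ran into a blocking step.
    static boolean ioThreadStaysFree() {
        ExecutorService ioPool = Executors.newFixedThreadPool(1);
        ExecutorService workers = Executors.newCachedThreadPool();
        BlockingQueue<String> dbPool = new ArrayBlockingQueue<>(1); // no connection available
        try {
            // First "completed()" call: hand the blocking take() to a worker
            // thread and return immediately instead of waiting on the I/O thread.
            ioPool.submit(() -> workers.submit(() -> dbPool.take()));
            // Second "completed()" call: the single I/O thread is still available.
            Future<String> next = ioPool.submit(() -> "served");
            return "served".equals(next.get(1, TimeUnit.SECONDS));
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow(); // interrupts the worker still blocked in take()
            ioPool.shutdownNow();
        }
    }
}
```

The blocking still happens, but on a thread the kernel does not need to complete I/O operations. Sizing that worker pool then becomes an ordinary capacity question.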

The second solution is to use a cached thread pool.

Cached Thread Pool Configuration

An asynchronous channel group associated with a cached thread pool submits events to the thread pool that simply invoke the user’s completion handler. The kernel’s internal I/O operations are handled by one or more internal threads that are not visible to the user application. Yup! That means you have one hidden thread pool (not configurable via the official API, only via a system property) that dispatches events to a cached thread pool, which in turn invokes the completion handler (Wait! You just won a prize: a free thread context switch ;-). Since this is a cached thread pool, the probability of suffering the hang described above is lower. I’m not saying it cannot happen, as you can always create a cached thread pool that cannot grow infinitely (such infinite thread pools should never have existed anyway!). But at least with a cached thread pool you are guaranteed that the kernel will be able to complete its I/O operations (like reading bytes); only the invocation of the completion handler might be delayed when all the threads are blocked. Note that a cached thread pool must support unbounded queueing to work properly. How do you set up a cached thread pool? Here is an example from Grizzly:


ThreadPoolExecutorServicePipeline executor = new ThreadPoolExecutorServicePipeline
      (corePoolThreads,maxCachedThreadPoolSize,8192,
            30,TimeUnit.SECONDS);
AsynchronousChannelGroup asyncChannelGroup = AsynchronousChannelGroup
       .withCachedThreadPool(executor,maxCachedThreadPoolSize);
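For comparison, the final JDK 7+ API (which may differ from the 2008 preview Grizzly was built against) has the factory method `AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)`. There, the int is documented as an initial size, not a maximum. Here is a minimal sketch, assuming Java 8+. `CachedGroupDemo` is a hypothetical name, and the listener is only opened and bound, not used for accepting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CachedGroupDemo {
    // Creates a cached-pool group, opens one listener in it, then shuts the
    // group down. Returns true if the group reports itself as shut down.
    static boolean createUseAndShutdown() {
        // Creates threads on demand (SynchronousQueue, maximum Integer.MAX_VALUE).
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Final JDK 7+ signature: withCachedThreadPool(ExecutorService, int initialSize).
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(executor, 1);
            try (AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(group)) {
                listener.bind(new InetSocketAddress("127.0.0.1", 0));
                // accept(...) with a CompletionHandler, exactly as with any other group
            }
            group.shutdown(); // the group terminates once all its channels are closed
            group.awaitTermination(5, TimeUnit.SECONDS);
            return group.isShutdown();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // harmless if the group already shut it down
        }
    }
}
```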

What about the default group that ships with the kernel?

If you do not create your own asynchronous channel group, then the kernel’s default group that has an associated thread pool will be created automatically. This thread pool is a hybrid of the above configurations. It is a cached thread pool that creates threads on demand, and it has N threads that dequeue events and dispatch directly to the application’s completion handler. The value of N defaults to the number of hardware threads but may be configured by a system property. In addition to N threads, there is one additional internal thread that dequeues events and submits tasks to the thread pool to invoke completion handlers. This internal thread ensures that the system doesn’t stall when all of the fixed threads are blocked, or otherwise busy, executing completion handlers.
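For the default group, the AsynchronousChannelGroup javadoc (JDK 7+) documents two system properties. `java.nio.channels.DefaultThreadPool.threadFactory` names a ThreadFactory class, and `java.nio.channels.DefaultThreadPool.initialSize` sets the initial size. They are normally passed with `-D` on the command line. The sketch below instead sets the initial size programmatically, which only takes effect if it runs before the default group is first used. `DefaultGroupConfig` is a hypothetical name, and the code assumes Java 7+.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;

class DefaultGroupConfig {
    // Documented in the java.nio.channels.AsynchronousChannelGroup javadoc.
    static final String INITIAL_SIZE = "java.nio.channels.DefaultThreadPool.initialSize";

    // Must run before the default group is created, i.e. before any channel is
    // opened without an explicit group. Returns the ephemeral port bound.
    static int openInDefaultGroup(int initialSize) {
        System.setProperty(INITIAL_SIZE, Integer.toString(initialSize));
        // No group argument: the channel belongs to the default group.
        try (AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open()) {
            listener.bind(new InetSocketAddress("127.0.0.1", 0));
            return ((InetSocketAddress) listener.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```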

What’s next

If there is one thing to learn from this part I, it is that, whichever thread pool you decide to use (default, cached or fixed), you should at least limit blocking operations. This is especially true when a fixed thread pool is used. Curious about which thread pool performs best in Grizzly? Well, wait for the monster to be unleashed during Devoxx 2008! Finally, the AIO implementation in Grizzly can be browsed online (here, here, here and here) (or better, download the code and build on top of it), and our upcoming release, 1.9.0, will ship with two OSGi bundles (framework and http) to be embedded wherever JDK 7 lives 🙂

P.S. Some of the wording in this blog comes from Alan Bateman’s note.
