Home > Uncategorized > Tricks and Tips with NIO part IV: Meet the Selectors

Tricks and Tips with NIO part IV: Meet the Selectors

When building scalable NIO based server, it is important to not restrict your server to use a single Selector. Multiples Selectors can always be used to handle OP_ACCEPT, OP_READ and OP_WRITE to avoid overloading the main Selector.

Using multiples Selectors to load balance OP_ACCEPT and OP_READ

Usually, an NIO based server will do the following:

 

            serverSocketChannel = ServerSocketChannel.open();
            selector = Selector.open();

            serverSocket = serverSocketChannel.socket();
            serverSocket.setReuseAddress(true);
            if ( inet == null)
                serverSocket.bind(new InetSocketAddress(port),
                   ssBackLog);
            else
                serverSocket.bind(new InetSocketAddress(inet,port),
                                  ssBackLog);

            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

 

Mainly, you create an instance of ServerSocketChannel, get a Selector instance (called the main Selector), bind it to a dedicated port, configure the ServerSocketChannel non blocking, and then register the Selector for OP_ACCEPT interest key. Latter when processing the OP_ACCEPT, you register the SocketChannel to the previously created Selector

 

    protected void handleAccept(SelectionKey key) throws IOException{
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel = server.accept();

        if (channel != null) {
           channel.configureBlocking(false);
           SelectionKey readKey =
                  channel.register(selector, SelectionKey.OP_READ);
           setSocketOptions(((SocketChannel)readKey.channel()).socket())
        }
        ....

 

Although this not clearly stated inside the NIO API documentation, you don’t need to register the SocketChannel using the “main” Selector. Instead, you might want to load balance amongst several Selectors:

 

        SlaveSelector[] selectors = new SlaveSelector[selectorCount];
        for(int i=0; i < selectorCount; i++){
              SlaveSelector slave = new SlaveSelector();
              slave.start();
              selectors[i] = slave;
        }

 

Where the SlaveSelector class looks like:

 

public class SlaveSelector extends Thread{

    private Selector selector;

    public SlaveSelector throws IOException{
         selector = Selector.open();
    }

    /**
     * List of Channels to process.
     */
    ArrayList channels = new ArrayList();

    /**
     * Add a Channel to be processed by this
     * Selector
     */
    public synchronized void addChannel(SocketChannel channel)
            throws IOException, ClosedChannelException {
        channels.add(channel);
        selector.wakeup();
    }

    /**
     * Register all Channels with an OP_READ opeation.
     */
    private synchronized void registerNewChannels() throws IOException {
        int size = channels.size();
        for (int i = 0; i < size; i++) {
            SocketChannel sc = channels.get(i);
            sc.configureBlocking(false);
            try {
                SelectionKey readKey =
                        sc.register(selector, SelectionKey.OP_READ);
                setSocketOptions(((SocketChannel)
                                  readKey.channel()).socket());
            } catch (ClosedChannelException cce) {
            }
        }
        channels.clear();
    }
    .....

 

Thus, when handling OP_ACCEPT, instead of using the main Selector, you register using one “slave” Selector:

 

    protected void handleAccept(SelectionKey key) throws IOException{
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel channel = server.accept();

        if (channel != null) {
            SlaveSelector slave = getSlaveSelector();
            slave.addChannel(channel);
        }
    }

    private synchronized SlaveSelector getSlaveSelector() {
        if (curSlave == slaveSelectors.length)
            curSlave = 0;
        return slaveSelectors[curSlave++];
    }

 

The Grizzly implementation of this trick can be found here. Note that distributing the “load” should be carefully evaluated, because the cost of maintaining multiple Selectors can reduce scalability instead of improving it. At least an NIO server should have an option to enable such functionality.

pont.jpg

 

Using a pool of Selectors for handling OP_READ and OP_WRITE

 

The next trick is also used in Grizzly. When Grizzly handles OP_READ, it always delegate the SocketChannel read(s) processing to a thread pool. Grizzly have several strategies to determine if all the HTTP header bytes have been fully read from the SocketChannel. One of the available strategy is to read the available bytes without trying to determine if all the bytes have been read or not, but instead use a pool of Selectors to help in case bytes still need to be read.

The way I did it in Grizzly is by wrapping a ByteBuffer inside a ByteBufferInputStream:

 

public class ByteBufferInputStream extends InputStream {
    /**
     * The wrapped ByteBuffer
     */
    protected ByteBuffer byteBuffer;

    public ByteBufferInputStream (final ByteBuffer byteBuffer) {
        this.byteBuffer = byteBuffer;
    }

    /**
     * Read bytes from the wrapped ByteBuffer.
     */
    public int read (byte[] b, int offset, int length)
            throws IOException {
        if (!byteBuffer.hasRemaining()) {
            int eof = 0;
            for (int i=0; i < readTry; i++) {
                eof = doRead();

                if ( eof != 0 ){
                    break;
                }
            }

            if (eof  byteBuffer.remaining()) {
            length = byteBuffer.remaining();
        }
        byteBuffer.get(b, offset, length);

        return (length);
    }

    .....    

    /**
     * Read bytes using the read a temporary Selector
     */
    protected int doRead() throws IOException{
        if ( key == null ) return -1;

        byteBuffer.clear();
        int count = 1;
        int byteRead = 0;
        Selector readSelector = null;
        SelectionKey tmpKey = null;

        try{
            SocketChannel socketChannel =
                   (SocketChannel)key.channel();
            while (count > 0){
                count = socketChannel.read(byteBuffer);// [1]
                if ( count > -1 )
                    byteRead += count;
                else
                    byteRead = count;
            }            

            if ( byteRead == 0 ){
                readSelector = SelectorFactory.getSelector(); //[2]

                if ( readSelector == null ){
                    return 0;
                }
                count = 1;
                tmpKey = socketChannel
                        .register(readSelector,SelectionKey.OP_READ);
                tmpKey.interestOps(
                     tmpKey.interestOps() | SelectionKey.OP_READ);
                int code = readSelector.select(readTimeout);
                tmpKey.interestOps(
                    tmpKey.interestOps() & (~SelectionKey.OP_READ));

                if ( code == 0 ){
                    return 0;
                    // Return on the main Selector and try again.
                }

                while (count > 0){
                    count = socketChannel.read(byteBuffer);
                }
            }
        } finally {
            if (tmpKey != null)
                tmpKey.cancel();

            if ( readSelector != null){
                // Bug 6403933
                try{
                    readSelector.selectNow();
                } catch (IOException ex){
                    ;
                }
                SelectorFactory.returnSelector(readSelector);
            }
        }
        byteBuffer.flip();
        return byteRead;
    }
}

 

First, socketChannel.read(byteBuffer) is executed until it return 0. Then the byteBuffer is wrapped by the ByteBufferInputStream and “passed” to the HTTP Parser class. The HTTP Parser will try to parse the http request line and headers with the bytes available inside the byteBuffer. If the HTTP Parser class ask for more bytes, being unable to parse all the headers, and the socketChannel.read() ([1] in the code above) return 0 (meaning no bytes have been read), internally I register the socketChannel on a different Selector by getting a temporary Selector from a pool of ready to use Selectors [2]. Then, using the temporary Selector, try to read the missing bytes (or at least more bytes).

There is several advantages when doing this. First, you don’t need to go back to the main Selector, which most probably run on another Thread, by attaching the current state of the processing to the SelectionKey. Second, when the main Selector is ready (bytes are available), re-process the OP_READ by retrieving the previous state of the ByteBuffer, getting another Thread from the pool, and try to see if this time all the bytes are available.

I know I know, on slow network (or strange client) it might be better to go back to the main Selector instead of holding a Thread, waiting for the temporary Selector to read the missing bytes. Hence uses this trick carefully by properly configuring the time before Selector.select(..) times out, to avoid holding a Thread.

The same trick can be applied when handing OP_WRITE:

 

        try {
            while ( bb.hasRemaining() ) {
                int len = socketChannel.write(bb);
                attempts++;
                if (len  2)
                            throw new IOException("Client disconnected");
                    } else {
                        attempts--;
                    }
                } else {
                    attempts = 0;
                }
            }
        } finally {
            if (key != null) {
                key.cancel();
                key = null;
            }

            if ( writeSelector != null ) {
                // Cancel the key.
                writeSelector.selectNow();
                SelectorFactory.returnSelector(writeSelector);
            }
        }

 

Same as for OP_READ, you need to make sure the Selector.select(..) is carefully configured.

 

C’est le temps des vacances!

 

OK, as usual, thanks for the feedback! I will take a couple of weeks out of this world, avoiding NIO monsters and Grizzly bugs, so you have a break of me…until I come with part V, which will consist of explaining a couple of workarounds when boom! the VM crash, or oups!, why do I get “javax.net.ssl.SSLHandshakeException: no cipher suites in common” when NIO + SSL play together.

technorati:

_uacct = “UA-3111670-1”;
urchinTracker();

Categories: Uncategorized
  1. No comments yet.
  1. No trackbacks yet.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: