Scalable I/O with NIO

We’ll now conclude the discussion of the NIO package we began in Chapter 12 by talking about nonblocking and selectable network communications. All our server examples in this chapter thus far have used a thread-bound pattern (one thread per I/O operation). In Java, this is very natural because of the ease with which we can create threads. It’s also very efficient, within limits. Problems arise when you try to build very large-scale servers using this style of client handling. While on a large machine it’s certainly possible to have hundreds or even thousands of threads (especially if they’re mostly idle, waiting for I/O), this is a resource-hungry solution. Every thread you start in Java consumes memory for its internal stack, and the performance of managing this number of threads is highly system-dependent.

An alternate approach is to take a lesson from the old, dark days before threading was available and use nonblocking I/O operations to manage numerous communications from a single thread. Better yet, our server uses a configurable pool of threads, taking advantage of machines with many processors.

At the heart of this process is the concept of selectable I/O. It’s not good enough to simply have nonblocking I/O operations if you have no way to efficiently poll for work to be done. The NIO package provides for efficient polling using selectable channels. A selectable channel allows for the registration of a special kind of listener called a selector that can check the readiness of the channel for operations, such as reading and writing or accepting or creating network connections.

The selector and the selection process are not typical Java listeners of the kind we’ll see elsewhere in this book, but instead rather slavishly follow the conventions of C language systems. This is mainly for performance reasons; because this API is primarily intended for high-volume servers, it is bound very tightly to the traditional, underlying operating system facilities with less regard for ease of use. This, combined with the other details of using the NIO package, mean that this section is somewhat dense and the server we create here is one of the longer and more complex examples in the book. Don’t be discouraged if you are a bit put off by this section. You can use the general techniques earlier in this chapter for most applications and reserve this knowledge for creating services that handle the very highest volumes of simultaneous client requests.

Selectable Channels

A selectable channel implements the SelectableChannel interface, which specifies that the channel can be set to a nonblocking mode and that it supports the select API that makes efficient polling possible. The primary implementations of selectable channels are those for working with the network: SocketChannel, ServerSocketChannel, and DatagramChannel. The only other selectable channel is the Pipe (which can be used in an analogous way for intra-VM communication).

At the heart of the process is the Selector object, which knows about a particular set of selectable channels and provides a select() method for determining their readiness for I/O operations. Conceptually, the process is simple; you register one or more channels with a selector and then poll it, asking it to tell you which set of channels is ready to go. In actuality, there are a few additional pieces involved.

First, the Selector does not work directly with channels but instead operates on SelectionKey objects. A SelectionKey object is created implicitly when the channel is registered with the Selector. It encapsulates the selectable channel as well as information about what types of operations (e.g., read, write) we are interested in waiting for. That information is held in the SelectionKey in a set of flags called the interest set, which can be changed by the application at any time. SelectionKeys are also used to return the results of a select operation. Each call to select() returns the number of SelectionKeys that are ready for some type of I/O. The keys are then retrieved with the selectedKeys() method. Each key also has a set of flags called the ready set that indicates which operation of interest is actually ready (possibly more than one). For example, a SelectionKey interest set might indicate that we want to know when its channel is ready for reading or writing. After a select operation, if that key is in the set returned by the selector, we know that it is ready for one or more of those operations, and we can check the key’s ready set to find out which one.

Before we go on, we should say that although we have been saying that channels are registered with selectors, the API is (confusingly) the other way around. Selectors are actually registered with the one or more channels they manage, but it’s better to mentally spackle over this and think of them the other way around.

Using Select

A Selector object is created using the Selector.open() method (Selector uses a factory pattern):

    Selector selector = Selector.open();

To register one or more channels with the selector, set them to nonblocking mode:

    SelectableChannel channelA = // ...
    channelA.configureBlocking( false );

Next, register the channels:

    int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
    SelectionKey key = channelA.register( selector, interestOps );

When we register the channel, we have an opportunity to set the initial interest operations (or “interest ops”). These are defined by constant fields in the SelectionKey class:

OP_READ

Ready to read

OP_WRITE

Ready to write

OP_CONNECT

Client-socket connection ready

OP_ACCEPT

Server-socket connection ready

These fields are bit flags; you can logically OR them together as in this example to express interest in more than one type of operation.

The result of the register() method is a SelectionKey object. We can use the key to change the interest ops at any time with the SelectionKey interestOps() method or to unregister the channel from the Selector with the key’s cancel() method.

This same key is also returned as the result of selection operations when its channel is ready. When the SelectionKey is returned, its ready set holds flags for the operations that do not block if called. We can retrieve the value of the flags with the readySet() method. Convenience methods are available to test for each operation in the ready set: isReadable(), isWritable(), isConnectable(), and isAcceptable().

Depending on how you structure your application, it may not be necessary to save the SelectionKey at registration time. In our example, we let the Selector keep track of the keys for us, simply using them when they are ready. In fact, we go even further and put the SelectionKey to work by asking it to hold a reference for us! The SelectionKey attach() method is a convenience method that can attach an arbitrary object to the key for use by our application. We show how this can be useful in a bit.

After one or more channels are registered with the Selector, we can perform a select operation using one of its select() methods:

    int readyCount = selector.select();

Without arguments, the method blocks until at least one channel is ready for some operation or until the Selector’s wakeup() method is called. Alternatively, you can use the form of select() that takes a timeout (in milliseconds) to wait for a ready channel before returning. There is also selectNow(), which always returns immediately. Both of these return methods count the number of ready channels.

You can use select() and wakeup() somewhat like wait() and notify(). The wakeup is necessary because once a selection is started, it will not see any changes to its key’s interest ops until the next invocation. If another thread changes the interest ops, it must use wakeup() to prompt the selecting thread to select() again. The Selector is also heavily synchronized; for example, calls to register new channels block until the select is finished. Often it’s much easier to simply use select with a short timeout and a loop, like this:

    while ( selector.select( 50 ) == 0 );

However, if another thread is allowed to change the interest ops, you still need to use wakeup() to maximize throughput. Otherwise, in the worst case, you could end up waiting the full select wait period on every iteration, even when there is work to be done.

Next, we can get the set of ready channels from the Selector with the selectedKeys() method and iterate through them, doing whatever our application dictates:

    Set readySet = selector.selectedKeys();
    for( Iterator it = readySet.iterator(); it.hasNext(); ) {
        SelectionKey key = (SelectionKey)it.next();
        it.remove();  // remove the key from the ready set
     // use the key
    }

The ready set is returned to us as a java.util.Set, which we walk through with an Iterator (see Chapter 1). One important thing to note is that we’ve used the Iterator’s remove() method to remove the key from the ready set. The select() methods add keys only to the ready set or add flags to keys already in the set; they never remove them, so we must clear the keys when we handle them. You can get the full set of keys a Selector is managing with the keys() method, but you should not attempt to remove keys from that set; use the cancel() method on individual keys instead. Or you can close the entire Selector with its close() method, unregistering all its keys.

LargerHttpd

Let’s put this information to use. In this section, we’ll create the big brother of TinyHttpd (our minimalist web server) called LargerHttpd. The LargerHttpd server is a nonblocking web server that uses SocketChannels and a pool of threads to service requests. In this example, a single thread executes a main loop that accepts new connections and checks the readiness of existing client connections for reading or writing. Whenever a client needs attention, it places the job in a queue where a thread from our thread pool waits to service it. As we said, this example is a bit longer than we would like, but it is really the minimum that is necessary to show a realistic usage of the APIs:

import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.regex.*;

public class LargerHttpd
{
    Selector clientSelector;

    public void run( int port, int threads ) throws IOException 
    {
        clientSelector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        InetSocketAddress sa =  new InetSocketAddress( InetAddress
            .getLoopbackAddress(), port );
        ssc.socket().bind( sa );
        ssc.register( clientSelector, SelectionKey.OP_ACCEPT );
    
        Executor executor = Executors.newFixedThreadPool( threads );

        while ( true ) {
            try {
                while ( clientSelector.select(100) == 0 );
                Set<SelectionKey> readySet = clientSelector.selectedKeys();
                for(Iterator<SelectionKey> it=readySet.iterator();
                    it.hasNext();)
                {
                    final SelectionKey key = it.next();
                    it.remove();
                    if ( key.isAcceptable() ) {
                        acceptClient( ssc );
                    } else {
                        key.interestOps( 0 );
                        executor.execute( new Runnable() {
                          public void run() {
                            try {
                                handleClient( key );
                              } catch ( IOException e) {
                                System.out.println(e);
                              }
                          }
                        } );
                    }
                }
            } catch ( IOException e ) { System.out.println(e); }
        }
    }

    void acceptClient( ServerSocketChannel ssc ) throws IOException
    {
        SocketChannel clientSocket = ssc.accept();
        clientSocket.configureBlocking(false);        SelectionKey key =  clientSocket.register( clientSelector,
            SelectionKey.OP_READ );
        HttpdConnection client = new HttpdConnection( clientSocket );
        key.attach( client );
    }

    void handleClient( SelectionKey key ) throws IOException
    {
        HttpdConnection client = (HttpdConnection)key.attachment();
        if ( key.isReadable() ) {
            client.read( key );
        } else {
            client.write( key );
        }
        clientSelector.wakeup();
    }

    public static void main( String argv[] ) throws IOException {
        //new LargerHttpd().run( Integer.parseInt(argv[0]), 3/*threads*/ );
        new LargerHttpd().run( 1235, 3/*threads*/ );
    }
}

class HttpdConnection 
{
    static Charset charset = Charset.forName("8859_1");
    static Pattern httpGetPattern = Pattern.compile("(?s)GET /?(\\S*).*");
    SocketChannel clientSocket;
    ByteBuffer buff = ByteBuffer.allocateDirect( 64*1024 );
    String request;
    String response;
    FileChannel file;
    int filePosition;

    HttpdConnection ( SocketChannel clientSocket ) {
        this.clientSocket = clientSocket;
    }

    void read( SelectionKey key ) throws IOException {
        if ( request == null && (clientSocket.read( buff ) == -1 
                || buff.get( buff.position()-1 ) == '\n' ) )
            processRequest( key );
        else
            key.interestOps( SelectionKey.OP_READ );
    }

    void processRequest( SelectionKey key ) {
        buff.flip();
        request = charset.decode( buff ).toString();
        Matcher get = httpGetPattern.matcher( request );
        if ( get.matches() ) {
            request = get.group(1);
            if ( request.endsWith("/") || request.equals("") )
                request = request + "index.html";
            System.out.println( "Request: "+request);
            try {
                file = new FileInputStream ( request ).getChannel();
            } catch ( FileNotFoundException e ) {
                response = "404 Object Not Found";
            }
        } else
            response = "400 Bad Request" ;

        if ( response != null ) {
            buff.clear();
            charset.newEncoder().encode( 
                CharBuffer.wrap( response ), buff, true );
            buff.flip();
        }
        key.interestOps( SelectionKey.OP_WRITE );
    }

    void write( SelectionKey key ) throws IOException {
        if ( response != null ) {
            clientSocket.write( buff );
            if ( buff.remaining() == 0 ) 
                response = null;
        } else if ( file != null ) {
            int remaining = (int)file.size()-filePosition;
            long sent = file.transferTo( filePosition, remaining,
                clientSocket);
            if ( sent >= remaining || remaining <= 0 ) {
                file.close();
                file = null;
            } else
                filePosition += sent;
        } 
        if ( response == null && file == null ) {
            clientSocket.close();
            key.cancel();        
        } else 
            key.interestOps( SelectionKey.OP_WRITE );
    }
}

From a bird’s-eye view, the structure of LargerHttpd is the same as TinyHttpd. The main class, LargerHttpd, accepts connections, and a connection class, HttpdConnection, encapsulates a socket and handles the conversation with the client. However, this time, instead of each connection object being a Runnable serviced in its own thread, its functionality is broken into two primary methods called read() and write(). The job of our LargerHttpd is to accept new client socket connections, wrap them in an instance of HttpdConnection, and then watch the client’s status with a Selector. Whenever we detect that a client is ready to send or receive data, we hand off a Runnable task to our Executor. The task calls read() or write() on the corresponding client, based on the operation that is is ready.

The HttpConnection object encapsulates the state of the conversation with the client. Because its interface is rather coarse, it must keep track of whether it is waiting to read more input, generate a response, or write file output. The HttpdConnection also manages the interest set of its key so that it can effectively schedule itself to be woken up when it’s ready for reading or writing. The association between the HttpdConnection and the key is made by using the key’s attach() and attachment() methods.

LargerHttpd’s acceptClient() method does several things. First, it accepts the new socket connection. Next, it configures and registers it with the selector with an initial interest set for reading. Finally, it creates the HttpdConnection for the socket, and attaches the HttpdConnection object to the key for later retrieval.

The main loop of LargerHttpd is fairly straightforward. First, we set up the ServerSocketChannel. This is similar to setting up a plain ServerSocket, except that we must first create an InetSocketAddress object to hold the local loopback address and port combination of our server socket and then explicitly bind our socket to that address with the ServerSocketChannel bind() method. We also configure the server socket to nonblocking mode and register it with our main Selector so that we can select for client connections in the same loop that we use to select for client read and write readiness.

In the main select loop, we check to see whether the key is ready for an accept operation and if so, we call acceptClient(); if not, we set the key’s interest set to zero with the interestOps() method and dispatch the key to our handleClient() method via a Runnable task. It’s important that we change the interest set to zero to clear it before the next loop; otherwise, we’d be in a race to see whether the thread pool performed its maximum work before we detected another ready condition. Setting the interest ops to 0 and resetting it in the HttpdConnection object upon completion ensures that only one thread is handling a given client at a time.

For each operation that is ready, we dispatch a task to our Executor. The task calls handleClient(), passing it the selection key. From the key, we retrieve the associated HttpdConnection object and call the appropriate service method based on whether the key is ready for reading or writing. After that, it’s up to the connection object to do its job. Each call to the read() method simply does what would be one iteration of a read loop in a thread-bound application. Each read gets as much data as available and checks to see whether we’ve reached the end of a line (a \n newline character). Upon reaching the end of a line, we dispatch the call to the processRequest() method, which turns the byte buffer into text and uses the same techniques as our TinyHttpd to parse the request into a file pathname. On each incomplete call to read(), we set the interest ops of our key back to OP_READ. Upon completing the read and processing the request, we switch to using OP_WRITE because we are now ready to send a response.

The write() method keeps track of whether it’s sending a text response (error message) or a file by using the response and file instance variables. When sending a file, we use the FileChannel’s transferTo() method to transfer bytes from the file directly to the network socket without copying them into Java’s memory space. (This is indeed an efficient little web server.) And that’s about it. When we’re done, we close the client socket and cancel our key, which causes it to be removed from the Selector’s key set during the next select operation (discarding our HttpdConnection object with it).

Nonblocking Client-Side Operations

Our example showed SocketChannel used for nonblocking, selectable I/O in a typical server application. It’s less common to need nonblocking I/O from a client, but there is certainly no reason you can’t do it. Perhaps you’re writing a peer-to-peer (P2P) application that manages many connections from both sides.

For the client side of communications, one additional tool is provided: a nonblocking socket-connect operation. The process of creating a TCP connection from the client side involves contacting the remote host in a two-phase acknowledgment. This process normally blocks until the connection is established. However, the NIO package provides an alternative that allows you to initiate the connection and then poll for its status. When set to nonblocking mode, a call to a SocketChannel’s connect() method returns immediately. The connection is then attempted (and possibly succeeds or fails) in the background. Later, a Selector can be used, checking for the OP_CONNECT flag to see when the socket is ready to “finish connecting.” The connection is finished by invoking the SocketChannel’s finishConnect() method, which either returns or throws an IOException indicating the failure. The process of finishing the connection is really more about collecting the results of the asynchronous connection—acknowledging its success or failure—than about doing work.

Get Learning Java, 4th Edition now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.