Wednesday, March 20, 2013

Reactor Pattern Explained - Part 2

This is the continuation of my previous post Reactor Pattern Explained - Part 1 which gives a high-level understanding of the topic.

In this blog post I will explain the implementation of Reactor Pattern with a simple Client - Server system where the server will send Hello messages to each client when their names are told to the server. The server will listen to port 9900 and multiple clients will connect to the server to shout out their names. A thread pool will not be used here. First lets run the server in a single thread. Part 3 of this series will explain how a Thread pool is used.

First lets make the Client to connect to port 9900.

public class Client {
    String hostIp;
    int hostPort;

    public Client(String hostIp, int hostPort) {
        this.hostIp = hostIp;
        this.hostPort = hostPort;
    }

    public void runClient() throws IOException {
        Socket clientSocket = null;
        PrintWriter out = null;
        BufferedReader in = null;

        try {
            clientSocket = new Socket(hostIp, hostPort);
            out = new PrintWriter(clientSocket.getOutputStream(), true);
            in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
        } catch (UnknownHostException e) {
            System.err.println("Unknown host: " + hostIp);
            System.exit(1);
        } catch (IOException e) {
            System.err.println("Couldn't connect to: " + hostIp);
            System.exit(1);
        }

        BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
        String userInput;

        System.out.println("Client connected to host : " + hostIp + " port: " + hostPort);
        System.out.println("Type (\"Bye\" to quit)");
        System.out.println("Tell what your name is to the Server.....");

        while ((userInput = stdIn.readLine()) != null) {

            out.println(userInput);

            // Break when client says Bye.
            if (userInput.equalsIgnoreCase("Bye"))
                break;

            System.out.println("Server says: " + in.readLine());
        }

        out.close();
        in.close();
        stdIn.close();
        clientSocket.close();
    }

    public static void main(String[] args) throws IOException {

        Client client = new Client("127.0.0.1", 9900);
        client.runClient();
    }
}

Notice that the client doesn't use java.nio to create the Socket. It simply uses a java.net.Socket everybody knows about.

Now lets make the Reactor in the Server.

public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    final boolean isWithThreadPool;

    Reactor(int port, boolean isWithThreadPool) throws IOException {

        this.isWithThreadPool = isWithThreadPool;
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey0.attach(new Acceptor());
    }


    public void run() {
        System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort());
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null) {
            r.run();
        } 
    }

    class Acceptor implements Runnable {
        public void run() {
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    if (isWithThreadPool)
                        new HandlerWithThreadPool(selector, socketChannel);
                    else
                        new Handler(selector, socketChannel);
                }
                System.out.println("Connection Accepted by Reactor");
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

The Reactor is a Runnable. See the while loop in the run() method. It will call selector.select() to get the SelectionKeys which have pending IO events. When the SelectionKeys are selected, they will be dispatched one by one. See the dispatch() method. The SelectionKey will have an attatchment which is also a Runnable. This attatchement will either be an Acceptor or a Handler.
Notice how the Acceptor inner class in the Reactor accepts connections to make SocketChannels. When a SocketChannel is created a new Handler will be created as well. (HandlerWithThreadPool will be discussed in the next section)


public class Handler implements Runnable {

    final SocketChannel socketChannel;
    final SelectionKey selectionKey;
    ByteBuffer input = ByteBuffer.allocate(1024);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    String clientName = "";

    Handler(Selector selector, SocketChannel c) throws IOException {
        socketChannel = c;
        c.configureBlocking(false);
        selectionKey = socketChannel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }


    public void run() {
        try {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                send();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    void read() throws IOException {
        int readCount = socketChannel.read(input);
        if (readCount > 0) {
            readProcess(readCount);
        }
        state = SENDING;
        // Interested in writing
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Processing of the read message. This only prints the message to stdOut.
     *
     * @param readCount
     */
    synchronized void readProcess(int readCount) {
        StringBuilder sb = new StringBuilder();
        input.flip();
        byte[] subStringBytes = new byte[readCount];
        byte[] array = input.array();
        System.arraycopy(array, 0, subStringBytes, 0, readCount);
        // Assuming ASCII (bad assumption but simplifies the example)
        sb.append(new String(subStringBytes));
        input.clear();
        clientName = sb.toString().trim();
    }

    void send() throws IOException {
        System.out.println("Saying hello to " + clientName);
        ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());
        socketChannel.write(output);
        selectionKey.interestOps(SelectionKey.OP_READ);
        state = READING;
    }
}

A Handler has 2 states, READING and SENDING. Both cant be handled at the same time because a Channel supports only one operation at one time. Since its the client who speaks first, a server Handler will start with the READING state. Notice how this Handler is attatched to the SelectionKey and how the Interested Operation is set to OP_READ. This means that the Selector should only select this SelectionKey when a Read Event occurs. Once the read process is done, the Handler will change its state to SENDING and will change the Interested Operation to OP_WRITE. Now the Selector will select this SelectionKey only when it gets a Write Event from the Channel when its ready to be written with data. When a Write Event is dispatched to this Handler, it will write the Hello message to the output buffer since now the state is SENDING. Once sending is done, it will change back to READING state with Interested Operation changed to OP_READ again. It should be obvious that since both Handler and Acceptor are Runnables, the dispatch() method of the Reactor can execute the run() method of any attatchment it gets from a selected SelectionKey.

Here is the main method. We will run it without a Thread pool for the moment.

public static void main(String[] args) throws IOException{

    Reactor reactor  = new Reactor(9900, false);
    new Thread(reactor).start();
}

To see how this works first run the server. Then run several clients and see how they get connected to the server. When each client writes a name to standard in of the client, the sever will respond to the client with a Hello message. Notice that the server runs in a single Thread but responds to any number of clients which connect to the server.

Read the next section Reacter Pattern Explained - Part 3 to see how to use a Thread pool to run Handlers.