美文网首页IO
反应器模型Reactor的实现

反应器模型Reactor的实现

作者: 先生zeng | 来源:发表于2019-10-19 15:41 被阅读0次

    Reactor模型

    有关Reactor模型结构,可以参考Doug Lea在 Scalable IO in Java 中的介绍。这里简单介绍一下Reactor模式的典型实现:

    Reactor单线程模型

    这是最简单的单Reactor单线程模型。Reactor线程负责多路分离套接字、accept新连接,并分派请求到处理器链中。该模型适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。


    image.png

    这个模型和上面的NIO流程很类似,只是将消息相关处理独立到了Handler中去了。

    代码实现如下:

    public class Reactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocketChannel;
        public static void main(String[] args) throws IOException {
            new Thread(new Reactor(1234)).start();
        }
        public Reactor(int port) throws IOException {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            key.attach(new Acceptor());
        }
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    for (SelectionKey selectionKey : selectionKeys) {
                        dispatch(selectionKey);
                    }
                    selectionKeys.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        private void dispatch(SelectionKey selectionKey) {
            Runnable run = (Runnable) selectionKey.attachment();
            if (run != null) {
                run.run();
            }
        }
        class Acceptor implements Runnable {
            @Override
            public void run() {
                try {
                    SocketChannel channel = serverSocketChannel.accept();
                    if (channel != null) {
                        new Handler(selector, channel);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    class Handler implements Runnable {
        private final static int DEFAULT_SIZE = 1024;
        private final SocketChannel socketChannel;
        private final SelectionKey seletionKey;
        private static final int READING = 0;
        private static final int SENDING = 1;
        private int state = READING;
        ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
        ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
        public Handler(Selector selector, SocketChannel channel) throws IOException {
            this.socketChannel = channel;
            socketChannel.configureBlocking(false);
            this.seletionKey = socketChannel.register(selector, 0);
            seletionKey.attach(this);
            seletionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
        @Override
        public void run() {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                write();
            }
        }
        class Sender implements Runnable {
            @Override
            public void run() {
                try {
                    socketChannel.write(outputBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (outIsComplete()) {
                    seletionKey.cancel();
                }
            }
        }
        private void write() {
            try {
                socketChannel.write(outputBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            while (outIsComplete()) {
                seletionKey.cancel();
            }
        }
        private void read() {
            try {
                socketChannel.read(inputBuffer);
                if (inputIsComplete()) {
                    process();
                    System.out.println("接收到来自客户端(" + socketChannel.socket().getInetAddress().getHostAddress()
                            + ")的消息:" + new String(inputBuffer.array()));
                    seletionKey.attach(new Sender());
                    seletionKey.interestOps(SelectionKey.OP_WRITE);
                    seletionKey.selector().wakeup();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public boolean inputIsComplete() {
            return true;
        }
        public boolean outIsComplete() {
            return true;
        }
        public void process() {
            // do something...
        }
    }
    

    虽然上面说到NIO一个线程就可以支持所有的IO处理。但是瓶颈也是显而易见的。我们看一个客户端的情况,如果这个客户端多次进行请求,如果在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢!所以引入了Reactor多线程模型。

    Reactor多线程模型

    相比上一种模型,该模型在处理器链部分采用了多线程(线程池):


    image.png

    Reactor多线程模型就是将Handler中的IO操作和非IO操作分开,操作IO的线程称为IO线程,非IO操作的线程称为工作线程。这样的话,客户端的请求会直接被丢到线程池中,客户端发送请求就不会堵塞。

    可以将Handler做如下修改:

    class Handler implements Runnable {
        private final static int DEFAULT_SIZE = 1024;
        private final SocketChannel socketChannel;
        private final SelectionKey seletionKey;
        private static final int READING = 0;
        private static final int SENDING = 1;
        private int state = READING;
        ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
        ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
        
        private Selector selector;
        private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        private static final int PROCESSING = 3;
        public Handler(Selector selector, SocketChannel channel) throws IOException {
            this.selector = selector;
            this.socketChannel = channel;
            socketChannel.configureBlocking(false);
            this.seletionKey = socketChannel.register(selector, 0);
            seletionKey.attach(this);
            seletionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
        @Override
        public void run() {
            if (state == READING) {
                read();
            } else if (state == SENDING) {
                write();
            }
        }
        class Sender implements Runnable {
            @Override
            public void run() {
                try {
                    socketChannel.write(outputBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (outIsComplete()) {
                    seletionKey.cancel();
                }
            }
        }
        private void write() {
            try {
                socketChannel.write(outputBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (outIsComplete()) {
                seletionKey.cancel();
            }
        }
        private void read() {
            try {
                socketChannel.read(inputBuffer);
                if (inputIsComplete()) {
                    process();
                    executorService.execute(new Processer());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public boolean inputIsComplete() {
            return true;
        }
        public boolean outIsComplete() {
            return true;
        }
        public void process() {
        }
        synchronized void processAndHandOff() {
            process();
            state = SENDING; // or rebind attachment
            seletionKey.interestOps(SelectionKey.OP_WRITE);
            selector.wakeup();
        }
        class Processer implements Runnable {
            public void run() {
                processAndHandOff();
            }
        }
    }
    

    但是当用户进一步增加的时候,Reactor会出现瓶颈!因为Reactor既要处理IO操作请求,又要响应连接请求。为了分担Reactor的负担,所以引入了主从Reactor模型。

    主从Reactor多线程模型

    主从Reactor多线程模型是将Reactor分成两部分,mainReactor负责监听server socket,accept新连接,并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,对业务处理功能,其扔给worker线程池完成。通常,subReactor个数上可与CPU个数等同:


    image.png

    这时可以把Reactor做如下修改:

    public class Reactor {
        final ServerSocketChannel serverSocketChannel;
        Selector[] selectors; // also create threads
        AtomicInteger next = new AtomicInteger(0);
        ExecutorService sunReactors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        public static void main(String[] args) throws IOException {
            new Reactor(1234);
        }
        public Reactor(int port) throws IOException {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            selectors = new Selector[4];
            for (int i = 0; i < 4; i++) {
                Selector selector = Selector.open();
                selectors[i] = selector;
                SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                key.attach(new Acceptor());
                new Thread(() -> {
                    while (!Thread.interrupted()) {
                        try {
                            selector.select();
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            for (SelectionKey selectionKey : selectionKeys) {
                                dispatch(selectionKey);
                            }
                            selectionKeys.clear();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        }
        private void dispatch(SelectionKey selectionKey) {
            Runnable run = (Runnable) selectionKey.attachment();
            if (run != null) {
                run.run();
            }
        }
        class Acceptor implements Runnable {
            @Override
            public void run() {
                try {
                    SocketChannel connection = serverSocketChannel.accept();
                    if (connection != null)
                        sunReactors.execute(new Handler(selectors[next.getAndIncrement() % selectors.length], connection));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    可见,主Reactor用于响应连接请求,从Reactor用于处理IO操作请求。

    相关文章

      网友评论

        本文标题:反应器模型Reactor的实现

        本文链接:https://www.haomeiwen.com/subject/ktcnmctx.html