NIO Reactor模式

作者: Skymiles | 来源:发表于2018-03-16 16:46 被阅读71次

      NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也是I/O多路复用的基础,已经被越来越多地应用到大型应用服务器,成为解决高并发与大量连接、I/O处理问题的有效方式。

      由于Java NIO属于基础部分,本文稍微讲解下,具体请自行了解更多NIO设计。

    传统BIO模型分析

    以下传统的服务器端同步阻塞I/O处理(也就是BIO,Blocking I/O)的经典编程模型:

    public class BioServer {
    
        //initialize socket and input stream
        private Socket socket = null;
        private ServerSocket server = null;
        private DataInputStream in = null;
    
        public static void main(String args[]) {
            BioServer server = new BioServer(5000);
        }
    
        // constructor with port
        public BioServer(int port) {
            // starts server and waits for a connection
            try {
                server = new ServerSocket(port);
                System.out.println("Server started");
    
                System.out.println("Waiting for a client ...");
    
                socket = server.accept();
                System.out.println("Client accepted");
    
                // takes input from the client socket
                in = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
    
                String line = "";
    
                // reads message from client until "Over" is sent
                while (!line.equals("Over")) {
                    try {
                        line = in.readUTF();
                        System.out.println(line);
    
                    } catch (IOException i) {
                        System.out.println(i);
                    }
                }
                System.out.println("Closing connection");
    
                // close connection
                socket.close();
                in.close();
            } catch (IOException i) {
                System.out.println(i);
            }
        }
    
    }
    

      这是一个经典的每连接每线程的模型,可以使用多线程(上面代码只有一个主现场,可以改成一个链接新建一个线程),主要原因在于socket.accept()、socket.read()、socket.write()三个主要函数都是同步阻塞的,当一个连接在处理I/O的时候,系统是阻塞的,如果是单线程的话必然就挂死在那里;但CPU是被释放出来的,开启多线程,就可以让CPU去处理更多的事情。
      当然一般都使用线程池,可以让线程的创建和回收成本相对较低。
      不过,这个模型最本质的问题在于,严重依赖于线程。但线程是很"贵"的资源,主要表现在:
      1.线程的创建和销毁成本很高,在Linux这样的操作系统中,线程本质上就是一个进程。创建和销毁都是重量级的系统函数。
      2.线程本身占用较大内存,像Java的线程栈,一般至少分配512K~1M的空间,如果系统中的线程数过千,恐怕整个JVM的内存都会被吃掉一半。
      3.线程的切换成本是很高的。操作系统发生线程切换的时候,需要保留线程的上下文,然后执行系统调用。如果线程数过高,可能执行线程切换的时间甚至会大于线程执行的时间,这时候带来的表现往往是系统load偏高、CPU sy使用率特别高(超过20%以上),导致系统几乎陷入不可用的状态。
      4.容易造成锯齿状的系统负载。因为系统负载是用活动线程数或CPU核心数,一旦线程数量高但外部网络环境不是很稳定,就很容易造成大量请求的结果同时返回,激活大量阻塞线程从而使系统负载压力过大。

      对于像BAT,TMD这样的企业,每天面对千万次连接,传统的BIO模型是无能为力的,必然需要一种更高效的I/O处理模型。

    NIO模型分析

    我们先看一下 NIO 涉及到的关联类图,如下:


    serversocketchannel.png

      上图中有两个关键类:Channel 和 Selector,它们是 NIO 中两个核心概念。我们还用前面的城市交通工具来继续比喻 NIO 的工作方式,这里的 Channel 要比 Socket 更加具体,它可以比作为某种具体的交通工具,如汽车或是高铁等,而 Selector 可以比作为一个车站的车辆运行调度系统,它将负责监控每辆车的当前运行状态:是已经出战还是在路上等等,也就是它可以轮询每个 Channel 的状态。这里还有一个 Buffer 类,它也比 Stream 更加具体化,我们可以将它比作为车上的座位,Channel 是汽车的话就是汽车上的座位,高铁上就是高铁上的座位,它始终是一个具体的概念,与 Stream 不同。Stream 只能代表是一个座位,至于是什么座位由你自己去想象,也就是你在去上车之前并不知道,这个车上是否还有没有座位了,也不知道上的是什么车,因为你并不能选择,这些信息都已经被封装在了运输工具(Socket)里面了,对你是透明的。NIO 引入了 Channel、Buffer 和 Selector 就是想把这些信息具体化,让程序员有机会控制它.

      下面是典型的一段 NIO 代码:

    public class PlainNioServer {
    
        public static void main(String[] args) throws Exception{
            int port = 9876;
            System.out.println("Listening for connections on port " + port);
            ServerSocketChannel serverChannel;
            Selector selector;
    
            serverChannel = ServerSocketChannel.open();
            // 开启非阻塞模式
            serverChannel.configureBlocking(false);
            ServerSocket ss = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(port);
            //绑定相应IP及port
            ss.bind(address);
            selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
    
            while (true) {
                try {
                    selector.select();
                } catch (IOException ex) {
                    ex.printStackTrace();
                    // handle in a proper way
                    break;
                }
    
                Set<SelectionKey> readyKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        if (key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
    
                            SocketChannel client = server.accept();
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                            System.out.println("Accept connection from " + client);
                        }
                        if (key.isWritable()) {
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            while (buffer.hasRemaining()) {
                                if (client.write(buffer) == 0) {
                                    break;
                                }
                            }
                            System.out.println("服务器端写到客户端信息:" + buffer.toString());
                            client.close();
                        }
                        if (key.isReadable()) {
                            SocketChannel client = (SocketChannel) key.channel();
                            client.configureBlocking(false);
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            client.read(buffer);
                            byte[] data = buffer.array();
                            System.out.println("服务器端收到客户端信息:" + data);
                        }
                    }
                    catch (IOException e) {
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException ioe) {
                            System.out.println(ioe);
                        }
                    }
                }
            }
        }
    }
    

    这个是NIO的服务端代码,调用 Selector 的EpollSelectorProvider创建一个选择器Selector, 创建的服务端的 ServerSocketChannel 绑定到一个 Socket 对象,并把这个通信channel注册到Selector上而且要监听OP_ACCEPT,客户端连接事件,再把channel设置为非阻塞模式并绑定9876端口。
    代码可以看到,方法整体是一个死循环,轮询访问Selector,发生某些已经注册在Selector上的事件时,该方法返回。可以通过selector.selectedKeys获取发生的事件,我们通过key获取到注册它的那个Channel,在这里是ServerSocketChannel,通过server.accept()获取客户端连接,这里同样可以类比到传统的阻塞IO,在阻塞IO中我们可以通过ServerSocket.accept获取到socket,唯一不同的是,阻塞IO中的accept方法是阻塞操作,而NIO中是非阻塞的。
    而轮询到的selectionKey是客户端的Writable,Readable的话,直接进行读写操作,利用NIOBuffer。


    nio.png

    上图中的 Selector 可以同时监听一组通信信道(Channel)上的 I/O 状态,选择器 Selector 可以调用 select() 方法检查已经注册的通信信道上的是否有 I/O 已经准备好,如果没有至少一个信道 I/O 状态有变化,那么 select 方法会阻塞等待或在超时时间后会返回 0。上图中如果有多个信道有数据,那么将会将这些数据分配到对应的数据 Buffer 中。所以关键的地方是有一个线程来处理所有连接的数据交互,每个连接的数据交互都不是阻塞方式,所以可以同时处理大量的连接请求。

    优化线程模型

      上面铺垫了这么多BIO,NIO知识,终于来到了我们今天的主题,Reactor模式。NIO虽然大大改善了BIO中,原来的阻塞读写(占用线程)变成了单线程轮询事件,找到可以进行读写的网络描述符进行读写。除了事件的轮询是阻塞的(没有可干的事情必须要阻塞),剩余的I/O操作都是纯CPU操作,没有必要开启多线程。

      但是现代计算机都是多核心的,如果能够利用多核心进行I/O,无疑对效率会有更大的提高。所以引入了Reactor 模式。

    Reactor模式思想:分而治之+事件驱动

    一. 分而治之

      一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。

      Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行。

    二. 事件驱动

      每个Task对应特定网络事件。当Task准备就绪时,Reactor收到对应的网络事件通知,并将Task分发给绑定了对应网络事件的Handler执行。

    三. 几个角色

      Reactor:负责响应事件,将事件分发给绑定了该事件的Handler处理;

      Handler:事件处理器,绑定了某类事件,负责执行对应事件的Task对事件进行处理;

      Acceptor:Handler的一种,绑定了connect事件。当客户端发起connect请求时,Reactor会将accept事件分发给Acceptor处理。

    单线程Reactor

    单线程Reactor.png

    1. 优点:

      不需要做并发控制,代码实现简单清晰。

    2. 缺点:

    a. 不能利用多核CPU;

    b. 一个线程需要执行处理所有的accept、read、decode、process、encode、send事件,当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了). 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。

    下面是单线程Reactor的代码:

    public class Reactor implements Runnable {
    
        final Selector selector;
        final ServerSocketChannel serverSocketChannel;
        int port;
    
        public Reactor(int port) throws IOException {
            this.port = port;
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
            sk.attach(new Acceptor(this));
            System.out.println("-->attach(new Acceptor()!");
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    selector.select();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                Set<SelectionKey> readyKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = readyKeys.iterator();
                while (it.hasNext()) {
                    dispatch((SelectionKey) (it.next()));
                }
                readyKeys.clear();
            }
        }
    
        //运行Acceptor或handler
        private void dispatch(SelectionKey sk) {
            Runnable task = (Runnable) sk.attachment();
            if (task != null) {
                task.run();
            }
        }
    }
    
    public class Acceptor implements Runnable {
    
        private Reactor reactor;
    
        public Acceptor(Reactor reactor) {
            this.reactor = reactor;
        }
    
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = reactor.serverSocketChannel.accept();
                if (socketChannel != null) {
                    // 调用Handler来处理channel, 并向Reactor注册此Handler
                    new NioHandler(reactor.selector, socketChannel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    public class NioHandler implements Runnable {
        final SocketChannel socketChannel;
        final SelectionKey selectionKey;
        ByteBuffer input = ByteBuffer.allocate(1024);
        static final int READING = 0, SENDING = 1;
        int state = READING;
        String clientName = "";
    
        NioHandler(Selector selector, SocketChannel c) throws IOException {
            socketChannel = c;
            c.configureBlocking(false);
            selectionKey = socketChannel.register(selector, 0);
            selectionKey.attach(this);
            selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }
    
        @Override
        public void run() {
            ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
            inputBuffer.clear();
            try {
                int n = socketChannel.read(inputBuffer);
                if (n != -1) {
                    byte[] data = new byte[n];
                    System.arraycopy(inputBuffer.array(), 0, data, 0, n);
                    System.out.println("Output: " + new String(data) + "\n");
                }
                System.out.println("Finish......");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    多线程Reactor

    多线程Reactor.png

    特点:

    a. Reactor 的多线程模型与单线程模型的区别就是 acceptor 是一个单独的线程处理, 并且有一组特定的 NIO 线程来负责各个客户端连接的 IO 操作

    b. 有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求.

    c. 客户端发送的IO操作由一个worker reactor线程池负责,由线程池中的NIO线程负责监听客户端SocketChannel事件,进行消息的读取、解码、编码和发送。

    d. 一个NIO线程可以同时处理N条链路,但是一个链路只注册在一个NIO线程上处理,防止发生并发操作问题。

    4、主从多线程Reactor

    主从多线程Reactor.png

      在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是在极个别特殊场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。

    特点:

    a. 服务端用于接收客户端连接的不再是个1个单独的reactor线程,而是一个boss reactor线程池;

    b. 服务端启用多个ServerSocketChannel监听不同端口时,每个ServerSocketChannel的监听工作可以由线程池中的一个NIO线程完成。


    二、Netty线程模型

    Netty线程模型.png

    a. netty线程模型采用“服务端监听线程”和“IO线程”分离的方式,与多线程Reactor模型类似。

    b. netty服务器端的 ServerSocketChannel 只绑定到了 bossGroup 中的一个线程, 因此在调用 Java NIO 的 Selector.select 处理客户端的连接请求时, 实际上是在一个线程中的, 所以对只有一个服务的应用来说, bossGroup 设置多个线程是没有什么作用的, 反而还会造成资源浪费.

    c. Netty线程模型是多Reactor模型,bossGroup一个线程,workerGroup多个线程(cpu核数*2)。

    相关文章

      网友评论

        本文标题:NIO Reactor模式

        本文链接:https://www.haomeiwen.com/subject/btblqftx.html