美文网首页java高级开发
理解NIO及Reactor模式

理解NIO及Reactor模式

作者: 老鼠AI大米_Java全栈 | 来源:发表于2020-06-29 18:22 被阅读0次

    目前移动互联网应用非常流行,如微信,抖音等都需要实时与服务器通信,获取实时的信息,这种获取信息的方式就是通过长连接socket实现的,那就需要了解服务端是如何处理高并发IO的。

    IO(BIO)

    在jdk1.4之前,java中的IO类库实在是超级原始,很多我们现在熟知的概念都还没有出现,比如说管道、缓冲区等等。正是由于这些等等原因,C语言和C++一直都是IO方面的首选。这是原始的IO方式,也叫作BIO,它的原理很简单,我们使用一张图来表示一下:

    1.png
    也就是说BIO时代,每次有一个客户端连接进来的时候,都会有一个新的线程去处理,缺点显而易见,如果连接比较多的时候,我们就要建立大量的线程去一一处理。

    NIO

    jdk1.4开始被正式发布了,做出的一个巨大的改变就是新增了NIO包。它提供了很多异步的IO操作方法,比如说缓冲区ByteBuffer、Pipe、Channel还有多路复用器Selector等等。新的NIO类库的出现,极大地促进了java对异步非阻塞式编程的发展。NIO的原理也是很简单。在这里同样使用一张图来演示一遍:


    2.png

    现在我们可以看到,所有的客户端连接都可以只用一个线程就可以实现了。

    AIO

    在2011年7月28日,官方将用了将近十年的NIO类库做了升级,也被称为NIO2.0。后来也叫作AIO。AIO的原理是在之前的基础上进行的改进,意思是异步非阻塞式IO,也就是说你的客户端在进行读写操作的时候,只需要给服务器发送一个请求,不用一直等待回答就可以去做其他的事了。
    由于AIO过于复杂,并没有广泛使用,这里就不展开说了。

    NIO解决的问题

    Nio要解决的问题网上的解释一大堆,诸如银行取号、餐厅点餐等等。这些列子就不再具体地重复了,实际上就是为了使用现有的资源提供更高的生产效率。

    如何提高呢?举个简单例子,一个汽车生产厂商有若干条生产线(一条生产线负责汽车制造的所有环节),每个生产线都有相同的工人数目,每个工人都负责一个生产环节,也就是说生产发动机和生产轮胎的工人数目是一样的,但是很明显生产发动机需要的时间肯定比轮胎要长很多,那么在每一条生产线上生产发动机的那个工人往往满负荷工作,而生产轮胎的工人却很闲,这样生产效率很低。因此厂家打破了这种一条生产线生产汽车所有环节的模式,改为一个汽车零部件一条生产线,那么在发动机生产线雇佣的工人数目一定多于轮胎生产线,这样每条生产线的工人都不会闲着,通过资源的合理分配最大化利用了工人的价值,提高了生产效率,赚取了剩余价值。

    而如何通过资源合理分配来提高生产效率就是nio在计算机io领域要解决的问题。

    同步/异步、阻塞/非阻塞

    同步和异步针对应用程序来,关注的是程序中间的协作关系;
    阻塞与非阻塞更关注的是单个进程的执行状态。

    同步:执行一个操作之后,等待结果,然后才继续执行后续的操作。
    异步:执行一个操作后,可以去执行其他的操作,然后等待通知再回来执行刚才没执行完的操作。

    阻塞:进程给CPU传达一个任务之后,一直等待CPU处理完成,然后才执行后面的操作。
    非阻塞:进程给CPU传达任我后,继续处理后续的操作,隔断时间再来询问之前的操作是否完成。这样的过程其实也叫轮询。

    什么是Reactor

    后期随着jdk的一步步发展,nio非堵塞技术开始变多越来越加广泛,可分为三种reactor模式。

    Reactor可以理解为反应器模式。当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。

    NIO 有一个主要的类Selector,这个类似一个观察者,只要把需要探知的SocketChannel告诉Selector,接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,读取这些Key,就会获得刚刚注册过的SocketChannel,然后,我们从这个Channel中读取数据,接着可以处理这些数据。

    单线程的Reactor模式

    Reactor里面的单线程模式的结构图:


    1.png

    当有多个请求发送到server的时候,会经过反应器对其进行处理,相应的代码如下所示:

     
    import lombok.extern.slf4j.Slf4j;
     
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
     
    /**
     * @author idea
     * @data 2019/4/11
     */
    @Slf4j
    public class NioServer {
     
        public static void main(String[] args) {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.bind(new InetSocketAddress(9090));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("server is open!");
                while (true) {
                    if (selector.select() > 0) {
                        Set<SelectionKey> keys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = keys.iterator();
                        while (iterator.hasNext()) {
     
                            SelectionKey selectionKey = iterator.next();
                            if (selectionKey.isReadable()) {
                                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                int len = 0;
                                //当管道的数据都读取完毕了
                                while ((len = (socketChannel.read(byteBuffer))) > 0) {
                                    byteBuffer.flip();
                                    System.out.println(new String(byteBuffer.array(), 0, len));
                                    byteBuffer.clear();
                                }
                            } else if (selectionKey.isAcceptable()) {
                                //第一次链接到server,需要构建一个通道
                                ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                                //开通通道
                                SocketChannel socketChannel = acceptServerSocketChannel.accept();
                                //设置为非堵塞
                                socketChannel.configureBlocking(false);
                                //注册可读的监听事件
                                socketChannel.register(selector, SelectionKey.OP_READ);
                                System.out.println("[server]接收到新的链接");
                            }
                            iterator.remove();
                        }
                    }
     
                }
            } catch (IOException e) {
                log.error("[server]异常出现,信息为{}", e);
            }
     
        }
    }
    

    单线程模式的Reactor有一个很明显的缺点,那就是在处理请求的时候,对于不同状态的通道处理,以及请求的监听全部都放在了单个线程上进行,(多个Channel可以注册到同一个Selector对象上,实现了一个线程同时监控多个请求状态(Channel))因此效率很低下。因此就会有了第二种Reactor模式。

    多线程的Reactor模式

    在原先的单线程模式中,一个线程同时处理多个请求,但是所有的读写请求以及对于数据的处理都在同一线程中,无法充分利用多cpu的优势,因此诞生了这种多线程的Reactor模式。

    多线程的Reactor模式基本结构图如下所示:


    2.png

    代码如下所示:

     
     
    import lombok.extern.slf4j.Slf4j;
     
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
     
    /**
     * @author idea
     * @data 2019/4/11
     */
    @Slf4j
    public class Server {
     
        public static void main(String[] args) {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.bind(new InetSocketAddress(9090));
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("[server]开始启动服务器");
                while (true) {
                    if (selector.selectNow() < 0) {
                        continue;
                    }
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
     
                        SelectionKey selectionKey = iterator.next();
                        if (selectionKey.isReadable()) {
                            Processor processor = (Processor) selectionKey.attachment();
                            processor.process(selectionKey);
                        } else if (selectionKey.isAcceptable()) {
                            ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) selectionKey.channel();
                            SocketChannel socketChannel = acceptServerSocketChannel.accept();
                            socketChannel.configureBlocking(false);
                            SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
     
                            //绑定处理器线程
                            key.attach(new Processor());
                            System.out.println("[server]接收到新的链接");
     
                        }
                        iterator.remove();
                    }
                }
            } catch (IOException e) {
                log.error("[server]异常出现,信息为{}", e);
            }
     
        }
    }
    

    从代码可以看到,每次当系相应的channel注册完相应的OP_READ事件后,可以对相应的SelectionKey attach一个对象(本例中attach了一个Processor对象,该对象处理读请求),并且在获取到可读事件后,可以取出该对象。

    再看到相应的Processor对象代码:

     
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
     
    /**
     * 处理器
     *
     * @author idea
     * @data 2019/4/11
     */
    public class Processor {
     
        private static final ExecutorService service = new ThreadPoolExecutor(16, 16,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
     
        public void process(SelectionKey selectionKey) {
            service.submit(() -> {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                int count = socketChannel.read(buffer);
                if (count < 0) {
                    socketChannel.close();
                    selectionKey.cancel();
                    System.out.println("读取结束!");
                    return null;
                } else if (count == 0) {
                    return null;
                }
                System.out.println("读取内容:" + new String(buffer.array()));
                return null;
            });
        }
    }
    

    需要开启一个线程池来进行数据处理的任务。这里面就将数据处理的压力分担给了线程池来执行,充分利用了多线程的优势,将新线程的连接和数据的io操作分别放在了不同的线程中进行运行。

    在上述的多线程Reactor模式中,有专门的nio-acceptor线程来用于监听服务器,接收客户端的tcp连接。然后又有专门的线程池来处理消息的读取,发送,编码解码等工作。一个nio同时处理N条链路,每个链路只对应一个NIO线程。(防止了并发操作的发生)。看似这样的安排很美好,也确实能解决大多数应用场景的问题。

    但是在极端情况下仍然会有弊端,单独的NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如并发百万客户端连接,或者服务端需要对客户端握手进行安全认证,但是认证本身非常损耗性能。在这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型-主从Reactor多线程模型。

    主从Reactor多线程模式

    3.png

    主从Reactor线程模型的特点是:服务端用于接收客户端连接的不再是个1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到IO线程池(subreactor线程池)的某个IO线程上,由它负责SocketChannel的读写和编解码工作。Acceptor线程池仅仅只用于客户端的登陆、握手和安全认证,一旦链路建立成功,就将链路注册到后端SubReactor线程池的IO线程上,由IO线程负责后续的IO操作。

    相应代码:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
     
    /**
     * @author idea
     * @data 2019/4/11
     */
    public class Server {
    
        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(1234));
            //初始化通道,标志为accept类型
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
     
            int coreNum = Runtime.getRuntime().availableProcessors();
            Processor[] processors = new Processor[coreNum];
            for (int i = 0; i < processors.length; i++) {
                processors[i] = new Processor();
            }
     
            int index = 0;
            //一直处于堵塞的状态
            while (selector.select() > 0) {
                //获取到selectionkey的集合
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()) {
                        ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
                        SocketChannel socketChannel = acceptServerSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("Accept request from {}" + socketChannel.getRemoteAddress());
                        Processor processor = processors[(int) ((index++) / coreNum)];
                        processor.addChannel(socketChannel);
                    }
                    iterator.remove();
                }
            }
        }
    }
    

    处理器部分的代码:

     
    import lombok.extern.slf4j.Slf4j;
     
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    @Slf4j
    public class Processor {
     
        private static final ExecutorService service =
                Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
     
        private Selector selector;
     
        public Processor() throws IOException {
            this.selector = SelectorProvider.provider().openSelector();
            start();
        }
     
        public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
            socketChannel.register(this.selector, SelectionKey.OP_READ);
        }
     
        public void start() {
            service.submit(() -> {
                while (true) {
                    if (selector.selectNow() <= 0) {
                        continue;
                    }
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            int count = socketChannel.read(buffer);
                            if (count < 0) {
                                socketChannel.close();
                                key.cancel();
                                System.out.println("读取结束" + socketChannel);
                                continue;
                            } else if (count == 0) {
                                System.out.println("客户端信息大小:" + socketChannel);
                                continue;
                            } else {
                                System.out.println("客户端信息:" + new String(buffer.array()));
                            }
                        }
                    }
                }
            });
        }
    }
    

    通常在互联网公司中,对于一些高并发的应用场景里面都会使用到了Reactor模式,其代替了常用的多线程处理方式,节省系统的资源,提高系统的吞吐量。类似于一些netty框架的核心原理其实就是通过nio的Reactor模式来进行设计和开发。

    相关文章

      网友评论

        本文标题:理解NIO及Reactor模式

        本文链接:https://www.haomeiwen.com/subject/nncafktx.html