美文网首页
5.NIO阻塞与非阻塞

5.NIO阻塞与非阻塞

作者: xialedoucaicai | 来源:发表于2018-05-17 18:06 被阅读0次

    在讨论阻塞/非阻塞之前,我们先看看IO的两个阶段

    对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:

    1. 等待数据准备 (Waiting for the data to be ready)
    2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
      ---------引用自IO - 同步,异步,阻塞,非阻塞 (亡羊补牢篇)

    1.阻塞

    阻塞:传统的IO操作是阻塞的,在等待数据和真正读写数据期间(即上述1 2两步),线程被阻塞,直到操作完成,在此期间不能做其他事情。
    以传统的socket通信为例,看代码:

    public void server() throws Exception{
        //获取服务端通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        //绑定端口号
        serverChannel.bind(new InetSocketAddress(9999));
        while(true){
            //获取客户端通道
            SocketChannel clientChannel = serverChannel.accept();
            //读取通道中数据,并回应客户端
                //缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                while(clientChannel.read(buffer) != -1){
                    buffer.flip();
                    System.out.println("收到消息"+new String(buffer.array(),0,buffer.limit()));
                    buffer.clear();
                }
                //通知客户端,服务器收到了
                buffer.put("服务端响应客户端!".getBytes());
                buffer.flip();
                clientChannel.write(buffer);
                //向客户端发送-1,否则客户端会一直阻塞
                clientChannel.shutdownOutput();
        }
    }
    
    public void client1() throws IOException, InterruptedException{
        //获取通道
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9999));
        //分配缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //本地文件传输到服务器
        //for(int i=0;i<10;i++){
            String msg = "客户端1发消息"+System.currentTimeMillis();
            System.out.println(msg);
            buffer.put(msg.getBytes());
            //模拟网络很慢
            Thread.sleep(20000);
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        //}
        System.out.println("客户端1消息发完了");
        //加入这一句,向服务端发送-1,告诉服务端我文件传完了
        //否则服务端会一直读取,导致阻塞,因为没有得到-1
        socketChannel.shutdownOutput();
        //获取服务端的反馈
        int len = 0;
        while((len = socketChannel.read(buffer)) != -1){
            buffer.flip();
            System.out.println(new String(buffer.array(),0,len));
        }
        socketChannel.close();
    }
    
    public void client2() throws IOException, InterruptedException{
        //获取通道
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9999));
        //分配缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("我是client2".getBytes());
        buffer.flip();
        socketChannel.write(buffer);
        buffer.clear();
        //加入这一句,向服务端发送-1,告诉服务端我文件传完了
        //否则服务端会一直读取,导致阻塞,因为没有得到-1
        socketChannel.shutdownOutput();
        //获取服务端的反馈
        int len = 0;
        while((len = socketChannel.read(buffer)) != -1){
            buffer.flip();
            System.out.println(new String(buffer.array(),0,len));
        }
        
        socketChannel.close();
    }
    
    1. 服务端启动,在while(true)中等待客户端连接。
    2. 客户端1连接,模拟网络很慢,20s后才发送数据给服务端。
    3. 在此期间,客户端2连接,但服务端未完成读取数据,所以无法响应客户端2
    4. 等客户端1数据读取结束后,才能处理客户端2的请求

    为解决主线程被阻塞无法响应其他请求,一般会在accept()之后,结合线程池开启线程去处理客户端请求,这样即使阻塞子线程,主线程还能继续响应,但阻塞问题仍然存在,且线程切换在并发量大的时候会很频繁,影响服务器性能。

    2.非阻塞

    非阻塞:在等待数据期间是可以做其他操作的,但在真正读写数据时也是阻塞的。

    解决思路:阻塞产生的主要原因是发起读请求后,数据并未真正可读,主线程一直在那里等,等到可读为止。那我们引入一个监控通知机制,让数据真正可读的时候再发起读请求,不可读时,先处理其他请求,不要干等着。

    Selector:在NIO中通过Selector实现非阻塞通信。
    服务端将Channel和它所感兴趣的事件注册到selector,然后轮询selector中感兴趣的事件是否发生,事件未发生时,主线程被操作系统阻塞(select()阻塞直到Channel感兴趣事件发生),事件发生时,select()返回,主线程遍历selector中所有事件,针对不同的事件做相应的处理。

    看刘欣的<<Http Server : 一个差生的逆袭>>,认为这里事件未发生时,主线程是阻塞的,是操作系统在感兴趣的事件发生时,对事件进行标记,将主线程唤醒去处理的,唤醒后需要遍历所有注册在selector上的事件,再对被标记的事件进行处理。
    每次主线程被唤醒还得遍历所有事件,一点也不智能。让操作系统直接把可处理的事件告诉主线程,免去了主线程遍历查找的步骤,这就是epoll。

    还是上面的例子,我们看非阻塞方式是如何处理的:

    @Test
    public void server() throws IOException {
        try {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
            ssc.configureBlocking(false);
    
            Selector selector = Selector.open();
            // 注册 channel,并且指定感兴趣的事件是 Accept
            ssc.register(selector, SelectionKey.OP_ACCEPT);
    
            ByteBuffer readBuff = ByteBuffer.allocate(1024);
            ByteBuffer writeBuff = ByteBuffer.allocate(128);
            writeBuff.put("服务端反馈".getBytes());
            writeBuff.flip();
    
            while (true) {
                // select会阻塞直到指定通道感兴趣的事件发生
                int nReady = selector.select();
                System.out.println("select()返回值:" + nReady);
                if (nReady == 0) {
                    continue;
                }
                // 已选择的键集合 被select()判断为已准备好的键
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
    
                while (it.hasNext()) {
                    // SelectionKey表示通道和选择器的注册关系
                    SelectionKey key = it.next();
                    //System.out.println("移除监控事件" + key.interestOps());
                    it.remove();
    
                    if (key.isAcceptable()) {
                        System.out.println("服务端accept");
                        // 创建新的连接,并且把连接注册到selector上,而且,
                        // 声明这个channel只对读操作感兴趣。
                        SocketChannel socketChannel = ssc.accept();
                        socketChannel.configureBlocking(false);
                        System.out.println("注册读事件");
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        System.out.println("服务端readable");
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        socketChannel.read(readBuff);
                        readBuff.flip();
                        System.out.println("服务端received : " + new String(readBuff.array(),0,readBuff.limit()));
                        readBuff.clear();
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    @Test
    public void client1() throws IOException, InterruptedException {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
    
            ByteBuffer writeBuffer = ByteBuffer.allocate(32);
            ByteBuffer readBuffer = ByteBuffer.allocate(32);
    
            writeBuffer.put("hello".getBytes());
            writeBuffer.flip();
            // 模拟网络延迟
            Thread.sleep(10000);
            socketChannel.write(writeBuffer);
            socketChannel.read(readBuffer);
            readBuffer.flip();
            System.out.println("客户端收到反馈:" + new String(readBuffer.array(), 0, readBuffer.limit()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    @Test
    public void client2() throws IOException, InterruptedException {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
    
            ByteBuffer writeBuffer = ByteBuffer.allocate(32);
            ByteBuffer readBuffer = ByteBuffer.allocate(32);
    
            writeBuffer.put("hello2".getBytes());
            writeBuffer.flip();
            socketChannel.write(writeBuffer);
            socketChannel.read(readBuffer);
            readBuffer.flip();
            System.out.println("客户端收到反馈:" + new String(readBuffer.array(), 0, readBuffer.limit()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    1. 服务端启动后,告诉selector帮我监控下ServerSocketChannel的accept()事件
    2. 客户端连接之后,accept()事件触发,告诉selector帮我监控下SocketChannel的read()事件。
    3. 由于客户端1的数据迟迟未到,主线程阻塞在select()。
    4. 在此期间,客户端2连接到了服务器,select()返回,主线程被操作系统唤醒,开始遍历selector中的就绪事件,发现accept()就绪,处理accept(),客户端数据可读,处理read()事件。
    5. 客户端1的数据终于到了,处理客户端1的read()事件。
      也就是主线程没有一直在哪里干等客户端1的数据,而是做了其他事(处理客户端2的accept()事件),这就实现了非阻塞的目的。

    到这里,应该就能理解NIO与IO的第二点区别了吧:

    2.借助Selector实现了非阻塞

    3. 异步

    异步IO等待数据和读取数据期间都不会发生阻塞。
    对于read操作,主线程只负责将read的命令信息告诉内核,然后就返回。剩下的两步工作都由内核完成,然后通知主线程,任务完成了。

    4. 总结

    举一个不是非常贴切的例子类比一下:
    阻塞:基层码农,增删改查
    非阻塞:架构师,写框架
    异步:老板,动嘴不动手

    关于阻塞/非阻塞,再举一个服务员的例子:
    阻塞:服务员,客人来了(accept事件),客人点菜(处理accept),厨师做菜(等待客户端数据),服务员一直等了10分钟,厨师做好了(读就绪),给客人端菜
    非阻塞:服务员,不停地找活干,客人来了(accept),客人点菜(处理accept),厨师做菜,服务员并没有在哪里等10分钟,而是继续找其他的活,这期间还接待了很多组客人(处理了多个accept事件),发现厨师做好了(读就绪),给客人端菜

    其实,Java NIO的细节还有很多,这里只是对其有一个简单了解,如果需要更深入学习,可以参考这些资料:

    实际使用的时候,我们一般也不会直接用NIO来完成Socket通信,可以使用现成的易用的框架,比如Netty,后面有空了可以再研究下这个框架,感觉功能很强大啊

    相关文章

      网友评论

          本文标题:5.NIO阻塞与非阻塞

          本文链接:https://www.haomeiwen.com/subject/sduedftx.html