netty(六)NIO、BIO与AIO

作者: 我犟不过你 | 来源:发表于2021-11-05 15:17 被阅读0次

    一、BIO与NIO

    本小节将BIO与NIO放到一起进行分析,主要为了突出其差别。

    1.1 对比stream和channel

    以前我们写代码,涉及到IO操作,首先想到的必然是一系列的stream,如InputStream等。如今随着java中nio的引入,我们多了一个选择,channel。那么两者相比有哪些不同,channel又有哪些优势呢?

    1)stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)。
    2)stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用。
    3)二者均为全双工,即读写可以同时进行。

    二、IO模型

    当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

    1)等待数据准备好

    2)从内核向用户进程复制数据

    阻塞IO模型

    阻塞io模型.png

    非阻塞IO模型

    应用进程不停的轮询内核空间,会造成CPU浪费。

    非阻塞io模型.png

    多路IO复用模型

    用户进程首先阻塞于select方法,当内核返回可读状态后,根据事件类型去做调用,将数据复制到用户空间缓冲区,处理区间状态阻塞。

    io复用模型.png

    异步IO模型

    AIO是java中IO模型的一种,作为NIO的改进和增强随JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被称作是NIO2.0。AIO提供了从建立连接到读、写的全异步操作。AIO可用于异步的文件读写和网络通信。

    异步io模型.png

    三、零拷贝

    3.1 原始IO分析

    如下伪代码,读取本地文件,通过socket写出:

    File f = new File("helloword/data.txt");
    RandomAccessFile file = new RandomAccessFile(file, "r");
    
    byte[] buf = new byte[(int)f.length()];
    // 包括2次拷贝(DMA(硬件到内核缓冲区),内核缓冲到用户缓冲)
    file.read(buf);
    
    Socket socket = ...;
    //包括两次拷贝(用户缓冲区到socket缓冲区,DMA(socket缓冲区到网卡))
    socket.getOutputStream().write(buf);
    

    其内部实际的工作构成如下所示:

    零拷贝 (1).png

    我们根据代码的过程,结合图上的步骤逐步分析:
    1)创建文件类file,定义byte数组,当真正开始执行read方法时,才开始获取数据。

    java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(内核)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access,可以理解为硬件单元,解放CPU的同时,完成文件IO)来实现文件读,其间也不会使用 cpu。

    此处可以算作第一次数据拷贝,但是通过DMA技术解决了IO问题。

    2) 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA。

    此处是第二次数据拷贝。

    3)调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝。

    此处是第三次拷贝。

    4)接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu。

    此处算作第四次拷贝,DMA技术解决IO问题。

    总结
    通过上面的分析,我们得到java本身不具备物理设备级别的IO读写,而是缓存级别的读写,通过调用操作系统来完成硬件级别的读写。

    上述步骤总共经历3次状态切换,4次的数据拷贝。

    3.2 NIO优化

    3.2.1 使用直接内存

    在前面我们学习ByteBuffer时,介绍到了其可以使用直接内存DirectByteBuffer。

    ByteBuffer buffer = ByteBuffer.allocateDirect(16);

    那么通过这个直接内存,能使我们前面的过程做到哪些优化呢?

    零拷贝-直接内存.png

    如上图所示,由于直接内存的引入,java 可以使用 DirectByteBuf 将堆外内存(内核缓冲区)映射到 jvm 内存(用户缓冲区)中来直接访问使用。而其他的步骤没有变化。

    • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写。

    • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成以下两步:
      1)DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
      2)通过专门线程访问引用队列,根据虚引用释放堆外内存

    • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

    3.2.2 channel的transferTo/transferFrom

    底层采用了 linux 2.1
    进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用transferTo/transferFrom 方法拷贝数据。

    其过程如下图所示:

    零拷贝-channel的transferTo_transferFrom.png

    1)java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
    2)数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
    3)最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

    如上图和过程所示,其中只经历了一次状态切换,数据拷贝仍然是3次。

    底层采用了 linux 2.4
    linux底层对于整体的效率又有了优化,如下图所示:

    零拷贝-channel的transferTo_transferFrom(linux2.4).png

    1)java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
    2)只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
    3)使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

    整个过程只发生了一次状态切换,实际只通过DMA经过两次数据拷贝。

    实际所谓的零拷贝并不是真正的没有拷贝过程,而是不会有数据拷贝到用户态,即jvm内存中的过程。

    四、AIO

    4.1 简单介绍及使用

    AIO 用来解决数据复制阶段的阻塞问题

    • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
    • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

    异步模型需要底层操作系统(Kernel)提供支持

    • Windows 系统通过 IOCP 实现了真正的异步 IO
    • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

    示例代码:

    public class TestAio {
    
        public static void main(String[] args) throws IOException {
            try {
                AsynchronousFileChannel s = AsynchronousFileChannel.open(
                                Paths.get("C:\\Users\\P50\\Desktop\\text.txt"), StandardOpenOption.READ);
                ByteBuffer buffer = ByteBuffer.allocate(10);
                System.out.println("begin...");
                s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        System.out.println("read completed..." + result);
                        buffer.flip();
                        System.out.println(Thread.currentThread().getName() + ",内容是:" + print(buffer));
                    }
    
                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        System.out.println("ead failed...");
                    }
                });
    
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("do other things...");
            System.in.read();
        }
    
        static String print(ByteBuffer b) {
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 0; i < b.limit(); i++) {
                stringBuilder.append((char) b.get(i));
            }
            return stringBuilder.toString();
        }
    }
    

    结果,打印内容的并不是主线程,多次尝试,每次都是不同的,并且主线程并没有阻塞:

    begin...
    do other things...
    read completed...10
    Thread-7,内容是:helloworld
    

    默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束。

    4.2 网络编程

    服务端示例代码:

    public class AioServer {
    
        public static void main(String[] args) throws IOException {
            AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(8080));
            ssc.accept(null, new AcceptHandler(ssc));
            System.in.read();
        }
    
        private static void closeChannel(AsynchronousSocketChannel sc) {
            try {
                System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                sc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
            private final AsynchronousSocketChannel sc;
    
            public ReadHandler(AsynchronousSocketChannel sc) {
                this.sc = sc;
            }
    
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                try {
                    if (result == -1) {
                        closeChannel(sc);
                        return;
                    }
                    System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                    attachment.flip();
                    System.out.println(Charset.defaultCharset().decode(attachment));
                    attachment.clear();
                    // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                    sc.read(attachment, attachment, this);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                closeChannel(sc);
                exc.printStackTrace();
            }
        }
    
        private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
            private final AsynchronousSocketChannel sc;
    
            private WriteHandler(AsynchronousSocketChannel sc) {
                this.sc = sc;
            }
    
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
                if (attachment.hasRemaining()) {
                    sc.write(attachment);
                }
            }
    
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
                closeChannel(sc);
            }
        }
    
        private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
            private final AsynchronousServerSocketChannel ssc;
    
            public AcceptHandler(AsynchronousServerSocketChannel ssc) {
                this.ssc = ssc;
            }
    
            @Override
            public void completed(AsynchronousSocketChannel sc, Object attachment) {
                try {
                    System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                } catch (IOException e) {
                    e.printStackTrace();
                }
                ByteBuffer buffer = ByteBuffer.allocate(16);
                // 读事件由 ReadHandler 处理
                sc.read(buffer, buffer, new ReadHandler(sc));
                // 写事件由 WriteHandler 处理
                sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
                // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
                ssc.accept(null, this);
            }
    
            @Override
            public void failed(Throwable exc, Object attachment) {
                exc.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:netty(六)NIO、BIO与AIO

        本文链接:https://www.haomeiwen.com/subject/mkzlzltx.html