netty(六)NIO、BIO与AIO

作者: 我犟不过你 | 来源:发表于2021-11-05 15:17 被阅读0次

一、BIO与NIO

本小节将BIO与NIO放到一起进行分析,主要为了突出其差别。

1.1 对比stream和channel

以前我们写代码,涉及到IO操作,首先想到的必然是一系列的stream,如InputStream等。如今随着java中nio的引入,我们多了一个选择,channel。那么两者相比有哪些不同,channel又有哪些优势呢?

1)stream 不会自动缓冲数据,channel 会利用系统提供的发送缓冲区、接收缓冲区(更为底层)。
2)stream 仅支持阻塞 API,channel 同时支持阻塞、非阻塞 API,网络 channel 可配合 selector 实现多路复用。
3)二者均为全双工,即读写可以同时进行。

二、IO模型

当调用一次 channel.read 或 stream.read 后,会切换至操作系统内核态来完成真正数据读取,而读取又分为两个阶段,分别为:

1)等待数据准备好

2)从内核向用户进程复制数据

阻塞IO模型

阻塞io模型.png

非阻塞IO模型

应用进程不停的轮询内核空间,会造成CPU浪费。

非阻塞io模型.png

多路IO复用模型

用户进程首先阻塞于select方法,当内核返回可读状态后,根据事件类型去做调用,将数据复制到用户空间缓冲区,处理区间状态阻塞。

io复用模型.png

异步IO模型

AIO是java中IO模型的一种,作为NIO的改进和增强随JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被称作是NIO2.0。AIO提供了从建立连接到读、写的全异步操作。AIO可用于异步的文件读写和网络通信。

异步io模型.png

三、零拷贝

3.1 原始IO分析

如下伪代码,读取本地文件,通过socket写出:

File f = new File("helloword/data.txt");
RandomAccessFile file = new RandomAccessFile(file, "r");

byte[] buf = new byte[(int)f.length()];
// 包括2次拷贝(DMA(硬件到内核缓冲区),内核缓冲到用户缓冲)
file.read(buf);

Socket socket = ...;
//包括两次拷贝(用户缓冲区到socket缓冲区,DMA(socket缓冲区到网卡))
socket.getOutputStream().write(buf);

其内部实际的工作构成如下所示:

零拷贝 (1).png

我们根据代码的过程,结合图上的步骤逐步分析:
1)创建文件类file,定义byte数组,当真正开始执行read方法时,才开始获取数据。

java 本身并不具备 IO 读写能力,因此 read 方法调用后,要从 java 程序的用户态切换至内核态,去调用操作系统(内核)的读能力,将数据读入内核缓冲区。这期间用户线程阻塞,操作系统使用 DMA(Direct Memory Access,可以理解为硬件单元,解放CPU的同时,完成文件IO)来实现文件读,其间也不会使用 cpu。

此处可以算作第一次数据拷贝,但是通过DMA技术解决了IO问题。

2) 从内核态切换回用户态,将数据从内核缓冲区读入用户缓冲区(即 byte[] buf),这期间 cpu 会参与拷贝,无法利用 DMA。

此处是第二次数据拷贝。

3)调用 write 方法,这时将数据从用户缓冲区(byte[] buf)写入 socket 缓冲区,cpu 会参与拷贝。

此处是第三次拷贝。

4)接下来要向网卡写数据,这项能力 java 又不具备,因此又得从用户态切换至内核态,调用操作系统的写能力,使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu。

此处算作第四次拷贝,DMA技术解决IO问题。

总结
通过上面的分析,我们得到java本身不具备物理设备级别的IO读写,而是缓存级别的读写,通过调用操作系统来完成硬件级别的读写。

上述步骤总共经历3次状态切换,4次的数据拷贝。

3.2 NIO优化

3.2.1 使用直接内存

在前面我们学习ByteBuffer时,介绍到了其可以使用直接内存DirectByteBuffer。

ByteBuffer buffer = ByteBuffer.allocateDirect(16);

那么通过这个直接内存,能使我们前面的过程做到哪些优化呢?

零拷贝-直接内存.png

如上图所示,由于直接内存的引入,java 可以使用 DirectByteBuf 将堆外内存(内核缓冲区)映射到 jvm 内存(用户缓冲区)中来直接访问使用。而其他的步骤没有变化。

  • 这块内存不受 jvm 垃圾回收的影响,因此内存地址固定,有助于 IO 读写。

  • java 中的 DirectByteBuf 对象仅维护了此内存的虚引用,内存回收分成以下两步:
    1)DirectByteBuf 对象被垃圾回收,将虚引用加入引用队列
    2)通过专门线程访问引用队列,根据虚引用释放堆外内存

  • 减少了一次数据拷贝,用户态与内核态的切换次数没有减少

3.2.2 channel的transferTo/transferFrom

底层采用了 linux 2.1
进一步优化(底层采用了 linux 2.1 后提供的 sendFile 方法),java 中对应着两个 channel 调用transferTo/transferFrom 方法拷贝数据。

其过程如下图所示:

零拷贝-channel的transferTo_transferFrom.png

1)java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
2)数据从内核缓冲区传输到 socket 缓冲区,cpu 会参与拷贝
3)最后使用 DMA 将 socket 缓冲区的数据写入网卡,不会使用 cpu

如上图和过程所示,其中只经历了一次状态切换,数据拷贝仍然是3次。

底层采用了 linux 2.4
linux底层对于整体的效率又有了优化,如下图所示:

零拷贝-channel的transferTo_transferFrom(linux2.4).png

1)java 调用 transferTo 方法后,要从 java 程序的用户态切换至内核态,使用 DMA将数据读入内核缓冲区,不会使用 cpu
2)只会将一些 offset 和 length 信息拷入 socket 缓冲区,几乎无消耗
3)使用 DMA 将 内核缓冲区的数据写入网卡,不会使用 cpu

整个过程只发生了一次状态切换,实际只通过DMA经过两次数据拷贝。

实际所谓的零拷贝并不是真正的没有拷贝过程,而是不会有数据拷贝到用户态,即jvm内存中的过程。

四、AIO

4.1 简单介绍及使用

AIO 用来解决数据复制阶段的阻塞问题

  • 同步意味着,在进行读写操作时,线程需要等待结果,还是相当于闲置
  • 异步意味着,在进行读写操作时,线程不必等待结果,而是将来由操作系统来通过回调方式由另外的线程来获得结果

异步模型需要底层操作系统(Kernel)提供支持

  • Windows 系统通过 IOCP 实现了真正的异步 IO
  • Linux 系统异步 IO 在 2.6 版本引入,但其底层实现还是用多路复用模拟了异步 IO,性能没有优势

示例代码:

public class TestAio {

    public static void main(String[] args) throws IOException {
        try {
            AsynchronousFileChannel s = AsynchronousFileChannel.open(
                            Paths.get("C:\\Users\\P50\\Desktop\\text.txt"), StandardOpenOption.READ);
            ByteBuffer buffer = ByteBuffer.allocate(10);
            System.out.println("begin...");
            s.read(buffer, 0, null, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer attachment) {
                    System.out.println("read completed..." + result);
                    buffer.flip();
                    System.out.println(Thread.currentThread().getName() + ",内容是:" + print(buffer));
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    System.out.println("ead failed...");
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("do other things...");
        System.in.read();
    }

    static String print(ByteBuffer b) {
        StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < b.limit(); i++) {
            stringBuilder.append((char) b.get(i));
        }
        return stringBuilder.toString();
    }
}

结果,打印内容的并不是主线程,多次尝试,每次都是不同的,并且主线程并没有阻塞:

begin...
do other things...
read completed...10
Thread-7,内容是:helloworld

默认文件 AIO 使用的线程都是守护线程,所以最后要执行 System.in.read() 以避免守护线程意外结束。

4.2 网络编程

服务端示例代码:

public class AioServer {

    public static void main(String[] args) throws IOException {
        AsynchronousServerSocketChannel ssc = AsynchronousServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        ssc.accept(null, new AcceptHandler(ssc));
        System.in.read();
    }

    private static void closeChannel(AsynchronousSocketChannel sc) {
        try {
            System.out.printf("[%s] %s close\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            sc.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        public ReadHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            try {
                if (result == -1) {
                    closeChannel(sc);
                    return;
                }
                System.out.printf("[%s] %s read\n", Thread.currentThread().getName(), sc.getRemoteAddress());
                attachment.flip();
                System.out.println(Charset.defaultCharset().decode(attachment));
                attachment.clear();
                // 处理完第一个 read 时,需要再次调用 read 方法来处理下一个 read 事件
                sc.read(attachment, attachment, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            closeChannel(sc);
            exc.printStackTrace();
        }
    }

    private static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel sc;

        private WriteHandler(AsynchronousSocketChannel sc) {
            this.sc = sc;
        }

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            // 如果作为附件的 buffer 还有内容,需要再次 write 写出剩余内容
            if (attachment.hasRemaining()) {
                sc.write(attachment);
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel(sc);
        }
    }

    private static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {
        private final AsynchronousServerSocketChannel ssc;

        public AcceptHandler(AsynchronousServerSocketChannel ssc) {
            this.ssc = ssc;
        }

        @Override
        public void completed(AsynchronousSocketChannel sc, Object attachment) {
            try {
                System.out.printf("[%s] %s connected\n", Thread.currentThread().getName(), sc.getRemoteAddress());
            } catch (IOException e) {
                e.printStackTrace();
            }
            ByteBuffer buffer = ByteBuffer.allocate(16);
            // 读事件由 ReadHandler 处理
            sc.read(buffer, buffer, new ReadHandler(sc));
            // 写事件由 WriteHandler 处理
            sc.write(Charset.defaultCharset().encode("server hello!"), ByteBuffer.allocate(16), new WriteHandler(sc));
            // 处理完第一个 accpet 时,需要再次调用 accept 方法来处理下一个 accept 事件
            ssc.accept(null, this);
        }

        @Override
        public void failed(Throwable exc, Object attachment) {
            exc.printStackTrace();
        }
    }
}

相关文章

网友评论

    本文标题:netty(六)NIO、BIO与AIO

    本文链接:https://www.haomeiwen.com/subject/mkzlzltx.html