BIO NIO AIO

作者: AlienPaul | 来源:发表于2020-06-13 09:42 被阅读0次

    BIO

    BIO意为阻塞IO(Blocking IO),即所有的通信方法如Accept,connect,read和write方法均阻塞。线程在大部分时间处于阻塞等待状态,无法支持高负载高并发的场景,资源利用率极为低下。
    Java传统BIO是面向流的。流是一种单向的数据传输媒介。根据传输数据的不同,流总的来说分为字符流和字节流两种。其中字节流以Stream结尾,字符流以Reader/Writer结尾。

    这些流有如下分类(只列出常用的):

    • InputStream:字节输入流
      • BufferedInputStream:带缓冲的输入流
      • FileInputStream:文件输入流
      • ObjectInputStream:对象输入流,读入序列化的对象,要求对象必须实现Serializable接口
      • DataInputStream:数据输入流,读取Java基本数据类型
    • OutputStream:字节输出流
      • BufferedOutputStream:带缓存的输出流
      • FileOutputStream:文件输出流
      • ObjectOutputStream:对象输出流
      • DataOutputStream:数据输出流
    • InputStreamReader:字符输出流
      • BufferedReader:带缓存的字符输入流
      • FileReader:文件字符输入流
    • OutputStreamWriter:字节输入流
      • BufferedWriter:带缓存的字符输出流
      • FileWriter:文件字符输出流

    下面是BIO网络通信的示例代码:

    BioServer(BIO服务端)

    public class BioServer {
        public static void main(String[] args) throws Exception {
            ServerSocket serverSocket = new ServerSocket(9900);
            while (true) {
                // 方法阻塞,等待socket建立连接
                Socket socket = serverSocket.accept();
    
                InputStream inputStream = socket.getInputStream();
                BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
                byte[] buffer = new byte[1024];
                int read;
    
                // 方法阻塞,等待接收到数据,或者是连接关闭
                while ((read = bufferedInputStream.read(buffer)) != -1) {
                    System.out.println(new String(buffer, 0, read));
                }
                
                // 使用完毕后需要关闭socket
                socket.close();
            }
    
    //        使用完毕后必须关闭相关资源,这里演示需要使用了死循环,故不需要关闭
    //        bufferedInputStream.close();
    //        inputStream.close();
    //        serverSocket.close();
        }
    }
    

    BioClient(BIO客户端)

    public class BioClient {
        public static void main(String[] args) throws Exception {
            Socket socket = new Socket();
            // 阻塞方式,直到连接server成功
            socket.connect(new InetSocketAddress("127.0.0.1", 9900));
            OutputStream outputStream = socket.getOutputStream();
            BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
            bufferedOutputStream.write("Hello from client\n".getBytes());
            // 由于是缓存流,write数据并不会被立即发送,需要使用flush将缓冲区所有数据写入流
            bufferedOutputStream.flush();
            
            bufferedOutputStream.close();
            outputStream.close();
            socket.close();
        }
    }
    

    NIO

    NIO意为New IO,即新一代IO。其他地方有介绍说Java NIO为非阻塞IO(Non-blocking IO)。这一点其实是不对的,因为Java的NIO既支持阻塞方式也支持非阻塞方式。

    NIO相对传统IO,编程模型上做出了较大改变。NIO是面向通道(channel)和缓冲区(buffer)的。Channel和流最大的不同之处在于channel支持双向数据传递,而流只支持单向。

    除此之外NIO还支持零拷贝。零拷贝即CPU不参与数据复制的过程,可以避免用户空间和操作系统内核空间的上下文切换。零拷贝不仅需要应用的支持,还需要操作系统的支持。下面举一个例子,我们从文件系统读取一个文件,从socket发送出去。如果用传统方式数据流经如下步骤:

    文件系统 -> 内核读缓存 -> 应用缓存(Java) -> 内核Socket缓存 -> 网卡缓存
    

    使用零拷贝技术之后,数据流经步骤减少了2次拷贝,如下所示:

    文件系统 -> 内核读缓存 -> 内核Socket缓存 -> 网卡缓存
    

    Linux系统支持的零拷贝有两种:

    • mmap内存映射:DMA控制器从外部存储加载数据到内核缓存,然后将应用内缓存通过地址映射的方式,映射到内核缓存。这样可以省去数据从内核缓存复制到应用缓存的步骤。
    • sendfile:DMA控制器从外部存储加载数据到内核缓存,然后系统直接将内核缓存中的数据复制到socket缓存。这样可以省去数据从应用缓存复制到socket缓存的步骤。

    mmap的Java NIO使用方式:

    MappedByteBuffer mappedByteBuffer = new RandomAccessFile(file, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, len);
    

    该方法返回的mappedByteBuffer是文件数据的内核缓存在Java应用中的映射。

    sendfile的Java NIO使用方式:

    FileChannel sourceChannel = new RandomAccessFile("/path/to/file", "rw").getChannel();
    SocketChannel socketChannel = SocketChannel.open(...);
    sourceChannel.transferTo(0, sourceChannel.size(), socketChannel);
    

    FileChanneltransferTotransferFrom使用了零拷贝技术。transferTo负责将文件FileChannel的内容发往其他channel,transferFrom从其他channel读取内容,发送给FileChannel

    通过上面我们可以得出结论,减少内核缓存和应用缓存(主要指Java应用堆内存)之间的复制操作可以提高IO操作效率。为了满足这种场景,NIO的ByteBuffer提供了两种:堆内buffer和堆外buffer。堆内buffer使用一个字节数组作为数据载体,堆外内存不属于Java虚拟机,归操作系统管理。使用堆外内存能够显著提高IO性能。

    ByteBuffer分配堆内内存和堆外内存的方式如下:

    // 分配1024字节堆内内存
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024)
    
    // 分配1024字节堆外内存
    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024)
    

    阻塞NIO

    下面举一个例子,介绍下阻塞模式的NIO网络通信的写法。

    NioServer

    public class NioServer {
        public static void main(String[] args) throws Exception {
            // 建立连接并绑定地址端口
            ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9900));
    
            // 创建一个1024字节大小的字节缓冲区
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            while (true) {
                // 接受socket连接关闭
                // 方法阻塞,直到socket链接成功
                SocketChannel socketChannel = channel.accept();
                int read;
                // 方法阻塞,直到读取到数据,或者socket连接关闭
                while ((read = socketChannel.read(byteBuffer)) != -1) {
                    System.out.println(new String(byteBuffer.array(), 0, read));
                    byteBuffer.clear();
                }
    
                // 使用完需要关闭socketChannel
                socketChannel.close();
            }
    
    //        channel.close();
        }
    }
    

    NioClient

    public class NioClient {
        public static void main(String[] args) throws Exception {
            // 建立连接
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9900));
            ByteBuffer byteBuffer = ByteBuffer.wrap("Hello from client".getBytes());
            socketChannel.write(byteBuffer);
            socketChannel.close();
        }
    }
    

    非阻塞NIO

    非阻塞NIO所有的数据传输方法比如accept connect read write均不阻塞。可以使用如下配置将阻塞NIO转变为非阻塞NIO:

    channel.configureBlocking(false);
    

    需要注意的是,FileChannel不支持非阻塞模式。

    非阻塞NIO的数据传输方法都是非阻塞的,调用之后无论成功或者失败都立刻返回。但问题是,我们无法知道数据是否传送成功,什么时候可以去传送/接收数据,给编程造成了很大的困难。

    为了解决这个问题,Java NIO提供了多路复用器:Selector。Selector以轮询的方式检查所有注册到该Selector下的所有channel的事件。一旦有channel准备好了连接,读或者写,Selector可以立刻检测到并处理该事件。

    Selector的厉害之处在于能够使用一个线程来处理多个channel的连接和读写事件,避免了线程上下文切换,十分的高效。

    Selector并不会主动去监听某个channel的某些事件。需要主动将channel需要被Selector检测的事件注册到Selector。

    Channel的事件有如下4种:

    • SelectionKey.OP_ACCEPT
    • SelectionKey.OP_READ
    • SelectionKey.OP_WRITE
    • SelectionKey.OP_CONNECT

    创建Selector和向Selector注册事件的方法为:

    Selector selector = Selector.open();
    
    channel.register(selector, SelectionKey.OP_ACCEPT);
    

    如果需要Selector检测同一个channel的多个事件,可以通过按位或运算符合并事件。例如SelectionKey.OP_WRITE | SelectionKey.OP_READ

    下面是一个使用Selector的非阻塞NIO网络通信server的例子。该server接受到client发来的数据之后,回复“Hello from server”给客户端。

    代码如下所示:

    public class SelectorServer {
        public static void main(String[] args) throws Exception {
            ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9900));
            // 设置为非阻塞模式
            channel.configureBlocking(false);
            Selector selector = Selector.open();
            // 向selector注册ACCEPT事件,当ServerSocketChannel发生accept事件的时候会被selector select到
            channel.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            ByteBuffer echoBuffer = ByteBuffer.wrap("Hello from server\n".getBytes());
            while (true) {
                // 方法阻塞
                // selector一旦轮训到任何注册的channel发生的任意事件,立刻返回
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    // 获取SelectionKey
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    // 如果是Accept事件
                    if (selectionKey.isAcceptable()) {
                        // 接受socket连接,非阻塞
                        SocketChannel socketChannel = channel.accept();
                        // 注意,这里不要忘了设置socketChannel为非阻塞模式,否则register方法会报错
                        socketChannel.configureBlocking(false);
                        // 为socketChannel注册READ事件,channel可以接收数据
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    // 如果是read事件
                    } else if (selectionKey.isReadable()) {
                        // 从SelectionKey获取附带的channel,即产生read事件的channel
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        int read = socketChannel.read(buffer);
    
                        System.out.println(new String(buffer.array(), 0, read));
                        buffer.clear();
                        // 设置接下来关心的事件为write
                        selectionKey.interestOps(SelectionKey.OP_WRITE);
                    // 如果是write事件
                    } else if (selectionKey.isWritable()) {
                        // 和read部分类似,获取触发事件的channel
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        socketChannel.write(echoBuffer);
                        // 设置接下来关心的事件为read
                        selectionKey.interestOps(SelectionKey.OP_READ);
                        echoBuffer.rewind();
                    }
                }
            }
            
    //        程序退出前记得关闭selector和ServerSocketChannel,这里演示方便使用了死循环,故不再需要关闭
    //        selector.close();
    //        channel.close();
        }
    }
    

    AIO

    AIO全称为Asynchronous IO,是真正的全异步IO。AIO可以实现“调用后不管”,无论IO是否需要等待,操作多久,线程均不会阻塞。等到操作结束时,可以使用专用的线程池,负责处理结果回调。

    AIO所有的IO操作均有两个版本:返回Future或者是参数使用CompletionHandler(异步回调)。
    例如AsynchronousServerSocketChannelaccept方法的两个版本如下:

    public abstract <A> void accept(A attachment,
                                    CompletionHandler<AsynchronousSocketChannel,? super A> handler);
                                    
    public abstract Future<AsynchronousSocketChannel> accept();
    

    CompletionHandler的代码如下所示:

    public interface CompletionHandler<V,A> {
    
        /**
         * Invoked when an operation has completed.
         *
         * @param   result
         *          The result of the I/O operation.
         * @param   attachment
         *          The object attached to the I/O operation when it was initiated.
         */
        void completed(V result, A attachment);
    
        /**
         * Invoked when an operation fails.
         *
         * @param   exc
         *          The exception to indicate why the I/O operation failed
         * @param   attachment
         *          The object attached to the I/O operation when it was initiated.
         */
        void failed(Throwable exc, A attachment);
    }
    

    其中completed在操作成功时候调用,failed在操作失败时候调用。

    注意:attachment在调用对应IO操作方法的时候(比如accept)传入。如果IO操作时候需要使用其他对象,将其作为attachment传入是一个很方便的用法。

    代码示例

    下面代码为AIO 网络通信server和client。

    AioServer

    public class AioServer {
        public static void main(String[] args) throws Exception {
            // 创建AsynchronousChannelGroup
            // 本质是一个线程池,负责接受异步回调
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
            // 创建通道并绑定地址端口
            AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("127.0.0.1", 9900));
            // 接受socket连接,使用异步回调的方式
            channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    // 在socket连接成功的时候调用
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    try {
                        // read方法这里使用返回Future的版本
                        // 调用future的get方法,阻塞等待数据读取结果
                        Integer read = result.read(byteBuffer).get();
                        System.out.println(new String(byteBuffer.array(), 0, read));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    // 再次调用accept方法,接受下一个socket的连接
                    // 调用一次accept方法AsynchronousServerSocketChannel仅接受一次请求
                    channel.accept(null, this);
                }
    
                @Override
                public void failed(Throwable exc, Object attachment) {
                    // socket连接失败的时候调用
                    System.out.println("Accept failed");
                }
            });
    
            // 阻塞主线程,防止程序退出
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatch.await();
        }
    }
    

    AioClient

    public class AioClient {
        public static void main(String[] args) throws Exception {
            AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
            AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
            ByteBuffer buffer = ByteBuffer.wrap("Hello from client".getBytes());
            // 连接socket服务器,使用异步回调的方式
            channel.connect(new InetSocketAddress("127.0.0.1", 9900), null, new CompletionHandler<Void, Object>() {
                @Override
                public void completed(Void result, Object attachment) {
                    // 异步将buffer的数据写入channel
                    // 由于不关心什么时候能够写入完毕,我们不处理此方法的返回future
                    channel.write(buffer);
                    buffer.rewind();
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
    
                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("Connect failed");
                }
            });
    
            // 阻塞主线程,防止程序退出
            CountDownLatch countDownLatch = new CountDownLatch(1);
            countDownLatch.await();
        }
    }
    

    相关文章

      网友评论

        本文标题:BIO NIO AIO

        本文链接:https://www.haomeiwen.com/subject/mxkatktx.html