BIO NIO AIO

作者: AlienPaul | 来源:发表于2020-06-13 09:42 被阅读0次

BIO

BIO意为阻塞IO(Blocking IO),即所有的通信方法如Accept,connect,read和write方法均阻塞。线程在大部分时间处于阻塞等待状态,无法支持高负载高并发的场景,资源利用率极为低下。
Java传统BIO是面向流的。流是一种单向的数据传输媒介。根据传输数据的不同,流总的来说分为字符流和字节流两种。其中字节流以Stream结尾,字符流以Reader/Writer结尾。

这些流有如下分类(只列出常用的):

  • InputStream:字节输入流
    • BufferedInputStream:带缓冲的输入流
    • FileInputStream:文件输入流
    • ObjectInputStream:对象输入流,读入序列化的对象,要求对象必须实现Serializable接口
    • DataInputStream:数据输入流,读取Java基本数据类型
  • OutputStream:字节输出流
    • BufferedOutputStream:带缓存的输出流
    • FileOutputStream:文件输出流
    • ObjectOutputStream:对象输出流
    • DataOutputStream:数据输出流
  • InputStreamReader:字符输出流
    • BufferedReader:带缓存的字符输入流
    • FileReader:文件字符输入流
  • OutputStreamWriter:字节输入流
    • BufferedWriter:带缓存的字符输出流
    • FileWriter:文件字符输出流

下面是BIO网络通信的示例代码:

BioServer(BIO服务端)

public class BioServer {
    public static void main(String[] args) throws Exception {
        ServerSocket serverSocket = new ServerSocket(9900);
        while (true) {
            // 方法阻塞,等待socket建立连接
            Socket socket = serverSocket.accept();

            InputStream inputStream = socket.getInputStream();
            BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
            byte[] buffer = new byte[1024];
            int read;

            // 方法阻塞,等待接收到数据,或者是连接关闭
            while ((read = bufferedInputStream.read(buffer)) != -1) {
                System.out.println(new String(buffer, 0, read));
            }
            
            // 使用完毕后需要关闭socket
            socket.close();
        }

//        使用完毕后必须关闭相关资源,这里演示需要使用了死循环,故不需要关闭
//        bufferedInputStream.close();
//        inputStream.close();
//        serverSocket.close();
    }
}

BioClient(BIO客户端)

public class BioClient {
    public static void main(String[] args) throws Exception {
        Socket socket = new Socket();
        // 阻塞方式,直到连接server成功
        socket.connect(new InetSocketAddress("127.0.0.1", 9900));
        OutputStream outputStream = socket.getOutputStream();
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
        bufferedOutputStream.write("Hello from client\n".getBytes());
        // 由于是缓存流,write数据并不会被立即发送,需要使用flush将缓冲区所有数据写入流
        bufferedOutputStream.flush();
        
        bufferedOutputStream.close();
        outputStream.close();
        socket.close();
    }
}

NIO

NIO意为New IO,即新一代IO。其他地方有介绍说Java NIO为非阻塞IO(Non-blocking IO)。这一点其实是不对的,因为Java的NIO既支持阻塞方式也支持非阻塞方式。

NIO相对传统IO,编程模型上做出了较大改变。NIO是面向通道(channel)和缓冲区(buffer)的。Channel和流最大的不同之处在于channel支持双向数据传递,而流只支持单向。

除此之外NIO还支持零拷贝。零拷贝即CPU不参与数据复制的过程,可以避免用户空间和操作系统内核空间的上下文切换。零拷贝不仅需要应用的支持,还需要操作系统的支持。下面举一个例子,我们从文件系统读取一个文件,从socket发送出去。如果用传统方式数据流经如下步骤:

文件系统 -> 内核读缓存 -> 应用缓存(Java) -> 内核Socket缓存 -> 网卡缓存

使用零拷贝技术之后,数据流经步骤减少了2次拷贝,如下所示:

文件系统 -> 内核读缓存 -> 内核Socket缓存 -> 网卡缓存

Linux系统支持的零拷贝有两种:

  • mmap内存映射:DMA控制器从外部存储加载数据到内核缓存,然后将应用内缓存通过地址映射的方式,映射到内核缓存。这样可以省去数据从内核缓存复制到应用缓存的步骤。
  • sendfile:DMA控制器从外部存储加载数据到内核缓存,然后系统直接将内核缓存中的数据复制到socket缓存。这样可以省去数据从应用缓存复制到socket缓存的步骤。

mmap的Java NIO使用方式:

MappedByteBuffer mappedByteBuffer = new RandomAccessFile(file, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, len);

该方法返回的mappedByteBuffer是文件数据的内核缓存在Java应用中的映射。

sendfile的Java NIO使用方式:

FileChannel sourceChannel = new RandomAccessFile("/path/to/file", "rw").getChannel();
SocketChannel socketChannel = SocketChannel.open(...);
sourceChannel.transferTo(0, sourceChannel.size(), socketChannel);

FileChanneltransferTotransferFrom使用了零拷贝技术。transferTo负责将文件FileChannel的内容发往其他channel,transferFrom从其他channel读取内容,发送给FileChannel

通过上面我们可以得出结论,减少内核缓存和应用缓存(主要指Java应用堆内存)之间的复制操作可以提高IO操作效率。为了满足这种场景,NIO的ByteBuffer提供了两种:堆内buffer和堆外buffer。堆内buffer使用一个字节数组作为数据载体,堆外内存不属于Java虚拟机,归操作系统管理。使用堆外内存能够显著提高IO性能。

ByteBuffer分配堆内内存和堆外内存的方式如下:

// 分配1024字节堆内内存
ByteBuffer byteBuffer = ByteBuffer.allocate(1024)

// 分配1024字节堆外内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024)

阻塞NIO

下面举一个例子,介绍下阻塞模式的NIO网络通信的写法。

NioServer

public class NioServer {
    public static void main(String[] args) throws Exception {
        // 建立连接并绑定地址端口
        ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9900));

        // 创建一个1024字节大小的字节缓冲区
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        while (true) {
            // 接受socket连接关闭
            // 方法阻塞,直到socket链接成功
            SocketChannel socketChannel = channel.accept();
            int read;
            // 方法阻塞,直到读取到数据,或者socket连接关闭
            while ((read = socketChannel.read(byteBuffer)) != -1) {
                System.out.println(new String(byteBuffer.array(), 0, read));
                byteBuffer.clear();
            }

            // 使用完需要关闭socketChannel
            socketChannel.close();
        }

//        channel.close();
    }
}

NioClient

public class NioClient {
    public static void main(String[] args) throws Exception {
        // 建立连接
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9900));
        ByteBuffer byteBuffer = ByteBuffer.wrap("Hello from client".getBytes());
        socketChannel.write(byteBuffer);
        socketChannel.close();
    }
}

非阻塞NIO

非阻塞NIO所有的数据传输方法比如accept connect read write均不阻塞。可以使用如下配置将阻塞NIO转变为非阻塞NIO:

channel.configureBlocking(false);

需要注意的是,FileChannel不支持非阻塞模式。

非阻塞NIO的数据传输方法都是非阻塞的,调用之后无论成功或者失败都立刻返回。但问题是,我们无法知道数据是否传送成功,什么时候可以去传送/接收数据,给编程造成了很大的困难。

为了解决这个问题,Java NIO提供了多路复用器:Selector。Selector以轮询的方式检查所有注册到该Selector下的所有channel的事件。一旦有channel准备好了连接,读或者写,Selector可以立刻检测到并处理该事件。

Selector的厉害之处在于能够使用一个线程来处理多个channel的连接和读写事件,避免了线程上下文切换,十分的高效。

Selector并不会主动去监听某个channel的某些事件。需要主动将channel需要被Selector检测的事件注册到Selector。

Channel的事件有如下4种:

  • SelectionKey.OP_ACCEPT
  • SelectionKey.OP_READ
  • SelectionKey.OP_WRITE
  • SelectionKey.OP_CONNECT

创建Selector和向Selector注册事件的方法为:

Selector selector = Selector.open();

channel.register(selector, SelectionKey.OP_ACCEPT);

如果需要Selector检测同一个channel的多个事件,可以通过按位或运算符合并事件。例如SelectionKey.OP_WRITE | SelectionKey.OP_READ

下面是一个使用Selector的非阻塞NIO网络通信server的例子。该server接受到client发来的数据之后,回复“Hello from server”给客户端。

代码如下所示:

public class SelectorServer {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9900));
        // 设置为非阻塞模式
        channel.configureBlocking(false);
        Selector selector = Selector.open();
        // 向selector注册ACCEPT事件,当ServerSocketChannel发生accept事件的时候会被selector select到
        channel.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBuffer echoBuffer = ByteBuffer.wrap("Hello from server\n".getBytes());
        while (true) {
            // 方法阻塞
            // selector一旦轮训到任何注册的channel发生的任意事件,立刻返回
            selector.select();
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                // 获取SelectionKey
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                // 如果是Accept事件
                if (selectionKey.isAcceptable()) {
                    // 接受socket连接,非阻塞
                    SocketChannel socketChannel = channel.accept();
                    // 注意,这里不要忘了设置socketChannel为非阻塞模式,否则register方法会报错
                    socketChannel.configureBlocking(false);
                    // 为socketChannel注册READ事件,channel可以接收数据
                    socketChannel.register(selector, SelectionKey.OP_READ);
                // 如果是read事件
                } else if (selectionKey.isReadable()) {
                    // 从SelectionKey获取附带的channel,即产生read事件的channel
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    int read = socketChannel.read(buffer);

                    System.out.println(new String(buffer.array(), 0, read));
                    buffer.clear();
                    // 设置接下来关心的事件为write
                    selectionKey.interestOps(SelectionKey.OP_WRITE);
                // 如果是write事件
                } else if (selectionKey.isWritable()) {
                    // 和read部分类似,获取触发事件的channel
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    socketChannel.write(echoBuffer);
                    // 设置接下来关心的事件为read
                    selectionKey.interestOps(SelectionKey.OP_READ);
                    echoBuffer.rewind();
                }
            }
        }
        
//        程序退出前记得关闭selector和ServerSocketChannel,这里演示方便使用了死循环,故不再需要关闭
//        selector.close();
//        channel.close();
    }
}

AIO

AIO全称为Asynchronous IO,是真正的全异步IO。AIO可以实现“调用后不管”,无论IO是否需要等待,操作多久,线程均不会阻塞。等到操作结束时,可以使用专用的线程池,负责处理结果回调。

AIO所有的IO操作均有两个版本:返回Future或者是参数使用CompletionHandler(异步回调)。
例如AsynchronousServerSocketChannelaccept方法的两个版本如下:

public abstract <A> void accept(A attachment,
                                CompletionHandler<AsynchronousSocketChannel,? super A> handler);
                                
public abstract Future<AsynchronousSocketChannel> accept();

CompletionHandler的代码如下所示:

public interface CompletionHandler<V,A> {

    /**
     * Invoked when an operation has completed.
     *
     * @param   result
     *          The result of the I/O operation.
     * @param   attachment
     *          The object attached to the I/O operation when it was initiated.
     */
    void completed(V result, A attachment);

    /**
     * Invoked when an operation fails.
     *
     * @param   exc
     *          The exception to indicate why the I/O operation failed
     * @param   attachment
     *          The object attached to the I/O operation when it was initiated.
     */
    void failed(Throwable exc, A attachment);
}

其中completed在操作成功时候调用,failed在操作失败时候调用。

注意:attachment在调用对应IO操作方法的时候(比如accept)传入。如果IO操作时候需要使用其他对象,将其作为attachment传入是一个很方便的用法。

代码示例

下面代码为AIO 网络通信server和client。

AioServer

public class AioServer {
    public static void main(String[] args) throws Exception {
        // 创建AsynchronousChannelGroup
        // 本质是一个线程池,负责接受异步回调
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
        // 创建通道并绑定地址端口
        AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("127.0.0.1", 9900));
        // 接受socket连接,使用异步回调的方式
        channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                // 在socket连接成功的时候调用
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                try {
                    // read方法这里使用返回Future的版本
                    // 调用future的get方法,阻塞等待数据读取结果
                    Integer read = result.read(byteBuffer).get();
                    System.out.println(new String(byteBuffer.array(), 0, read));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                // 再次调用accept方法,接受下一个socket的连接
                // 调用一次accept方法AsynchronousServerSocketChannel仅接受一次请求
                channel.accept(null, this);
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                // socket连接失败的时候调用
                System.out.println("Accept failed");
            }
        });

        // 阻塞主线程,防止程序退出
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
}

AioClient

public class AioClient {
    public static void main(String[] args) throws Exception {
        AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
        AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
        ByteBuffer buffer = ByteBuffer.wrap("Hello from client".getBytes());
        // 连接socket服务器,使用异步回调的方式
        channel.connect(new InetSocketAddress("127.0.0.1", 9900), null, new CompletionHandler<Void, Object>() {
            @Override
            public void completed(Void result, Object attachment) {
                // 异步将buffer的数据写入channel
                // 由于不关心什么时候能够写入完毕,我们不处理此方法的返回future
                channel.write(buffer);
                buffer.rewind();
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("Connect failed");
            }
        });

        // 阻塞主线程,防止程序退出
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.await();
    }
}

相关文章

网友评论

    本文标题:BIO NIO AIO

    本文链接:https://www.haomeiwen.com/subject/mxkatktx.html