浅谈 Java NIO

作者: 否君 | 来源:发表于2020-06-07 19:47 被阅读0次

    Java NIO

    Java.nio 全称 Java non-blocking IO [实际上是 new IO],是指JDK 1.4及以上版本里提供的新 API[New IO] ,为所有的原始类型(布尔 Boolean 类型除外)提供缓存支持的数据容器,使用它可以提供非阻塞式的高伸缩性网络。

    可能你会问 IO 和 NIO 的区别。 IO 是面向流的,阻塞的。例如我们刚学 Java 的时候那些属于字节流的 InputStream/OutputStream,例如字符流的 Reader/Writer。而 NIO 是面向块的、非阻塞的,它们常用类下面将会详细讲。

    可是为什么我们需要 Java NIO 呢?其实有几个主要的问题:

    1. 没有数据缓冲区,I/O 性能存在问题
    2. 没有 C 或 C++ 的 Channel 概念,只有输入/输出流
    3. 同步阻塞 I/O 通信通常导致通信线程被长时间阻塞
    4. 支持的字符集有限,硬件可移植性不好

    所以,Java NIO横空出世!其实在这篇文章 IO原理及四种 IO 模型里面有简单的介绍过 NIO I/O 多路复用技术。

    <u>I/O多路复用技术通过把多个 I/O 的阻塞复用到同一个Select的阻塞上,从而使得系统在单线程的情况下可以同时通过多个客户端请求</u>。所以,多路复用技术可以较少系统的开销,降低了系统的工作量,页节省了资源。I/O多路复用技术的主要应用场景如下:

    1. 服务器需要同时处理多个处于监听状态或多个连接状态的套接字
    2. 服务器需要同时处理多种网络协议的套接字

    目前支持 I/O 多路复用的系统调用有 select/pselect/poll/epoll。最常见的有 select 和 epoll。在 Linux 系统中,select 曾经有非常长时间的使用。但是由于自身的缺陷无法有更大的性能。所以 Linux 会在后续的内核版本寻找 epoll 作为替代方案。而 epoll 做出了几方面的改进:

    1. 支持一个进程的Socket描述符(FD)的数量不受限制。也就是单个 epoll 可监听的文件描述符号,这样子就可以提高了线程处理的任务的数量。我们可以通过命令行去设置数量,但是一般要考虑硬件的内存大小。
    2. I/O效率不会随着FD的数量增加而下降。
    3. 使用mmap加速内核与用户空间的消息传递
    4. epoll的API更加简单。简单的程度包括了创建一个 epoll 描述符,添加监听事件,阻塞等待锁监听的事件发生,关闭 epoll 描述符。

    从 BIO 到 NIO,从 select 到 epoll,我们可以看出一步一步的发展使Java得到更高的性能。所以我们可以来看看java的I/O的发展简史:

    JDK 1.0 - JDK 1.3

    Java 的I/O类库非常原始,很多 UNIX 网络编程中的概念或者接口在I/O类库中都没有体现。

    JDK 1.4

    新增内容
    异步I/O的操作的缓冲区 ByteBuffer等
    异步I/O操作的管道 Pipe
    各种I/O(异步或同步)的Channel,包括 ServerSocketChannel 和 SocketChannel
    多种字符集的编码能力和解码能力
    实现非阻塞 I/O 操作的多路复用起 selector
    基于流行的 Perl 实现的正则表达式类库
    文件通道 FileChannel
    不足之处
    没有文件属性(读例如写权限)
    API 能力比较弱,例如目录的级联创建和和递归遍历,往往需要自己实现
    底层存储系统的一些高级 API 无法使用
    所有的文件操作都是同步阻塞调用,不支持异步文件读写操作

    JDK 1.7

    1.7 比较大的更新是升级了库类 - 原来的 NIO 库升级了新版本 NIO 2.0。它主要几方面改进:

    新增内容
    提供批量获取文件属性的 API,这些 API 具有平台无关性,不与特性的文件系统相耦合,另外它还提供了标准文件系统的 SPI,供各个服务商扩展实现
    提供了AIO,支持基于文件的异步 I/O 操作和针对网络套接字的异步操作
    完成JSR51定义的通道功能,包括对配置和多播数据报的支持等

    NIO 组件

    初学者对于 NIO 的组件比较难理解的。但是如果你以计算机原理的角度去理解,那就比较容易接受了。NIO 的组件包括三种:

    1. 缓冲区 Buffer
    2. 通道 Channel
    3. 多路复用器 Selector

    Buffer 缓冲区

    为什么叫缓冲区呢?

    其实你可以理解为<u>这是一个传输的介质,或者是一个可以写入数据的内存块</u>。

    原本数据就是在内存上,何必搞多一个概念缓冲区的概念出来?其实用户线程与内核数据交互是通过内存来交互,这点本质上是没变的。但是有个问题,直接放在内存上并不好操作。而<u>缓冲区提供了对数据的机构化访问以及维护了读写的位置 (limit) 等信息</u>。在这方面 NIO 起的作用就是提供了丰富的API来可以让你轻松使用内存块。

    Buffer 有支持多种 Buffer 类型,如下:

    1. ByteBuffer 字节缓冲区
    2. CharBuffer 字符缓冲区
    3. ShortBuffer 短整型缓冲区
    4. IntBuffer 整型缓冲区
    5. LongBuffer 长整型缓冲区
    6. FloatBuffer 浮点型缓冲区
    7. DoubleBuffer 双精度浮点缓冲区

    大多数标准的I/O操作都使用 ByteBuffer 所以它除了具有一般缓冲区的操作之外还提供一些特有的操作,方便网络读写。

    Channel 通道

    还记得Java IO中的 Stream 吗?Java 将可读写文件封装到 Stream,我们会通过 read/write 的方式往 Stream 读取或写入数据。而在 NIO 中,也有类似的概念,那就是Channel。Channel 可以把文件/网络连接等封装成一个一个 Channel。

    <u>但是 Channel 在使用的过程中是需要搭配 Buffer </u>。为什么需要 Buffer ?上面说了 Buffer 是抽象了内存的块,而 Channel 是类似于流的(负责传输数据),它们负责工作不相同。当它两搭配起来就好像这样:

    当从Channel读取数据到Buffer

    |----------|         |-----------|
    | Channel  |  ---->  |  Buffer   |
    |----------|         |-----------|
    

    当讲Buffer写入Channel

    |----------|         |-----------|
    |  Buffer  |  ---->  |  Channel  |
    |----------|         |-----------|
    

    从上图我们也可以发现,<u>Channel 既可以读取也可以写入,具备双向特性,这比 Stream 方便多了(单向传输)</u>。

    同样对 Channel ,在NIO中也提供了多种实现类满足各类需求:

    1. FileChannel 负责文件
    2. DatagramChannel UDP读写网络中的数据
    3. SocketChannel TCP读写网络中的数据
    4. ServerSocketChannel 负责 TCP 服务端读写网络中的数据

    select 多路复用器

    前篇文章说的 I/O 多路复用模型的实现基础就是 select。多路复用器的主要作用是能够提供选择已经就绪的任务的能力。而基<u>本原理就是 select 通过某种方式找到已经就绪,可以读写的 Channel,然后让开发者通过 SelectionKey 来获取就绪 Channel 的集合</u>。

    <u>实现原理一般来说就是轮询或函数回调</u>。

    我们开发中最长接触的两种实现模式就是:select 和 epoll。上面我们说过了,select 是有 fd 最大句柄的限制;而 epoll 已经在这面做了极大的优化。

    NIO 原生示范

    上面基本上我们已经介绍了 Java NIO 的一些基本面情况。所以接下来我做一个简单的 demo 作为文章的收尾。

    Java NIO 的 API 提供的挺齐全的,然而用起来确实挺繁杂的。可能是为了保持足够的原生态和灵活吧。但是如果不是有足够经验的开发,一般我们并不建议你直接去使用原声的 Java NIO API 去开发程序。至于原因,有兴趣的可以去 Google 了解一下。

    在写程序之前,我先写一下 demo 中<u>服务端</u>交互的一个流程:

    在流程中有三个角色:NioServer / ReactorThread / IoHandler

    1. NioServer 打开 ServerSocketChannel
    2. NioServer 绑定监听地址 InetSocketAddress
    3. ReactorThread 创建 Select,启动线程
    4. NioServer 创建 select,将 ServerSocketChannel 注册上去
    5. ReactorThread 使用 select 轮询就绪的 key
    6. ReactorThread 使用 handlerAccept() 处理新接入的客户端
    7. IoHandler 设置新建客户端连接的 Socket 参数
    8. ReactorThread 设置新建客户端连接的 Socket 参数
    9. ReactorThread 向 select 注册监听度操作的 SelectionKey.OP_READ
    10. 使用 handlerRead() 异步读请求消息到 ByteBuffer
    11. decode 请求消息
    12. 异步写 ByteBuffer 到 SocketChannel
    sequenceDiagram
    NioServer->>ReactorThread: 1.打开 ServerSocketChannel
    ReactorThread-->>NioServer: 已收到消息
    ReactorThread-->>IoHandler: 123
    

    下面首先是启动代码

    public class TimeServer {
        public static void main(String[] args) {
            int port = 8080;
    
            MultiplexerTimeServer server = new MultiplexerTimeServer(port);
            new Thread(server, "server-01").start();
        }
    }
    

    然后是服务端的实现内部代码

    /* server */
    public class MultiplexerTimeServer implements Runnable{
    
        private Selector selector;
    
        private ServerSocketChannel socketChannel;
    
        private volatile boolean stop;
    
        public MultiplexerTimeServer(int port) {
            try {
                /* 初始化 selector socketChannel 以及各种参数 */ 
                selector = Selector.open();
                socketChannel = ServerSocketChannel.open();
                socketChannel.configureBlocking(false);
                socketChannel.socket().bind(new InetSocketAddress(port), 1024);
                socketChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("server init finsih");
            } catch (Exception e) {
    
            }
        }
    
        public void stop(){
            this.stop = true;
        }
    
    
        @Override
        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
                    //获取已就绪的key
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectionKeys.iterator();
                    SelectionKey key = null;
                    //迭代
                    while (it.hasNext()) {
                        key = it.next();
                        //这里需要移除
                        it.remove();
                        try {
                            //交给ioHandler处理
                            handlerInput(key);
                        }catch (Exception e ) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            if (selector != null) {
                try{
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void handlerInput(SelectionKey key) throws IOException {
            //查看key是否失效了
            if (key.isValid()) {
                //如果是负责监听的key
                if (key.isAcceptable()) {
                    //获取serverSocketChannle
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    //获取进来的请求
                    SocketChannel sc = ssc.accept();
                    //设置参数
                    sc.configureBlocking(false);
                    sc.register(selector, SelectionKey.OP_READ);
                }
                //如果是可读的key
                if (key.isReadable()) {
                    //获取对应的可读的channel
                    SocketChannel sc = (SocketChannel) key.channel();
                    //获取一个Buffer,用于数据装载
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    //如果大于0说明有数据
                    if (readBytes > 0 ) {
                        //切换读模式
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "utf-8"); 
                        String currentTime = "QUERY CURRENT TIME"
                                .equalsIgnoreCase(body) ? new java.util.Date(
                                    System.currentTimeMillis()).toString() :
                                "BAD ORDER";
                        doWrite(sc, currentTime);
                    }else if(readBytes < 0){    //小于0说明有问题,关闭channel和channel
                        key.cancel();
                        sc.close();
                    }else {
                        ; 
                    }
                }
            }
        }
        //写入channel的操作(记得channel是双向的)
        private void doWrite(SocketChannel sc, String response) throws IOException {
            if (response != null && response.trim().length() > 0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                //切换写模式
                writeBuffer.flip();
                sc.write(writeBuffer);
            } 
        }
    }
    
    

    接着我们写客户端的代码

    public class TimeClient {
        public static void main(String[] args) {
            int port = 8080;
            new Thread(new MultiplexerTimeClient("127.0.0.1", port), "123").start();
        }
    }
    
    public class MultiplexerTimeClient implements Runnable {
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile  boolean stop;
    
        public MultiplexerTimeClient(String host, int port) {
            this.host = host;
            this.port = port;
    
            try {
                selector = Selector.open();
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            try {
                //尝试连接
                doConnnect();
            }catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
    
            SelectionKey key = null;
            while (!stop) {
                try {
                    //select
                    selector.select(1000);
                    //获取已就绪的key
                    Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                    Iterator<SelectionKey> it =  selectionKeySet.iterator();
                    while (it.hasNext()) {
                        key = it.next();
                        //移除
                        it.remove();
                        try {
                            //处理key
                            handlerInput(key);
                        }catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
            //上面退出循环后,就停止selector
            if (selector!=null) {
                try {
                    selector.close();
                }catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void doConnnect() throws IOException {
            //如果连接成功,就将SocketChannel挂上Selector,并且写入数据;如果失败,就说注册连接成功
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }
    
        private void handlerInput(SelectionKey key) throws IOException {
            //判断key是否有效
            if (key.isValid()) {
                //
                SocketChannel sc = (SocketChannel) key.channel();
                //如果是连接的key,说明连接成功了
                if (key.isConnectable()) {
                    if (sc.finishConnect()) {
                        sc.register(selector, SelectionKey.OP_READ);
                        doWrite(sc);
                    }else {
                        System.exit(1);
                    }
                }
                //如果是可读key,说明是数据从服务端回来了
                if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                         readBuffer.flip();
                         byte[] bytes = new byte[readBuffer.remaining()];
                         readBuffer.get(bytes);
                         String body = new String(bytes, "utf-8");
                        System.out.println("do connect finish result " + body);
                        this.stop = true;
                    }else if(readBytes < 0) {
                        //对端关闭链路
                        key.cancel();
                        sc.close();
                    }else {}
                }
            }
        }
    
        private void doWrite(SocketChannel sc) throws IOException {
            byte[] req = "QUERY CURRENT TIME".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            writeBuffer.put(req);
            //切换写模式
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining()) {
                System.out.println("send order 2 server successd");
            }
        }
    }
    

    本文结尾

    用 Java NIO 写客户端与服务端真的挺麻烦的。而且在上面的 demo 中还没考虑的非常齐全,例如半包读半包写

    但是虽然考虑周全后代码更多,但是不妨碍 NIO 给我们带来的便利性。例如客户端发起的连接/读写都是异步的;线程模型的优化可以让一个 Selector 线程处理成千上万的客户端连接,而且性能也不会因此而下降。所以值得我们好好理解这个 NIO。

    完!

    相关文章

      网友评论

        本文标题:浅谈 Java NIO

        本文链接:https://www.haomeiwen.com/subject/tihhtktx.html