NIO解析

作者: GIT提交不上 | 来源:发表于2019-11-18 23:22 被阅读0次

    一、BIO概述

      BIO 就是传统的java.io包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。(面向字节流/字符流

    图1-1 IO流

      同步与异步的差别在于是否需要等待被依赖项目完成,才开始执行任务
      阻塞与非阻塞针对CPU的消耗来说的,阻塞就是 CPU 停下来等待一个慢的操作完成 CPU 才接着完成其它的事。非阻塞就是在这个慢的操作在执行时 CPU 去干其它别的事,等这个慢的操作完成时,CPU 再接着完成后续的操作。

    参考链接:Java核心(五)深入理解BIO、NIO、AIO

    二、NIO概述

      NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO程序,同时提供了更接近操作系统底层高性能的数据操作方式。

    三、NIO核心组件

      NIO的核心组件:1)Channel(负责连接) 2)Buffer(负责存储) 3)Selector。通过Channel,可以从Channel把数据写到Buffer中,也可以把数据从Buffer写入到Channel中。

    图3-1 Channel & Buffer关系图

      选择器允许单线程操作多个通道。使用Selector时,需要将Channel注册到Selector上。

    图3-2 选择器

    3.1 Channel通道

      1. 通道是双向的,可读也可写,流一般是单向的。
      2. 通道可读也可写。
      3. 通道基于Buffer读取。

    3.1.1 Channel实现

      FileChannel:文件读写
      DatagramChannel:UDP数据读写
      SocketChannel:TCP数据读写
      ServerSocketChannel:监听TCP链接请求,每个请求会创建会一个SocketChannel

    3.2 Buffer缓冲区

      Buffer是一块内存区,用于读写数据。分为非直接缓冲区(JVM)和直接缓存区(物理内存)。使用该内存区包含以下步骤:
      1. 把数据写入Buffer(Channel->Buffer)
      2. 调用flip方法(写模式->读模式,调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置)
      3. 从Buffer中读取数据,get方法
      4. 清空Buffer,调用clear(清空整个Buffer)或compact(清空已读取数据)方法
      Buffer包含3个属性,capacity(容量,以字节为单位)、position(位置)、limit(限制)。读和写模式下position和limit含义如图3-1所示:

    图3-1 不同模式下position和limit含义

      Buffer类型包括:1)ByteBuffer 2)MappedByteBuffer 3)CharBuffer 4)DoubleBuffer 5)FloatBuffer 6)IntBuffer 7)LongBuffer 8)ShortBuffer。

      示例代码如下所示:

    package com.example.niotest;
    //import省略
    /**
     * @Author luffy
     */
    public class NioTest {
    
        public static void main(String[] args) {
            try {
                RandomAccessFile file = new RandomAccessFile("nio.txt", "rw");
                FileChannel channel = file.getChannel();
                //allocate()/allocateDirect()
                ByteBuffer buffer = ByteBuffer.allocate(48);
                //将channel读取到buffer
                int flag = channel.read(buffer);
                while (flag != -1) {
                    //切换为读模式
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        System.out.println((char) buffer.get());
                    }
                    //清空buffer
                    buffer.clear();
                    flag = channel.read(buffer);
                }
                file.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    3.3 分散读与聚合写

      Scattering Reads:把数据从单个Channel写入到多个Buffer,必须写满一个buffer后才会向后移动到下一个buffer,不适合消息大小会动态改变的情形。(依次填满)示意图如下所示:

    图3-2 分散读
    ByteBuffer header = ByteBuffer.allocate(128);
    ByteBuffer body   = ByteBuffer.allocate(1024);
    
    ByteBuffer[] bufferArray = { header, body };
    
    channel.read(bufferArray);
    

      Gathering Writes:把多个Buffer的数据写入到同一个Channel中,按顺序将数组内的内容写进channel,只写入Buffer中position到limit之间的数据,适用于可变大小的消息的情形。示意图如下所示:

    图3-3 聚合写
    ByteBuffer header = ByteBuffer.allocate(128);
    ByteBuffer body   = ByteBuffer.allocate(1024);
    
    ByteBuffer[] bufferArray = { header, body };
    
    channel.write(bufferArray);
    

    3.4 通道传输接口

      FileChannel.transferFrom方法把数据从通道源传输到FileChannel,transferTo方法把FileChannel数据传输到另一个Channel,具体查看API文档。(直接缓冲区

    3.5 Selector选择器

      Selector用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个Channels,也就是可以管理多个网络链接
      1. Selector创建

     Selector selector = Selector.open();
    

      2. 向Selector注册通道(FileChannel不适用)

    ServerSocketChannel channel = ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
    channel.bind(address);
    //非阻塞
    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
    /**
    SelectionKey.OP_CONNECT  连接就绪
    SelectionKey.OP_ACCEPT  接收就绪
    SelectionKey.OP_READ  读就绪
    SelectionKey.OP_WRITE  写就绪
    */
    

      3. 选择器选择通道

    //select()方法会返回读事件已经就绪的那些通道
    int select():阻塞到至少有一个通道在你注册的事件上就绪了
    int select(long timeout):同select()方法,除了最长会阻塞timeout毫秒
    int selectNow():不会阻塞,不管什么通道就绪都立刻返回
    Set<SelectionKey> selectedKeys():一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,
    然后可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道
    

      SelectionKey.channel方法返回的channel实例需要强转为我们实际使用的具体的channel类型。
      由于调用select而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup(),被阻塞的select方法的线程就会立刻返回
      当操作Selector完毕后,需要调用close方法。close的调用会关闭Selector并使相关的SelectionKey都无效,channel本身不会被关闭。

    3.6 FileChannel

      FileChannel总是运行在阻塞状态下。无法直接打开一个FileChannel,需要通过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例,关闭FileChannel使用close方法。

    RandomAccessFile file = new RandomAccessFile("nio.txt", "rw");
    FileChannel channel = file.getChannel();
    //关闭
    channel.close();
    

      从FileChannel中读取数据,read()方法返回的int值表示了有多少字节被读到了Buffer中。如果返回-1,表示到了文件末尾。

    ByteBuffer buf = ByteBuffer.allocate(48);
    int bytesRead = inChannel.read(buf);
    

      向FileChannel写数据,FileChannel.write()是在while循环中调用的。因为无法保证write()方法一次能向FileChannel写入多少字节,因此需要重复调用write()方法,直到Buffer中已经没有尚未写入通道的字节

    String newData = "test data";
    ByteBuffer buf = ByteBuffer.allocate(48);
    buf.clear();
    buf.put(newData.getBytes());
    buf.flip();
    while(buf.hasRemaining()) {
        channel.write(buf);
    }
    

      可能需要在FileChannel的某个特定位置进行数据的读/写操作。可以通过调用position()方法获取FileChannel的当前位置。也可以通过调用position(long pos)方法设置FileChannel的当前位置。(如果将位置设置在文件结束符之后,注意此时的读写情况)

    long pos = channel.position();
    channel.position(pos +123);
    

      FileChannel的size方法:返回该实例所关联文件的大小;FileChannel的truncate方法:截取一个文件,截取文件时,文件将中指定长度后面的部分将被删除;FileChannel的force方法:将通道里尚未写入磁盘的数据强制写到磁盘上,该方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

    long fileSize = channel.size();
    channel.truncate(1024);
    channel.force(true);
    

    3.7 SocketChannel套接字通道

      创建一个SocketChannel有两种方式:
      1. 开一个SocketChannel并连接网络上的一台服务器。
      2. 当ServerSocketChannel接收到一个连接请求时,会创建一个SocketChannel。
      建立一个SocketChannel连接,如下所示:

    SocketChannel channel = SocketChannel.open();
    InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
    channel.connect(address);
    //关闭链接
    channel.close();
    

      向SocketChannel读写数据同FileChannel,将SocketChannel设置成非阻塞模式,此时connect()、read()、write()方法都是异步的。

    channel.configureBlocking(false);
    while (!channel.finishConnect()){
    
    }
    

    3.8 ServerSocketChannel服务端套接字通道

      打开ServerSocketChannel,如下所示:

    ServerSocketChannel channel=ServerSocketChannel.open();
    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
    channel.socket().bind(address);
    //非阻塞
    channel.configureBlocking(false);
    //关闭
    channel.close();
    

      监听链接,调用ServerSocketChannel的accept()方法,返回一个SocketChannel连接实例, accept()方法会一直阻塞到有新连接到达。

    while (true) {
        SocketChannel socketChannel = channel.accept();
        if (socketChannel != null){
                        
        }
    }
    

    3.9 DatagramChannel通道

      DatagramChannel是一个可以发送、接收UDP(面向无连接的网络协议)数据包的通道,无法直接读写数据。

    DatagramChannel datagramChannel = DatagramChannel.open();
    InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
    datagramChannel.socket().bind(address);
    //关闭
    datagramChannel.close();
    

      接收数据与发送数据,如下所示:

    ByteBuffer buffer = ByteBuffer.allocate(48);
    buffer.clear();
    datagramChannel.receive(buffer);
    
    datagramChannel.send(buffer,new InetSocketAddress("127.0.0.1",8080));
    

      连接到特定的地址,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。

    datagramChannel.socket().connect(new InetSocketAddress("127.0.0.1",8080));
    datagramChannel.read(buffer);
    datagramChannel.write(buffer);
    

    3.10 Pipe管道

      Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取

    图3-4 Pipe管道

      创建管道,进行读写如下所示:

    Pipe pipe = Pipe.open();
    Pipe.SinkChannel sinkChannel =  pipe.sink();
    String content = "hello world";
    ByteBuffer buffer = ByteBuffer.allocate(48);
    buffer.clear();
    
    buffer.put(content.getBytes());
    buffer.flip();
    while (buffer.hasRemaining()){
        sinkChannel.write(buffer);
    }
    
    Pipe.SourceChannel sourceChannel = pipe.source();
    ByteBuffer newBuffer = ByteBuffer.allocate(48);
    sourceChannel.read(newBuffer);
    

    3.11 Path

      创建Path实例,创建绝对/相对路径等如下所示:

    Path path = Paths.get("c:\\data\\myfile.txt");
    Path newPath = Paths.get("c:\\data","myfile.txt");
    
    Path dir = Paths.get(".");
    dir.toAbsolutePath();
    // . 和 .. 的使用
    //格式化
    path.normalize();
    

      Path搭配Files使用,具体参考API文档。

    3.12 其他

      异步文件通道和非阻塞式服务器见参考链接。

    3.13 NIO Socket通信Demo

      Demo包括服务端和客户端两部分,Github完整代码地址如下:

    Github-NIOSocketDemo

      服务端代码如下所示:

    //import 省略
    
    /**
     * @author luffy
     * @date 2019-11-28
     * @version 1.0
     */
    public class ServerTest {
        private static final String CHARSET = "utf-8";
        private static final String END_MSG = "bye";
        private Logger logger = Logger.getLogger(getClass().getName());
        private Selector selector;
    
        /**
         * 连接准备
         */
        private void connectReady() {
            try {
                selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                InetSocketAddress address = new InetSocketAddress(9999);
                serverSocketChannel.socket().bind(address);
                //非阻塞
                serverSocketChannel.configureBlocking(false);
                //将ServerSocketChannel注册到Selector,将状态置为接收就绪
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 接收就绪
         * @param key
         */
        private void acceptReady(SelectionKey key) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel tmpChannel = null;
            try {
                //获取客户端连接
                tmpChannel = channel.accept();
            } catch (IOException e) {
                e.printStackTrace();
            }
            logger.info("客户端IP:" + tmpChannel.socket().getRemoteSocketAddress() + "连接进来了!");
            try {
                //非阻塞
                tmpChannel.configureBlocking(false);
                //将channel注册到selector,切换为读就绪状态
                tmpChannel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 读取客户端消息,切换为写就绪状态
         * @param key
         */
        private void readMsg(SelectionKey key) {
            SocketChannel tmpSocketChannel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int sum = 0;
            try {
                sum = tmpSocketChannel.read(buffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (sum != -1) {
                //切换到读模式
                buffer.flip();
                String msg = null;
                try {
                    msg = new String(buffer.array(), buffer.position(), buffer.limit(), CHARSET);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
                //清空buffer
                buffer.clear();
    
                logger.info("接收来自客户端的数据:" + new String(buffer.array()));
                //切换为写就绪态
                key.interestOps(SelectionKey.OP_WRITE);
                //如果接受到"bye",客户端断开连接
                if (END_MSG.equals(msg)) {
                    logger.info(tmpSocketChannel.socket().getRemoteSocketAddress() + "断开连接!");
                    try {
                        key.channel().close();
                        key.cancel();
                        tmpSocketChannel.close();
                        return;
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 向客户端发送数据,切换为读就绪状态
         * @param key
         */
        private void writeMsg(SelectionKey key) {
            SocketChannel tmpSocketChannel = (SocketChannel) key.channel();
            String[] strArray = new String[]{"I do", "I love", "I think"};
            Random random = new Random();
            //随机数 0-2
            int randomNum = random.nextInt(3);
    
            String msg = strArray[randomNum];
            byte[] resByte = msg.getBytes();
            ByteBuffer buffer = ByteBuffer.wrap(resByte);
            try {
                tmpSocketChannel.write(buffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            //切换为读就绪状态
            key.interestOps(SelectionKey.OP_READ);
            logger.info("发送给客户端完毕!");
        }
    
        /**
         * 开启服务端
         */
        private void startServer() {
            this.connectReady();
            while (true) {
                int num = 0;
                try {
                    num = selector.select();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if (num > 0) {
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isValid() && key.isAcceptable()) {
                            this.acceptReady(key);
                        } else if (key.isValid() && key.isReadable()) {
                            this.readMsg(key);
                        } else if (key.isValid() && key.isWritable()) {
                            this.writeMsg(key);
                        }
                    }
                } else {
                    logger.info("没有连接!");
                }
            }
        }
    
        public static void main(String[] args) {
            new ServerTest().startServer();
        }
    }
    
    

      客户端代码如下所示:

    //import省略
    
    /**
     * @author luffy
     * @date 2019-11-28
     * @version 1.0
     */
    public class ClientTest {
        private Logger logger = Logger.getLogger(this.getClass().getName());
        private Selector selector;
        private static final String IP_ADDRESS = "127.0.0.1";
        private static final int PORT = 9999;
        private static final String END_MSG = "bye";
    
        /**
         * 连接服务
         */
        private void connectServer() {
            try {
                SocketChannel socketChannel = SocketChannel.open();
                selector = Selector.open();
                InetSocketAddress address = new InetSocketAddress(IP_ADDRESS, PORT);
                //非阻塞
                socketChannel.configureBlocking(false);
                //指定IP和端口-连接服务器
                socketChannel.connect(address);
                //将channel注册到selector并设置为连接态
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 连接就绪状态,切换为写就绪状态
         * @param key
         */
        private void connectReady(SelectionKey key) {
            try {
                SocketChannel channel = (SocketChannel) key.channel();
                channel.finishConnect();
                key.interestOps(SelectionKey.OP_WRITE);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 读就绪状态,读取服务端发回的数据,切换为写就绪状态
         * @param key
         */
        private void readMsg(SelectionKey key) {
            ByteBuffer buffer = null;
            try {
                SocketChannel channel = (SocketChannel) key.channel();
                buffer = ByteBuffer.allocate(1024);
                //读取服务端数据
                channel.read(buffer);
                key.interestOps(SelectionKey.OP_WRITE);
            } catch (IOException e) {
                e.printStackTrace();
            }
            logger.info("收到服务端数据:" + new String(buffer.array()));
        }
    
        /**
         * 写就绪状态,向服务端发送数据,切换为读就绪状态
         * @param key
         */
        private void writeMsg(SelectionKey key) {
            SocketChannel channel = null;
            try {
                channel = (SocketChannel) key.channel();
            } catch (Exception e) {
                e.printStackTrace();
            }
            String[] strArray = new String[]{"hello", "world", "bye"};
            Random random = new Random();
            //随机取数0-2 向服务端发送数据
            int randomNum = random.nextInt(3);
            String msg = strArray[randomNum];
            byte[] array = msg.getBytes();
            ByteBuffer buffer = ByteBuffer.wrap(array);
            try {
                //将数据写入channel
                channel.write(buffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            logger.info("已向服务器发送消息:" + msg);
            //切换为读就绪状态
            key.interestOps(SelectionKey.OP_READ);
            //如果发送"bye",则和服务器断开连接
            if (END_MSG.equals(msg)) {
                logger.info(channel.socket().getRemoteSocketAddress() + "已和服务器断开连接!");
                try {
                    key.channel().close();
                    key.cancel();
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 客户端启动
         */
        private void clientStart() {
            this.connectServer();
            while (true) {
                int num = 0;
                try {
                    num = selector.select();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
                if (num > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
                        if (key.isValid() && key.isConnectable()) {
                            this.connectReady(key);
                        } else if (key.isReadable()) {
                            this.readMsg(key);
                        } else if (key.isValid() && key.isWritable()) {
                            this.writeMsg(key);
                        }
                    }
                }
            }
    
        }
    
        public static void main(String[] args) {
            new ClientTest().clientStart();
        }
    }
    

    参考链接一:Java NIO简明教程
    参考链接二:Java NIO 系列教程
    参考链接三:Java NIO教程
    参考链接四:Java NIO Tutorial
    参考链接五:Java 8 API-中文
    参考链接六:Java 8 API-英文

    相关文章

      网友评论

          本文标题:NIO解析

          本文链接:https://www.haomeiwen.com/subject/rdjiictx.html