NIO解析

作者: GIT提交不上 | 来源:发表于2019-11-18 23:22 被阅读0次

一、BIO概述

  BIO 就是传统的java.io包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。(面向字节流/字符流

图1-1 IO流

  同步与异步的差别在于是否需要等待被依赖项目完成,才开始执行任务
  阻塞与非阻塞针对CPU的消耗来说的,阻塞就是 CPU 停下来等待一个慢的操作完成 CPU 才接着完成其它的事。非阻塞就是在这个慢的操作在执行时 CPU 去干其它别的事,等这个慢的操作完成时,CPU 再接着完成后续的操作。

参考链接:Java核心(五)深入理解BIO、NIO、AIO

二、NIO概述

  NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer 等新的抽象,可以构建多路复用的、同步非阻塞 IO程序,同时提供了更接近操作系统底层高性能的数据操作方式。

三、NIO核心组件

  NIO的核心组件:1)Channel(负责连接) 2)Buffer(负责存储) 3)Selector。通过Channel,可以从Channel把数据写到Buffer中,也可以把数据从Buffer写入到Channel中。

图3-1 Channel & Buffer关系图

  选择器允许单线程操作多个通道。使用Selector时,需要将Channel注册到Selector上。

图3-2 选择器

3.1 Channel通道

  1. 通道是双向的,可读也可写,流一般是单向的。
  2. 通道可读也可写。
  3. 通道基于Buffer读取。

3.1.1 Channel实现

  FileChannel:文件读写
  DatagramChannel:UDP数据读写
  SocketChannel:TCP数据读写
  ServerSocketChannel:监听TCP链接请求,每个请求会创建会一个SocketChannel

3.2 Buffer缓冲区

  Buffer是一块内存区,用于读写数据。分为非直接缓冲区(JVM)和直接缓存区(物理内存)。使用该内存区包含以下步骤:
  1. 把数据写入Buffer(Channel->Buffer)
  2. 调用flip方法(写模式->读模式,调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置)
  3. 从Buffer中读取数据,get方法
  4. 清空Buffer,调用clear(清空整个Buffer)或compact(清空已读取数据)方法
  Buffer包含3个属性,capacity(容量,以字节为单位)、position(位置)、limit(限制)。读和写模式下position和limit含义如图3-1所示:

图3-1 不同模式下position和limit含义

  Buffer类型包括:1)ByteBuffer 2)MappedByteBuffer 3)CharBuffer 4)DoubleBuffer 5)FloatBuffer 6)IntBuffer 7)LongBuffer 8)ShortBuffer。

  示例代码如下所示:

package com.example.niotest;
//import省略
/**
 * @Author luffy
 */
public class NioTest {

    public static void main(String[] args) {
        try {
            RandomAccessFile file = new RandomAccessFile("nio.txt", "rw");
            FileChannel channel = file.getChannel();
            //allocate()/allocateDirect()
            ByteBuffer buffer = ByteBuffer.allocate(48);
            //将channel读取到buffer
            int flag = channel.read(buffer);
            while (flag != -1) {
                //切换为读模式
                buffer.flip();
                while (buffer.hasRemaining()) {
                    System.out.println((char) buffer.get());
                }
                //清空buffer
                buffer.clear();
                flag = channel.read(buffer);
            }
            file.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3.3 分散读与聚合写

  Scattering Reads:把数据从单个Channel写入到多个Buffer,必须写满一个buffer后才会向后移动到下一个buffer,不适合消息大小会动态改变的情形。(依次填满)示意图如下所示:

图3-2 分散读
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

channel.read(bufferArray);

  Gathering Writes:把多个Buffer的数据写入到同一个Channel中,按顺序将数组内的内容写进channel,只写入Buffer中position到limit之间的数据,适用于可变大小的消息的情形。示意图如下所示:

图3-3 聚合写
ByteBuffer header = ByteBuffer.allocate(128);
ByteBuffer body   = ByteBuffer.allocate(1024);

ByteBuffer[] bufferArray = { header, body };

channel.write(bufferArray);

3.4 通道传输接口

  FileChannel.transferFrom方法把数据从通道源传输到FileChannel,transferTo方法把FileChannel数据传输到另一个Channel,具体查看API文档。(直接缓冲区

3.5 Selector选择器

  Selector用于检查一个或多个NIO Channel的状态是否处于可读、可写。如此可以实现单线程管理多个Channels,也就是可以管理多个网络链接
  1. Selector创建

 Selector selector = Selector.open();

  2. 向Selector注册通道(FileChannel不适用)

ServerSocketChannel channel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
channel.bind(address);
//非阻塞
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
/**
SelectionKey.OP_CONNECT  连接就绪
SelectionKey.OP_ACCEPT  接收就绪
SelectionKey.OP_READ  读就绪
SelectionKey.OP_WRITE  写就绪
*/

  3. 选择器选择通道

//select()方法会返回读事件已经就绪的那些通道
int select():阻塞到至少有一个通道在你注册的事件上就绪了
int select(long timeout):同select()方法,除了最长会阻塞timeout毫秒
int selectNow():不会阻塞,不管什么通道就绪都立刻返回
Set<SelectionKey> selectedKeys():一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,
然后可以通过调用selector的selectedKeys()方法,访问已选择键集中的就绪通道

  SelectionKey.channel方法返回的channel实例需要强转为我们实际使用的具体的channel类型。
  由于调用select而被阻塞的线程,可以通过调用Selector.wakeup()来唤醒即便此时已然没有channel处于就绪状态。具体操作是,在另外一个线程调用wakeup(),被阻塞的select方法的线程就会立刻返回
  当操作Selector完毕后,需要调用close方法。close的调用会关闭Selector并使相关的SelectionKey都无效,channel本身不会被关闭。

3.6 FileChannel

  FileChannel总是运行在阻塞状态下。无法直接打开一个FileChannel,需要通过使用一个InputStream、OutputStream或RandomAccessFile来获取一个FileChannel实例,关闭FileChannel使用close方法。

RandomAccessFile file = new RandomAccessFile("nio.txt", "rw");
FileChannel channel = file.getChannel();
//关闭
channel.close();

  从FileChannel中读取数据,read()方法返回的int值表示了有多少字节被读到了Buffer中。如果返回-1,表示到了文件末尾。

ByteBuffer buf = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buf);

  向FileChannel写数据,FileChannel.write()是在while循环中调用的。因为无法保证write()方法一次能向FileChannel写入多少字节,因此需要重复调用write()方法,直到Buffer中已经没有尚未写入通道的字节

String newData = "test data";
ByteBuffer buf = ByteBuffer.allocate(48);
buf.clear();
buf.put(newData.getBytes());
buf.flip();
while(buf.hasRemaining()) {
    channel.write(buf);
}

  可能需要在FileChannel的某个特定位置进行数据的读/写操作。可以通过调用position()方法获取FileChannel的当前位置。也可以通过调用position(long pos)方法设置FileChannel的当前位置。(如果将位置设置在文件结束符之后,注意此时的读写情况)

long pos = channel.position();
channel.position(pos +123);

  FileChannel的size方法:返回该实例所关联文件的大小;FileChannel的truncate方法:截取一个文件,截取文件时,文件将中指定长度后面的部分将被删除;FileChannel的force方法:将通道里尚未写入磁盘的数据强制写到磁盘上,该方法有一个boolean类型的参数,指明是否同时将文件元数据(权限信息等)写到磁盘上。

long fileSize = channel.size();
channel.truncate(1024);
channel.force(true);

3.7 SocketChannel套接字通道

  创建一个SocketChannel有两种方式:
  1. 开一个SocketChannel并连接网络上的一台服务器。
  2. 当ServerSocketChannel接收到一个连接请求时,会创建一个SocketChannel。
  建立一个SocketChannel连接,如下所示:

SocketChannel channel = SocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
channel.connect(address);
//关闭链接
channel.close();

  向SocketChannel读写数据同FileChannel,将SocketChannel设置成非阻塞模式,此时connect()、read()、write()方法都是异步的。

channel.configureBlocking(false);
while (!channel.finishConnect()){

}

3.8 ServerSocketChannel服务端套接字通道

  打开ServerSocketChannel,如下所示:

ServerSocketChannel channel=ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8080);
channel.socket().bind(address);
//非阻塞
channel.configureBlocking(false);
//关闭
channel.close();

  监听链接,调用ServerSocketChannel的accept()方法,返回一个SocketChannel连接实例, accept()方法会一直阻塞到有新连接到达。

while (true) {
    SocketChannel socketChannel = channel.accept();
    if (socketChannel != null){
                    
    }
}

3.9 DatagramChannel通道

  DatagramChannel是一个可以发送、接收UDP(面向无连接的网络协议)数据包的通道,无法直接读写数据。

DatagramChannel datagramChannel = DatagramChannel.open();
InetSocketAddress address = new InetSocketAddress("127.0.0.1",8080);
datagramChannel.socket().bind(address);
//关闭
datagramChannel.close();

  接收数据与发送数据,如下所示:

ByteBuffer buffer = ByteBuffer.allocate(48);
buffer.clear();
datagramChannel.receive(buffer);

datagramChannel.send(buffer,new InetSocketAddress("127.0.0.1",8080));

  连接到特定的地址,连接到特定地址并不会像TCP通道那样创建一个真正的连接。而是锁住DatagramChannel ,让其只能从特定地址收发数据。

datagramChannel.socket().connect(new InetSocketAddress("127.0.0.1",8080));
datagramChannel.read(buffer);
datagramChannel.write(buffer);

3.10 Pipe管道

  Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取

图3-4 Pipe管道

  创建管道,进行读写如下所示:

Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel =  pipe.sink();
String content = "hello world";
ByteBuffer buffer = ByteBuffer.allocate(48);
buffer.clear();

buffer.put(content.getBytes());
buffer.flip();
while (buffer.hasRemaining()){
    sinkChannel.write(buffer);
}

Pipe.SourceChannel sourceChannel = pipe.source();
ByteBuffer newBuffer = ByteBuffer.allocate(48);
sourceChannel.read(newBuffer);

3.11 Path

  创建Path实例,创建绝对/相对路径等如下所示:

Path path = Paths.get("c:\\data\\myfile.txt");
Path newPath = Paths.get("c:\\data","myfile.txt");

Path dir = Paths.get(".");
dir.toAbsolutePath();
// . 和 .. 的使用
//格式化
path.normalize();

  Path搭配Files使用,具体参考API文档。

3.12 其他

  异步文件通道和非阻塞式服务器见参考链接。

3.13 NIO Socket通信Demo

  Demo包括服务端和客户端两部分,Github完整代码地址如下:

Github-NIOSocketDemo

  服务端代码如下所示:

//import 省略

/**
 * @author luffy
 * @date 2019-11-28
 * @version 1.0
 */
public class ServerTest {
    private static final String CHARSET = "utf-8";
    private static final String END_MSG = "bye";
    private Logger logger = Logger.getLogger(getClass().getName());
    private Selector selector;

    /**
     * 连接准备
     */
    private void connectReady() {
        try {
            selector = Selector.open();
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            InetSocketAddress address = new InetSocketAddress(9999);
            serverSocketChannel.socket().bind(address);
            //非阻塞
            serverSocketChannel.configureBlocking(false);
            //将ServerSocketChannel注册到Selector,将状态置为接收就绪
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 接收就绪
     * @param key
     */
    private void acceptReady(SelectionKey key) {
        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
        SocketChannel tmpChannel = null;
        try {
            //获取客户端连接
            tmpChannel = channel.accept();
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.info("客户端IP:" + tmpChannel.socket().getRemoteSocketAddress() + "连接进来了!");
        try {
            //非阻塞
            tmpChannel.configureBlocking(false);
            //将channel注册到selector,切换为读就绪状态
            tmpChannel.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 读取客户端消息,切换为写就绪状态
     * @param key
     */
    private void readMsg(SelectionKey key) {
        SocketChannel tmpSocketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int sum = 0;
        try {
            sum = tmpSocketChannel.read(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (sum != -1) {
            //切换到读模式
            buffer.flip();
            String msg = null;
            try {
                msg = new String(buffer.array(), buffer.position(), buffer.limit(), CHARSET);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
            //清空buffer
            buffer.clear();

            logger.info("接收来自客户端的数据:" + new String(buffer.array()));
            //切换为写就绪态
            key.interestOps(SelectionKey.OP_WRITE);
            //如果接受到"bye",客户端断开连接
            if (END_MSG.equals(msg)) {
                logger.info(tmpSocketChannel.socket().getRemoteSocketAddress() + "断开连接!");
                try {
                    key.channel().close();
                    key.cancel();
                    tmpSocketChannel.close();
                    return;
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 向客户端发送数据,切换为读就绪状态
     * @param key
     */
    private void writeMsg(SelectionKey key) {
        SocketChannel tmpSocketChannel = (SocketChannel) key.channel();
        String[] strArray = new String[]{"I do", "I love", "I think"};
        Random random = new Random();
        //随机数 0-2
        int randomNum = random.nextInt(3);

        String msg = strArray[randomNum];
        byte[] resByte = msg.getBytes();
        ByteBuffer buffer = ByteBuffer.wrap(resByte);
        try {
            tmpSocketChannel.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        //切换为读就绪状态
        key.interestOps(SelectionKey.OP_READ);
        logger.info("发送给客户端完毕!");
    }

    /**
     * 开启服务端
     */
    private void startServer() {
        this.connectReady();
        while (true) {
            int num = 0;
            try {
                num = selector.select();
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (num > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        this.acceptReady(key);
                    } else if (key.isValid() && key.isReadable()) {
                        this.readMsg(key);
                    } else if (key.isValid() && key.isWritable()) {
                        this.writeMsg(key);
                    }
                }
            } else {
                logger.info("没有连接!");
            }
        }
    }

    public static void main(String[] args) {
        new ServerTest().startServer();
    }
}

  客户端代码如下所示:

//import省略

/**
 * @author luffy
 * @date 2019-11-28
 * @version 1.0
 */
public class ClientTest {
    private Logger logger = Logger.getLogger(this.getClass().getName());
    private Selector selector;
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 9999;
    private static final String END_MSG = "bye";

    /**
     * 连接服务
     */
    private void connectServer() {
        try {
            SocketChannel socketChannel = SocketChannel.open();
            selector = Selector.open();
            InetSocketAddress address = new InetSocketAddress(IP_ADDRESS, PORT);
            //非阻塞
            socketChannel.configureBlocking(false);
            //指定IP和端口-连接服务器
            socketChannel.connect(address);
            //将channel注册到selector并设置为连接态
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 连接就绪状态,切换为写就绪状态
     * @param key
     */
    private void connectReady(SelectionKey key) {
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            channel.finishConnect();
            key.interestOps(SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 读就绪状态,读取服务端发回的数据,切换为写就绪状态
     * @param key
     */
    private void readMsg(SelectionKey key) {
        ByteBuffer buffer = null;
        try {
            SocketChannel channel = (SocketChannel) key.channel();
            buffer = ByteBuffer.allocate(1024);
            //读取服务端数据
            channel.read(buffer);
            key.interestOps(SelectionKey.OP_WRITE);
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.info("收到服务端数据:" + new String(buffer.array()));
    }

    /**
     * 写就绪状态,向服务端发送数据,切换为读就绪状态
     * @param key
     */
    private void writeMsg(SelectionKey key) {
        SocketChannel channel = null;
        try {
            channel = (SocketChannel) key.channel();
        } catch (Exception e) {
            e.printStackTrace();
        }
        String[] strArray = new String[]{"hello", "world", "bye"};
        Random random = new Random();
        //随机取数0-2 向服务端发送数据
        int randomNum = random.nextInt(3);
        String msg = strArray[randomNum];
        byte[] array = msg.getBytes();
        ByteBuffer buffer = ByteBuffer.wrap(array);
        try {
            //将数据写入channel
            channel.write(buffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        logger.info("已向服务器发送消息:" + msg);
        //切换为读就绪状态
        key.interestOps(SelectionKey.OP_READ);
        //如果发送"bye",则和服务器断开连接
        if (END_MSG.equals(msg)) {
            logger.info(channel.socket().getRemoteSocketAddress() + "已和服务器断开连接!");
            try {
                key.channel().close();
                key.cancel();
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 客户端启动
     */
    private void clientStart() {
        this.connectServer();
        while (true) {
            int num = 0;
            try {
                num = selector.select();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            if (num > 0) {
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isValid() && key.isConnectable()) {
                        this.connectReady(key);
                    } else if (key.isReadable()) {
                        this.readMsg(key);
                    } else if (key.isValid() && key.isWritable()) {
                        this.writeMsg(key);
                    }
                }
            }
        }

    }

    public static void main(String[] args) {
        new ClientTest().clientStart();
    }
}

参考链接一:Java NIO简明教程
参考链接二:Java NIO 系列教程
参考链接三:Java NIO教程
参考链接四:Java NIO Tutorial
参考链接五:Java 8 API-中文
参考链接六:Java 8 API-英文

相关文章

  • 图解Java NIO

    目录: NIO结构 NIO与传统IO异同 NIO使用步骤 NIO代码 ByteBuffer难点解析 1:NIO结构...

  • NIO 解析

    1、传统IO,他是面向流的模式、阻塞式(read() 或write()时线程被阻塞,直到读完)2、NIO 面向缓冲...

  • NIO解析

    一、BIO概述   BIO 就是传统的java.io包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说...

  • nio

    参考文章 Java Nio Java NIO学习笔记 - NIO客户端时序图 Java NIO学习笔记 - NIO...

  • NIO(二、Buffer)

    目录 NIO(一、概述)NIO(二、Buffer)NIO(三、Channel)NIO(四、Selector) Bu...

  • NIO(四、Selector)

    目录 NIO(一、概述)NIO(二、Buffer)NIO(三、Channel)NIO(四、Selector) Se...

  • NIO(三、Channel)

    目录 NIO(一、概述)NIO(二、Buffer)NIO(三、Channel)NIO(四、Selector) Ch...

  • 大厂面试系列(二)::NIO和Netty

    NIO和Netty面试题 NIO 阐述 NIO原理? BIO/NIO/AIO有什么区别?有那些实现? 讲讲NIO的...

  • 大厂面试系列(二)::NIO和Netty

    NIO和Netty面试题 NIO 阐述 NIO原理? BIO/NIO/AIO有什么区别?有那些实现? 讲讲NIO的...

  • NIO教程 ——检视阅读(上)

    NIO教程 ——检视阅读 参考 BIO,NIO,AIO 总结 Java NIO浅析 Java NIO 教程——极客...

网友评论

      本文标题:NIO解析

      本文链接:https://www.haomeiwen.com/subject/rdjiictx.html