美文网首页IO
Java 中的nio

Java 中的nio

作者: 茶还是咖啡 | 来源:发表于2021-06-24 09:20 被阅读0次

    概述

    NIO主要有三大核心部分:Channel(通道)Buffer(缓冲区), Selector

    传统IO基于字节流和字符流面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区进行操作;

    NIO基于Channel和Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但同时也需要编程人员注意及时检查和处理缓冲区中的数据,否则可能导致被清除或者被覆盖的问题。

    Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。

    IO的各种流是阻塞的。这意味着,当一个线程调用read()write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。

    NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取。而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

    线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)。

    Channel

    首先说一下Channel,国内大多翻译成“通道”。Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的,譬如:InputStream, OutputStream.而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。

    NIO中的Channel的主要实现有:

    • FileChannel
    • DatagramChannel
    • SocketChannel
    • ServerSocketChannel

    Buffer

    NIO中的关键Buffer实现有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分别对应基本数据类型: byte, char,double, float, int, long, short。当然NIO中还有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等这里先不进行陈述。

    Selector

    Selector运行单线程处理多个Channel,如果你的应用打开了多个通道,但每个连接的流量都很低,使用Selector就会很方便。例如在一个聊天服务器中。要使用Selector, 得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新的连接进来、数据接收等。


    FileChannel

    BIO vs NIO

    bio读取文件

        private String read(File file) throws IOException {
            StringBuilder sb = new StringBuilder();
            try (BufferedReader br = new BufferedReader(new FileReader(file))) {
                String buf;
                while ((buf = br.readLine()) != null) {
                    sb.append(buf);
                }
            }
            return sb.toString();
        }
    

    nio读取文件

        private String nioRead(File file) throws IOException {
            StringBuilder sb = new StringBuilder();
            try (FileChannel channel = new FileInputStream(file).getChannel()) {
                // 分配大小为1024的缓冲区
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int readLen = 0;
                while (-1 != (readLen = channel.read(buffer))) {
                    // 将缓冲区指针重置到起点
                    buffer.flip();
                    byte[] bytes = buffer.array();
                    sb.append(new String(bytes, 0, readLen));
                    buffer.clear();
                }
            }
            return sb.toString();
        }
    

    Buffer

    buffer 可以理解为在channel中传输数据的载体,nio中所有的读写都必须使用buffer后面会专门有一期出关于buffer的内容。

    image.png

    Selector

    Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。

    image.png

    使用selector的流程:

    1. 创建一个ServerSocketChannel并配置阻塞模式为非阻塞,配置需要绑定的端口信息。
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    serverSocketChannel.bind(new InetSocketAddress(8080));
    
    1. 创建一个selector,将serverSocketChannel注册给selector
    Selector selector = Selector.open();
    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    

    SelectionKey为通过Selector监听Channel时对什么事件感兴趣,有以下4种类型

    类型 说明
    Connect 链接就绪
    Accept 接收就绪
    Read 读就绪
    Write 写就绪

    如果一个selector想监听通道的多个事件,可以使用|进行链接,如:

    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);
    

    一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。
    下面是select()方法:

    1. int select()
    2. int select(long timeout)
    3. int selectNow()
      select()阻塞到至少有一个通道在你注册的事件上就绪了。

    select(long timeout)select()一样,除了最长会阻塞timeout毫秒(参数)。

    selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。

    select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

    一旦调用了select()方法,并且返回值表明有一个或更多个通道就绪了,然后可以通过调用selectorselectedKeys()方法,访问“已选择键集(selected key set)”中的就绪通道。如下所示:

    Set selectedKeys = selector.selectedKeys();
    

    注意每次迭代末尾的keyIterator.remove()调用。Selector不会自己从已选择键集中移除SelectionKey实例。必须在处理完通道时自己移除。下次该通道变成就绪时,Selector会再次将其放入已选择键集中。
    SelectionKey.channel()方法返回的通道需要转型成你要处理的类型,如ServerSocketChannel或SocketChannel等。


    SelectionKey

    这个对象包含了一些你感兴趣的属性:

    • interest集合
    • ready集合
    • Channel
    • Selector
    • 附加的对象(可选)
    1. interest集合:就像向Selector注册通道一节中所描述的,interest集合是你所选择的感兴趣的事件集合。可以通过SelectionKey读写interest集合。

    2. ready 集合是通道已经准备就绪的操作的集合。在一次选择(Selection)之后,你会首先访问这个ready set。可以这样访问ready集合:

    int readySet = selectionKey.readyOps();
    

    可以用像检测interest集合那样的方法,来检测channel中什么事件或操作已经就绪。但是,也可以使用以下四个方法,它们都会返回一个布尔类型:

    selectionKey.isAcceptable();
    selectionKey.isConnectable();
    selectionKey.isReadable();
    selectionKey.isWritable();
    
    1. 从SelectionKey获取Channel和Selector。如下:
    Channel  channel  = selectionKey.channel();
    Selector selector = selectionKey.selector();
    
    1. 可以将一个对象或者更多信息附着到SelectionKey上,这样就能方便的识别某个给定的通道。例如,可以附加 与通道一起使用的Buffer,或是包含聚集数据的某个对象。使用方法如下:
    selectionKey.attach(theObject);
    Object attachedObj = selectionKey.attachment();
    

    也可以在用register()方法向Selector注册Channel的时候附加对象。如:

    SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
    

    Talk is Cheap show me the code

    知道了上面的基础,就可以进行简单的编程了。

    一个简单的示例,客户端不断发送hello,server,服务端接收到消息后,回复hello,client

    1. 服务端
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.Objects;
    import java.util.Set;
    
    public class NioServer {
        @SuppressWarnings("all")
        public static void main(String[] args) throws Exception {
    
            Selector serverSelector = Selector.open();
            Selector clientSelector = Selector.open();
            Thread bossThread = new Thread(() -> {
                try {
                    ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                    listenerChannel.socket().bind(new InetSocketAddress(8000));
                    listenerChannel.configureBlocking(false);
                    listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
    
                    while (true) {
                        if (serverSelector.select(1) > 0) {
                            Set<SelectionKey> selectionKeys = serverSelector.selectedKeys();
                            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
    
                            while (keyIterator.hasNext()) {
                                SelectionKey key = keyIterator.next();
                                if (key.isAcceptable()) {
                                    try {
                                        SocketChannel clientChannel = ((ServerSocketChannel) (key.channel())).accept();
                                        clientChannel.configureBlocking(false);
                                        clientChannel.register(clientSelector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                                    } finally {
                                        keyIterator.remove();
                                    }
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            bossThread.start();
    
    
            Thread workerThread = new Thread(() -> {
                try {
                    while (true) {
                        if (clientSelector.select(1) > 0) {
                            Set<SelectionKey> selectionKeys = clientSelector.selectedKeys();
                            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            while (keyIterator.hasNext()) {
                                SelectionKey selectionKey = keyIterator.next();
                                try {
                                    SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
    
                                    if (selectionKey.isReadable()) {
                                        clientChannel.read(byteBuffer);
                                        byteBuffer.flip();
                                        String msg = Charset.defaultCharset().newDecoder().decode(byteBuffer).toString();
                                        System.out.println(msg);
                                        selectionKey.interestOps(SelectionKey.OP_WRITE);
                                        selectionKey.attach("hello,client");
                                    } else if (selectionKey.isWritable()) {
                                        Object attachment = selectionKey.attachment();
                                        if (Objects.nonNull(attachment)) {
                                            byteBuffer.put(((String) attachment).getBytes(StandardCharsets.UTF_8));
                                            byteBuffer.flip();
                                            clientChannel.write(byteBuffer);
                                        }
                                        selectionKey.interestOps(SelectionKey.OP_READ);
                                    }
                                } finally {
                                    byteBuffer.clear();
                                    keyIterator.remove();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            workerThread.start();
        }
    }
    
    1. 客户端
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.Objects;
    import java.util.Set;
    
    public class NioClient {
    
        @SuppressWarnings("all")
        public static void main(String[] args) throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            InetSocketAddress socketAddress = new InetSocketAddress("localhost", 8000);
            socketChannel.configureBlocking(false);
            socketChannel.connect(socketAddress);
    
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
            while (true) {
                if (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        try {
                            SelectionKey selectionKey = iterator.next();
                            process(selector, selectionKey);
                        } finally {
                            iterator.remove();
                        }
                    }
                }
            }
        }
    
        private static void process(Selector selector, SelectionKey selectionKey) throws IOException {
            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
            if (selectionKey.isConnectable()) {
                while (!socketChannel.finishConnect()) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
                socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                selectionKey.attach("hello,server");
            } else if (selectionKey.isReadable()) {
                socketChannel.read(buffer);
                buffer.flip();
                String msg = Charset.defaultCharset().newDecoder().decode(buffer).toString();
                buffer.clear();
                System.out.println(msg);
                socketChannel.register(selector, SelectionKey.OP_WRITE);
                selectionKey.attach("hello,server");
            } else if (selectionKey.isWritable()) {
                Object attachment = selectionKey.attachment();
                if (Objects.nonNull(attachment)) {
                    buffer.put(((String) attachment).getBytes(StandardCharsets.UTF_8));
                    buffer.flip();
                    socketChannel.write(buffer);
                    buffer.clear();
                }
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
        }
    }
    

    上面的代码只是一个简单的Demo,里面还有很多细节问题没有处理,比如:断开连接处理,消息的拆包封包等。而且,直观的看,nio的编程模型比bio要复杂一些,如果细节处理不到位的话,很容易导致我们的应用程序出现问题。

    相关文章

      网友评论

        本文标题:Java 中的nio

        本文链接:https://www.haomeiwen.com/subject/yklwyltx.html