美文网首页
Java网络编程:Netty框架学习(二)---Java NIO

Java网络编程:Netty框架学习(二)---Java NIO

作者: singleZhang2010 | 来源:发表于2020-12-03 17:30 被阅读0次

    概述

    上篇中已经讲到Java中的NIO类库,Java中也称New IO,类库的目标就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO),称“老的”阻塞式Java IO为OIO(Old IO)。
    总体上说,NIO弥补了原来面向流的OIO同步阻塞的不足,它为标准Java代码提供了高速的、面向缓冲区的IO。
    了解上一篇讲到的四种I/O模型的话,我们可以很容易看出Java NIO采用的是IO多路复用(IO Multiplexing)模型。
    NIO特征:

    1. NIO中引入了Channel(通道)和Buffer(缓冲区)的概念。读取和写入,只需要从通道中读取数据到缓冲区中,或将数据从缓冲区中写入到通道中,读取Buffer的数据可以是无序的读取。
    2. NIO使用了通道和通道的多路复用技术 来实现非阻塞的操作,当我们调用read方法时,如果此时有数据,则read读取数据并返回;如果此时没有数据,则read直接返回,而不会阻塞当前线程。
    3. NIO有选择器,NIO的选择器实现,是基于底层的选择器的系统调用,需要底层操作系统提供支持

    Java NIO 核心组件

    Java NIO由以下三个核心组件组成:

    • Channel(通道)
    • Buffer(缓冲区)
    • Selector(选择器)

    1. 通道(Channel)
    在NIO中,同一个网络连接使用一个通道表示,所有NIO的IO操作都是从通道开始的。一个通道类似于OIO中的两个流的结合体,既可以从通道读取,也可以向通道写入。

    2. 缓冲区(Buffer)
    应用程序与通道(Channel)主要的交互操作,就是进行数据的read读取和write写入,这里就需要依赖NIO Buffer(NIO缓冲区),它是数据的载体。
    通道的读取,就是将数据从通道读取到缓冲区中;通道的写入,就是将数据从缓冲区中写入到通道中
    3. 选择器(Selector)
    在上一篇中提到过文件句柄数,这里的文件句柄其实就是文件描述符,它标识的就是一个网络连接。
    一个进程/线程可以同时监视多个文件描述符。在NIO中通过选择器(Selector)对这些文件描述符进行监视,监视哪些文件描述符是可读或者可写的。

    selector
    IO多路复用,从具体的开发层面来说,首先把通道注册到选择器中,然后通过选择器内部的机制,可以查询(select)这些注册的通道是否有已经就绪的IO事件(例如可读、可写、网络连接完成等)。
    一般来说,一个单线程处理一个选择器,一个选择器可以监控很多通道。通过选择器,一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。
    由于Java NIO的Selector组件和操作系统底层的IO多路复用的支持,我们可以很简单地使用一个线程,通过选择器去管理多个通道。

    NIO Buffer(缓冲区)

    在NIO中有8种缓冲区类,分别如下:

    • ByteBuffer
    • CharBuffer
    • DoubleBuffer
    • FloatBuffer
    • IntBuffer
    • LongBuffer
    • ShortBuffer
    • MappedByteBuffer
      ※MappedByteBuffer是专门用于内存映射的一种ByteBuffer类型

    这些Buffer类在其内部,有一个byte[]数组内存块,作为内存缓冲区。
    查看其中源码,如下

        //ByteBuffer类 代码片段
    
        //
        final byte[] hb;                  // Non-null only for heap buffers
    

    Buffer类的重要成员属性:capacity(容量)、position(读写位置)、limit(读写的限制)、mark(标记)
    ※说明:capacity容量不是指内存块byte[]数组的字节的数量。capacity容量指的是写入的数据对象的数量。

    通过简单地使用Buffer示例加深对这四个属性的印象,创建BufferTest.java

    package com.zhxin.nettylab.nio.chapter1;
    
    import java.nio.CharBuffer;
    
    /**
     * @ClassName BufferTest
     * @Description //BufferTest
     * @Author singleZhang
     * @Email 405780096@qq.com
     * @Date 2020/12/3 0003 下午 4:03
     **/
    public class BufferTest {
    
        public static void main(String[] args){
    
            //创建Buffer,capacity为10
            CharBuffer cbf = CharBuffer.allocate(10);
    
            System.out.println(cbf.capacity()); //容量:10
            System.out.println(cbf.limit());    //读写限制:10
            System.out.println(cbf.position()); //读写位置:0 起始值
    
            cbf.put("a");
            cbf.put("b");
            cbf.put("c");
            System.out.println(cbf.position()); //输出3
    
            cbf.flip(); //buffer从写入转换成读取,把limit设置为position,把position还原成0
            System.out.println(cbf.position());
            System.out.println(cbf.limit());
    
            //取值
            System.out.println(cbf.get()); //取第一个元素 a
            System.out.println(cbf.position()); //读写位置变为1
    
            cbf.clear(); //clear方法将limit设置成capacity,position设置成0
            System.out.println(cbf.limit());
            System.out.println(cbf.position());
            System.out.println(cbf.get(2)); //读取第三个元素c
    
            System.out.println(cbf.position());//读写位置不变,get方法加了索引值,根据索引来取值不影响position
    
            System.out.println(cbf.get());
            System.out.println(cbf.get());
            cbf.mark(); //标记
            System.out.println(cbf.position());  //标记后的位置为2
            System.out.println(cbf.get());
            System.out.println(cbf.position());
            cbf.reset();//返回标记
            System.out.println(cbf.position()); //返回标记的位置2
    
            cbf.clear();//读取完成后,调用Buffer.clear() 或Buffer.compact()方法,将缓冲区转换为写入模式
            System.out.println(cbf.limit());
            System.out.println(cbf.position());
            System.out.println(cbf.capacity());
        }
    }
    

    除了前面的3个属性,第4个属性mark(标记)比较简单。就是相当一个暂存属性,暂时保存position的值,方便后面的重复使用position值


    Buffer四个重要成员属性

    NIO Channel(通道)

    NIO中一个连接就是用一个Channel(通道)来表示。更广泛的层面来说,一个通道可以表示一个底层的文件描述符。
    JavaNIO的通道还可以更加细化。例如,对应不同的网络传输协议类型,在Java中都有不同的NIO Channel(通道)实现。
    四种重要的Channel类型,分别如下:

    • FileChannel
      文件通道,用于文件的数据读写
    • SocketChannel
      套接字通道,用于Socket套接字TCP连接的数据读写
    • ServerSocketChannel
      服务器嵌套字通道(或服务器监听通道),允许我们监听TCP连接请求,为每个监听到的请求,创建一个SocketChannel套接字通道
    • DatagramChannel
      数据报通道,用于UDP协议的数据读写
      通过简单的FileChannel示例加深印象,其他的Channel可以自行举一反三,创建ChannelTest.java
    package com.zhxin.nettylab.nio.chapter2;
    
    import java.io.*;
    import java.nio.CharBuffer;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    
    /**
     * @ClassName BufferTest
     * @Description //BufferTest FileChannel文件通道 demo
     * @Author singleZhang
     * @Email 405780096@qq.com
     * @Date 2020/12/3 0003 下午 2:43
     **/
    public class ChannelTest {
    
        public static void main(String[] args){
            File bt = new File("E:/project/nettylab/src/main/resources/buffer.txt");
    
            //当try语句块运行结束时,FileInputStream 会被自动关闭
            // 这是因为FileInputStream 实现了java中的java.lang.AutoCloseable接口
            // 所有实现了这个接口的类都可以在try-with-resources结构中使用
            // 以FileInputStream、FileOutputStream 文件输入流和文件输出流来创建FileChannel
            try(FileChannel inCnl = new FileInputStream(bt).getChannel();
                FileChannel outCnl = new FileOutputStream("E:/project/nettylab/src/main/resources/buffer1.txt").getChannel()){
                MappedByteBuffer bf = inCnl.map(FileChannel.MapMode.READ_ONLY,0,bt.length()); //从Channel获取数据
                Charset crt = Charset.forName("UTF-8");
                outCnl.write(bf); //向Channel写数据
                bf.clear();
                CharsetDecoder cd = crt.newDecoder();
    
                //decode 把ByteBuffer 转 CharBuffer
                CharBuffer cb = cd.decode(bf);
                System.out.println(cb);
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    }
    

    NIO Selector(选择器)

    选择器是NIO中非常重要的角色,选择器的使命是完成IO的多路复用。一个通道代表一条连接通路,通过选择器可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系。

    通道和选择器之间的关系,通过register(注册)的方式完成。
    调用通道的Channel.register(Selector sel, int ops)方法,可以将通道实例注册到一个选择器中。
    register方法有两个参数:

    • 第一个参数,指定通道注册到的选择器实例;
    • 第二个参数,指定选择器要监控的IO事件类型,它包括以下四种类型:SelectionKey.OP_READ(可读)、SelectionKey.OP_WRITE(可写)、SelectionKey.OP_CONNECT(连接)、SelectionKey.OP_ACCEPT(接收)
      查看SelectionKey类源码,如下:
    //SelectionKey.java 部分源码
    
    public static final int OP_READ = 1 << 0;
    
    public static final int OP_WRITE = 1 << 2;
    
    public static final int OP_CONNECT = 1 << 3;
    
    public static final int OP_ACCEPT = 1 << 4;
    
    

    SelectionKey选择键
    通道和选择器的监控关系注册成功后,就可以选择就绪事件。这些IO事件类型指的就是通道的某个IO操作的一种就绪状态,表示通道具备完成某个IO操作的条件。
    例如,某个SocketChannel通道,完成了和服务端的握手连接,则处于“连接就绪”(OP_CONNECT)状态;
    某个ServerSocketChannel服务器通道,监听到一个新连接的到来,则处于“接收就绪”(OP_ACCEPT)状态。
    SelectableChannel类
    ※FileChannel文件通道就不能被选择器监控或选择,判断一个通道能否被选择器监控或选择,有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道)
    Java NIO中所有网络链接Socket套接字通道,都继承了SelectableChannel类,都是可选择的。
    选择器使用流程
    使用选择器,主要有以下三步:

    1. 获取选择器实例;
    2. 将通道注册到选择器中;
    3. 轮询感兴趣的IO就绪事件(选择键集合)
      通过示例加深一下印象,创建服务端demo SelectorTest.java
    package com.zhxin.nettylab.nio.chapter3;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @ClassName SelectorTest
     * @Description //选择器 使用  服务器端示例
     * @Author singleZhang
     * @Email 405780096@qq.com
     * @Date 2020/12/4 0004 上午 9:17
     **/
    public class SelectorTest {
        private static Selector selector;
        public static void main(String[] args){
    
            try{
                /*
                 * 获取选择器示例
                 * Selector选择器的类方法open()的内部,是向选择器SPI(SelectorProvider)发出请求,
                 * 通过默认的SelectorProvider(选择器提供者)对象,获取一个新的选择器实例。
                 * Java中SPI全称为(Service Provider Interface,服务提供者接口),是JDK的一种可以扩展的服务提供和发现机制。
                 * Java通过SPI的方式,提供选择器的默认实现版本
                 */
                selector = Selector.open();
    
                /*
                * 将通道注册到选择器实例
                * */
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打开ServerSocketChannel,获取通道
                serverSocketChannel.configureBlocking(false); //设为非阻塞
                serverSocketChannel.bind(new InetSocketAddress(8989)); //将该通道对于的serverSocket绑定到port端口
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,监听"接收连接"事件
    
                /*
                 * 选出感兴趣的IO就绪事件(选择键集合)
                 * 通过Selector选择器的select()方法,选出已经注册的、已经就绪的IO事件,保存到SelectionKey选择键集合中
                 * 遍历这些IO事件,进行对应的处理
                 */
                while (selector.select() > 0){
                    Set<SelectionKey> selectKeys = selector.selectedKeys();
                    Iterator<SelectionKey> keyIterator = selectKeys.iterator();
    
                    while(keyIterator.hasNext()){
                        SelectionKey key = keyIterator.next();
                        if(key.isAcceptable()){
                            // ServerSocketChannel服务器监听通道有新连接
                            handleAccept(key);
                        } else if(key.isReadable()){
                            // 传输通道可读
                            handleRead(key);
                        } else if(key.isWritable()){
                            //传输通道可读
                            handleWrite(key);
                        }
    
                        //移除处理完的选择键
                        keyIterator.remove();
                    }
    
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
        /**
         * 处理客户端新连接事件
         */
        private static void handleAccept(SelectionKey key) throws IOException {
            // 获取客户端连接通道
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel socketChannel = server.accept();
            socketChannel.configureBlocking(false);
    
            // 信息通过通道发送给客户端
            String msg = "Hello Client!";
            socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
    
            // 给通道设置读事件,客户端监听到读事件后,进行读取操作
            socketChannel.register(selector, SelectionKey.OP_READ);
        }
    
        /**
         * 监听到可读,处理客户端发送过来的信息
         */
        private static void handleRead(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
    
            // 从通道读取数据到缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);
    
            // 输出客户端发送过来的消息
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("server received msg from client:" + msg);
        }
    
        private static void handleWrite(SelectionKey key){
        }
    }
    

    创建客户端demo ClientTest.java

    package com.zhxin.nettylab.nio.chapter3;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @ClassName ClientTest
     * @Description //客户端
     * @Author singleZhang
     * @Email 405780096@qq.com
     * @Date 2020/12/4 0004 上午 9:52
     **/
    public class ClientTest {
    
        private static Selector selector;
    
        public static void main(String[] args) throws IOException {
    
            // 创建通道管理器(Selector)
            selector = Selector.open();
    
            // 创建通道SocketChannel
            SocketChannel channel = SocketChannel.open();
            // 将通道设置为非阻塞
            channel.configureBlocking(false);
    
            // 客户端连接服务器,其实方法执行并没有实现连接,需要在handleConnect方法中调channel.finishConnect()才能完成连接
            channel.connect(new InetSocketAddress("localhost", 8989));
    
            /**
             * 将通道(Channel)注册到通道管理器(Selector),并为该通道注册selectionKey.OP_CONNECT
             * 注册该事件后,当事件到达的时候,selector.select()会返回,
             * 如果事件没有到达selector.select()会一直阻塞。
             */
            channel.register(selector, SelectionKey.OP_CONNECT);
    
            while (selector.select() > 0) {
    
                Set<SelectionKey> selectKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectKeys.iterator();
    
                while(keyIterator.hasNext()){
                    SelectionKey key = keyIterator.next();
                    if(key.isConnectable()){
                        // 传输通道连接成功 一般用在客户端
                        handleConnect(key);
                    } else if(key.isReadable()){
                        // 传输通道可读
                        handleRead(key);
                    } else if(key.isWritable()){
                        //传输通道可读
                        handleWrite(key);
                    }
    
                    //移除处理完的选择键
                    keyIterator.remove();
                }
    
            }
    
        }
    
        /**
         * 处理 和服务器端连接成功事件
         * */
        private static void handleConnect(SelectionKey key) throws IOException {
    
            // 获取与服务端建立连接的通道
            SocketChannel channel = (SocketChannel) key.channel();
            if (channel.isConnectionPending()) {
                // channel.finishConnect()才能完成连接
                channel.finishConnect();
            }
    
            channel.configureBlocking(false);
    
            // 数据写入通道
            String msg = "Hello Server!";
            channel.write(ByteBuffer.wrap(msg.getBytes()));
    
            // 通道注册到选择器,并且这个通道只对读事件感兴趣
            channel.register(selector, SelectionKey.OP_READ);
        }
    
        /**
         * 监听到可读,处理服务端发送过来的信息
         */
        private static void handleRead(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
    
            // 从通道读取数据到缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(128);
            channel.read(buffer);
    
            // 输出服务端响应发送过来的消息
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("client received msg from server:" + msg);
        }
    
        private static void handleWrite(SelectionKey key){
    
        }
    }
    

    总结

    到这里已经算是踏入了JAVA NIO的大门了,以上都是比较简单的demo实践,没有看到“粘包”和“拆包”等复杂问题,后续会接触到。
    Java NIO编程大致的特点如下:

    1. 在NIO中,服务器接收新连接的工作,是异步进行的。不像Java的OIO那样,服务器监听连接,是同步的、阻塞的。NIO可以通过选择器(也可以说成:多路复用器),后续不断地轮询选择器的选择键集合,选择新到来的连接。
    2. 在NIO中,SocketChannel传输通道的读写操作都是异步的。如果没有可读写的数据,负责IO通信的线程不会同步等待。这样,线程就可以处理其他连接的通道;不需要像OIO那样,线程一直阻塞,等待所负责的连接可用为止。
    3. 在NIO中,一个选择器线程可以同时处理成千上万个客户端连接,性能不会随着客户端的增加而线性下降。

    代码示例地址:
    https://gitee.com/kaixinshow/java-nionetty-learning

    相关文章

      网友评论

          本文标题:Java网络编程:Netty框架学习(二)---Java NIO

          本文链接:https://www.haomeiwen.com/subject/wrxmwktx.html