美文网首页
nio核心三组件

nio核心三组件

作者: 以梦为马驾驾驾 | 来源:发表于2021-11-07 02:06 被阅读0次

    nio核心三组件

    Buffer

    Buffer 是一个特定原始类型的容器。Buffer 是一个原始类型的线性的、有限序列,除了 Buffer 存储的内容外,关键属性还包括:capacity, limitposition

    capacity:Buffer 包含的元素的数量,capacity 永远不会为负,也不会改变。
    limit:Buffer 中第一个不能读取或写入的元素索引。limit 永远不会为负,且永远小于等于 capacity。
    position:下一个待读取、写入的元素索引。position 永远不会为负,且永远小于等于 limit。
    每个基本类型(布尔类型除外),都有一个 Buffer 的子类。Java NIO 中 Buffer 的一些实现,其中最重要的是 ByteBuffer

    创建

    两种方式:

    1. 堆内存,读写效率相对低下,收到GC的影响
    2. 直接内存:读写相对快(少一次拷贝),不受GC影响,但是分配空间效率低
        public static void main(String[] args) {
            System.out.println(ByteBuffer.allocate(16).getClass());
            System.out.println(ByteBuffer.allocateDirect(16).getClass());
        }
    
    class java.nio.HeapByteBuffer
    class java.nio.DirectByteBuffer
    

    写入buffer

    1. 使用channel
    2. 直接put

    每写入一个元素,position += 1

    int readByted = channel.read(buf);
    
    buf.put((byte)126);
    buf.put(index, byte b);
    buf.put(ByteBuffer)
    
    

    读取buffer

    读取也是用的positon,而position在写入后对应的是下一个写入的位置的索引,
    需要调用flip方法来切换模式, 在读取的时候,读取当前的position上的值。
    可以通过channel写入buffer,也可以通过channel读取buffer。

    Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }
    
    对于
    
    get() // position += 1,
    get(Int index) // position = position
    
    
    // channel读取buffer
    int writed = channel.write(buf)
    

    变量mark, 用于暂时保存position的值, 以便待会能够回来。

    Buffer mark() {
        mark = position;
        return this;
    }
    Buffer reset() {
        int m = mark;
        if( m < 0 ) {
            throw InvalidMarkException(); // 说明没有调用 mark() 方法
        }
        position = m;
        return this;
    }
    

    其他的一些方法

    rewind(), clear(), compact()

    在读模式下,调用rewind会重置position和mark到初始0和-1. 即重头开始读吧。
    而clear,会重置position=0, limit = capacity, mark = -1。这是清空了。而compact会将上一次没读完的移动到前面来

    string <=> byte数组

    1. "string".getBytes();
    2. StandardCharsets.UTF_8.encode("string");
    3. 读模式下: StandardCharsets.UTF_8.decode(buf).toString();

    分散读取和集中写入

    有点是减少数据在buffer中的读写次数

    分散读取

    将数据从channel读出来,写入到buffer里,需要buffer数组,按照顺序填满buffer

    // 分散读取
    try(FileChannel channel = new RandomAccessFile("D:\\develop\\idea_projects\\nio_stuff\\nio_selector\\src\\main\\java\\bytebuff_stuff\\TestByteBufferAllocate.java", "r").getChannel()) {
        ByteBuffer b1 = ByteBuffer.allocate(16000);
        ByteBuffer b2 = ByteBuffer.allocate(16000);
        ByteBuffer b3 = ByteBuffer.allocate(16000);
        channel.read(new ByteBuffer[]{b1,b2,b3});
        b1.flip(); // 必须切换到读模式
        b2.flip();
        b3.flip();
        System.out.println(StandardCharsets.UTF_8.decode(b1));
        System.out.println(StandardCharsets.UTF_8.decode(b2));
        System.out.println(StandardCharsets.UTF_8.decode(b3));
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    } catch (IOException e) {
        e.printStackTrace();
    }
    

    集中写入

     // 集中写入
            ByteBuffer bx = StandardCharsets.UTF_8.encode("enjoy");
            ByteBuffer by = StandardCharsets.UTF_8.encode("happy");
            ByteBuffer bz = StandardCharsets.UTF_8.encode("幸福");
            try(FileChannel channel = new RandomAccessFile("./tmp.txt", "rw").getChannel()) {
                channel.write(new ByteBuffer[]{bx, by, bz});
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
    

    粘包与半包的解决

    粘包:合并多条消息,然后依次发送

    半包: 因为接受的缓冲区大小有限,一条数据可能缓冲区存不下

    两个需要解决的问题:1. 接收端拆粘包 2. 接收端合并半包

    public static void main(String[] args) {
            ByteBuffer bf = ByteBuffer.allocate(32);
            bf.put("hello world\ni am xxx\n ho".getBytes(StandardCharsets.UTF_8));
    
            LinkedList<ByteBuffer> ts = new LinkedList<>(split(bf));
            Consumer<ByteBuffer> c = byteBuffer -> System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
            bf.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
            ts.addAll(split(bf));
            ts.forEach(c);
        }
    
        private static LinkedList<ByteBuffer> split(ByteBuffer bf) {
            // 首先切换成读模式,如果还是写模式的话
            bf.flip();
    
            LinkedList<ByteBuffer> ts = new LinkedList<>();
            for(int i = 0; i < bf.limit(); ++ i) {
                if(bf.get(i) == '\n') {
                    int length = i - bf.position();
                    ByteBuffer t = ByteBuffer.allocate(length);
                    for(int j=0; j < length; j ++) {
                        t.put(bf.get());
                    }
                    t.flip();
                    ts.add(t);
                }
            }
            bf.compact(); // 丢弃已读
            return ts;
        }
    

    Channel

    是对设备,文件,网络,进程间的连接的一种抽象,channel 有两种模式: 阻塞和非阻塞

    • FileChannel
    • DatagramChannel : udp链接
    • SocketChannel: Tcp链接通道, tcp客户端
    • ServerSocketChannel: Tcp对应的服务端, 用来监听端口进来的请求

    FileChannel只能在阻塞模式下工作, 网络channel既可以阻塞也可以非阻塞模式。

    channel开启用完后必须关闭, 任何新的调用都会抛异常,用isOpen来检测。

    FileChannel

    channel.position(idx)如果设置成末尾,那么再读的时候就是-1,如果超过了文件末尾, 再写入的话,中间会出现空洞0000.

    使用transfer的优势是零拷贝,效率高,缺点是最大2g。但是可以通过不断判断transferTo返回的大小,和文件剩余大小,循环transferTo,因为是追加动作。

           try(FileChannel from = new FileInputStream("tmp.txt").getChannel();
               FileChannel to = new FileOutputStream("out.txt").getChannel();
           ) {
               //
               from.transferTo(1, from.size(), to);
           } catch (FileNotFoundException e) {
               e.printStackTrace();
           } catch (IOException e) {
               e.printStackTrace();
           }
    

    path和files

    Path和工具类Paths以及Files来表示文件路径,和获取Path实例,这样可以解决文件路径名的繁琐,通过相对路径,绝对路径,目录操作,linux和windows的路径操作,支持.和../ ,甚至通过Files工具类拷贝文件(效率也高,调用的os的接口),walkFileTree,原子move,删除等等。

    网络channel

            // server
            ByteBuffer bf = ByteBuffer.allocate(16);
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(8080));
            List<SocketChannel> channels = new LinkedList<>();
            while(true) {
                SocketChannel sc = ssc.accept();
                channels.add(sc);
                for(SocketChannel c: channels) {
                    c.read(bf);
                    bf.flip();
    
                    bf.clear();
                }
            }
            // client
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("localhost", 8080));
    

    阻塞方法

    ServerSocketChannel.accept 和 SocketChannel.read是阻塞的

    非阻塞方法

    ServerSocketChannel打开后, 只需要在
    ssc.configureBlocking(false); 非阻塞,线程不被被accept方法阻塞,如果没有客户端连接进来,那么accept()方法直接返回null.

    SocketChannel同样

    sc.configureBlocking(false); read方法也不会阻塞线程了,如果没有数据,read返回0;

    非阻塞模式虽然解决了阻塞的问题,但是没有办法解决空忙的问题,所以我们用Selector。

    Selector

    Selector管理多个channel, 需要运行在非阻塞模式下(FileChannel不行)。

        public static void main(String[] args) throws IOException {
            Selector selector = Selector.open();
            ByteBuffer bf = ByteBuffer.allocate(16);
    
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 必须的
            // 建立selector和channel的关系
            // 将这个channel上的实践注册到selector上,让他来关注和管理
            SelectionKey sscKey = ssc.register(selector, 0, null);
    
            // 设置需要关注的事件
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
    
            ssc.bind(new InetSocketAddress(8080));
            while (true) {
                selector.select(); // 阻塞的,只有有时间发生,才会被唤醒
                // 唤醒后,keys里面包含这次发生的所有事件(因为只关注了OP_ACCEPT,所以其实只有客户端连接进来)
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> selectionKeysIter = selectionKeys.iterator();
                while(selectionKeysIter.hasNext()) {
                    SelectionKey key = selectionKeysIter.next();
                   // selectionKeysIter.remove();
                    System.out.println(key);
                    ServerSocketChannel sccKey = (ServerSocketChannel) key.channel();
                    SocketChannel sc = sccKey.accept();
                    System.out.println("new channel");
                    System.out.println(sc);
                    // 取消
                    // key.cancel(); 
                    //还可以继续把 sc 注册到Selector上,关注它的READAble
                    
                }
            }
        }
    

    四种事件:

    static final int OP_READ = 1 << 0; //可读
    static final int OP_WRITE = 1 << 2; // 可写
    static final int OP_CONNECT = 1 << 3; // 客户端连接
    static final int OP_ACCEPT = 1 << 4; // 客户端请求连接
    

    在事件发生后,要么处理,要么取消,如果两者都不做的话,就会留在Selector中,使得下次Selector立刻返回上次未处理的事件。尤其要警惕,客户端断开连接(用channel.read的返回值判断是异常断开还是主动断开-1),对的情况,因为没法处理,会抛弃异常,需要finally取消它。

    处理粘包与半包

    无论采用固定大小,还是变长(判断分割符),或者申明数据长度的方式,都需要每个channel独享各自的ByteBuffer,通过附件的方法,独享的也要和消息大小适配:1. 大小可变 2. 多个buffer组成的数组(分散读,集中写)

    SelectionKey scKey = sc.register(selector, 0, ByteBuffer.allocate(16));
    
    ByteBuffer bf = (ByteBuffer)key.attachment();
    

    selector的方法调用的时候是阻塞的,只有在这些时候不阻塞:

    • 发生关注的事件的时候
      • 客户端的连接请求,accept
      • 客户端的发数据,客户端的断开,都会触发读事件,且如果发送的数据大于buffer缓冲区,会多次触发读
      • channel可以写,触发write事件
    • selector.wakeup()方法
    • 调用selector.close()
    • selector所在的线程被interrupt

    相关文章

      网友评论

          本文标题:nio核心三组件

          本文链接:https://www.haomeiwen.com/subject/tibszltx.html