美文网首页
nio核心三组件

nio核心三组件

作者: 以梦为马驾驾驾 | 来源:发表于2021-11-07 02:06 被阅读0次

nio核心三组件

Buffer

Buffer 是一个特定原始类型的容器。Buffer 是一个原始类型的线性的、有限序列,除了 Buffer 存储的内容外,关键属性还包括:capacity, limitposition

capacity:Buffer 包含的元素的数量,capacity 永远不会为负,也不会改变。
limit:Buffer 中第一个不能读取或写入的元素索引。limit 永远不会为负,且永远小于等于 capacity。
position:下一个待读取、写入的元素索引。position 永远不会为负,且永远小于等于 limit。
每个基本类型(布尔类型除外),都有一个 Buffer 的子类。Java NIO 中 Buffer 的一些实现,其中最重要的是 ByteBuffer

创建

两种方式:

  1. 堆内存,读写效率相对低下,收到GC的影响
  2. 直接内存:读写相对快(少一次拷贝),不受GC影响,但是分配空间效率低
    public static void main(String[] args) {
        System.out.println(ByteBuffer.allocate(16).getClass());
        System.out.println(ByteBuffer.allocateDirect(16).getClass());
    }

class java.nio.HeapByteBuffer
class java.nio.DirectByteBuffer

写入buffer

  1. 使用channel
  2. 直接put

每写入一个元素,position += 1

int readByted = channel.read(buf);

buf.put((byte)126);
buf.put(index, byte b);
buf.put(ByteBuffer)

读取buffer

读取也是用的positon,而position在写入后对应的是下一个写入的位置的索引,
需要调用flip方法来切换模式, 在读取的时候,读取当前的position上的值。
可以通过channel写入buffer,也可以通过channel读取buffer。

Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

对于

get() // position += 1,
get(Int index) // position = position


// channel读取buffer
int writed = channel.write(buf)

变量mark, 用于暂时保存position的值, 以便待会能够回来。

Buffer mark() {
    mark = position;
    return this;
}
Buffer reset() {
    int m = mark;
    if( m < 0 ) {
        throw InvalidMarkException(); // 说明没有调用 mark() 方法
    }
    position = m;
    return this;
}

其他的一些方法

rewind(), clear(), compact()

在读模式下,调用rewind会重置position和mark到初始0和-1. 即重头开始读吧。
而clear,会重置position=0, limit = capacity, mark = -1。这是清空了。而compact会将上一次没读完的移动到前面来

string <=> byte数组

  1. "string".getBytes();
  2. StandardCharsets.UTF_8.encode("string");
  3. 读模式下: StandardCharsets.UTF_8.decode(buf).toString();

分散读取和集中写入

有点是减少数据在buffer中的读写次数

分散读取

将数据从channel读出来,写入到buffer里,需要buffer数组,按照顺序填满buffer

// 分散读取
try(FileChannel channel = new RandomAccessFile("D:\\develop\\idea_projects\\nio_stuff\\nio_selector\\src\\main\\java\\bytebuff_stuff\\TestByteBufferAllocate.java", "r").getChannel()) {
    ByteBuffer b1 = ByteBuffer.allocate(16000);
    ByteBuffer b2 = ByteBuffer.allocate(16000);
    ByteBuffer b3 = ByteBuffer.allocate(16000);
    channel.read(new ByteBuffer[]{b1,b2,b3});
    b1.flip(); // 必须切换到读模式
    b2.flip();
    b3.flip();
    System.out.println(StandardCharsets.UTF_8.decode(b1));
    System.out.println(StandardCharsets.UTF_8.decode(b2));
    System.out.println(StandardCharsets.UTF_8.decode(b3));
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

集中写入

 // 集中写入
        ByteBuffer bx = StandardCharsets.UTF_8.encode("enjoy");
        ByteBuffer by = StandardCharsets.UTF_8.encode("happy");
        ByteBuffer bz = StandardCharsets.UTF_8.encode("幸福");
        try(FileChannel channel = new RandomAccessFile("./tmp.txt", "rw").getChannel()) {
            channel.write(new ByteBuffer[]{bx, by, bz});
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

粘包与半包的解决

粘包:合并多条消息,然后依次发送

半包: 因为接受的缓冲区大小有限,一条数据可能缓冲区存不下

两个需要解决的问题:1. 接收端拆粘包 2. 接收端合并半包

public static void main(String[] args) {
        ByteBuffer bf = ByteBuffer.allocate(32);
        bf.put("hello world\ni am xxx\n ho".getBytes(StandardCharsets.UTF_8));

        LinkedList<ByteBuffer> ts = new LinkedList<>(split(bf));
        Consumer<ByteBuffer> c = byteBuffer -> System.out.println(StandardCharsets.UTF_8.decode(byteBuffer));
        bf.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
        ts.addAll(split(bf));
        ts.forEach(c);
    }

    private static LinkedList<ByteBuffer> split(ByteBuffer bf) {
        // 首先切换成读模式,如果还是写模式的话
        bf.flip();

        LinkedList<ByteBuffer> ts = new LinkedList<>();
        for(int i = 0; i < bf.limit(); ++ i) {
            if(bf.get(i) == '\n') {
                int length = i - bf.position();
                ByteBuffer t = ByteBuffer.allocate(length);
                for(int j=0; j < length; j ++) {
                    t.put(bf.get());
                }
                t.flip();
                ts.add(t);
            }
        }
        bf.compact(); // 丢弃已读
        return ts;
    }

Channel

是对设备,文件,网络,进程间的连接的一种抽象,channel 有两种模式: 阻塞和非阻塞

  • FileChannel
  • DatagramChannel : udp链接
  • SocketChannel: Tcp链接通道, tcp客户端
  • ServerSocketChannel: Tcp对应的服务端, 用来监听端口进来的请求

FileChannel只能在阻塞模式下工作, 网络channel既可以阻塞也可以非阻塞模式。

channel开启用完后必须关闭, 任何新的调用都会抛异常,用isOpen来检测。

FileChannel

channel.position(idx)如果设置成末尾,那么再读的时候就是-1,如果超过了文件末尾, 再写入的话,中间会出现空洞0000.

使用transfer的优势是零拷贝,效率高,缺点是最大2g。但是可以通过不断判断transferTo返回的大小,和文件剩余大小,循环transferTo,因为是追加动作。

       try(FileChannel from = new FileInputStream("tmp.txt").getChannel();
           FileChannel to = new FileOutputStream("out.txt").getChannel();
       ) {
           //
           from.transferTo(1, from.size(), to);
       } catch (FileNotFoundException e) {
           e.printStackTrace();
       } catch (IOException e) {
           e.printStackTrace();
       }

path和files

Path和工具类Paths以及Files来表示文件路径,和获取Path实例,这样可以解决文件路径名的繁琐,通过相对路径,绝对路径,目录操作,linux和windows的路径操作,支持.和../ ,甚至通过Files工具类拷贝文件(效率也高,调用的os的接口),walkFileTree,原子move,删除等等。

网络channel

        // server
        ByteBuffer bf = ByteBuffer.allocate(16);
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.bind(new InetSocketAddress(8080));
        List<SocketChannel> channels = new LinkedList<>();
        while(true) {
            SocketChannel sc = ssc.accept();
            channels.add(sc);
            for(SocketChannel c: channels) {
                c.read(bf);
                bf.flip();

                bf.clear();
            }
        }
        // client
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8080));

阻塞方法

ServerSocketChannel.accept 和 SocketChannel.read是阻塞的

非阻塞方法

ServerSocketChannel打开后, 只需要在
ssc.configureBlocking(false); 非阻塞,线程不被被accept方法阻塞,如果没有客户端连接进来,那么accept()方法直接返回null.

SocketChannel同样

sc.configureBlocking(false); read方法也不会阻塞线程了,如果没有数据,read返回0;

非阻塞模式虽然解决了阻塞的问题,但是没有办法解决空忙的问题,所以我们用Selector。

Selector

Selector管理多个channel, 需要运行在非阻塞模式下(FileChannel不行)。

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ByteBuffer bf = ByteBuffer.allocate(16);

        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false); // 必须的
        // 建立selector和channel的关系
        // 将这个channel上的实践注册到selector上,让他来关注和管理
        SelectionKey sscKey = ssc.register(selector, 0, null);

        // 设置需要关注的事件
        sscKey.interestOps(SelectionKey.OP_ACCEPT);

        ssc.bind(new InetSocketAddress(8080));
        while (true) {
            selector.select(); // 阻塞的,只有有时间发生,才会被唤醒
            // 唤醒后,keys里面包含这次发生的所有事件(因为只关注了OP_ACCEPT,所以其实只有客户端连接进来)
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectionKeysIter = selectionKeys.iterator();
            while(selectionKeysIter.hasNext()) {
                SelectionKey key = selectionKeysIter.next();
               // selectionKeysIter.remove();
                System.out.println(key);
                ServerSocketChannel sccKey = (ServerSocketChannel) key.channel();
                SocketChannel sc = sccKey.accept();
                System.out.println("new channel");
                System.out.println(sc);
                // 取消
                // key.cancel(); 
                //还可以继续把 sc 注册到Selector上,关注它的READAble
                
            }
        }
    }

四种事件:

static final int OP_READ = 1 << 0; //可读
static final int OP_WRITE = 1 << 2; // 可写
static final int OP_CONNECT = 1 << 3; // 客户端连接
static final int OP_ACCEPT = 1 << 4; // 客户端请求连接

在事件发生后,要么处理,要么取消,如果两者都不做的话,就会留在Selector中,使得下次Selector立刻返回上次未处理的事件。尤其要警惕,客户端断开连接(用channel.read的返回值判断是异常断开还是主动断开-1),对的情况,因为没法处理,会抛弃异常,需要finally取消它。

处理粘包与半包

无论采用固定大小,还是变长(判断分割符),或者申明数据长度的方式,都需要每个channel独享各自的ByteBuffer,通过附件的方法,独享的也要和消息大小适配:1. 大小可变 2. 多个buffer组成的数组(分散读,集中写)

SelectionKey scKey = sc.register(selector, 0, ByteBuffer.allocate(16));

ByteBuffer bf = (ByteBuffer)key.attachment();

selector的方法调用的时候是阻塞的,只有在这些时候不阻塞:

  • 发生关注的事件的时候
    • 客户端的连接请求,accept
    • 客户端的发数据,客户端的断开,都会触发读事件,且如果发送的数据大于buffer缓冲区,会多次触发读
    • channel可以写,触发write事件
  • selector.wakeup()方法
  • 调用selector.close()
  • selector所在的线程被interrupt

相关文章

  • Android 网络编程3 Java NIO

    Android网络编程 目录 1、Java NIO 的核心组件 Java NIO的核心组件包括:Channel(通...

  • Java NIO概览

    Java NIO 包含下列几个核心组件: ChannelsBuffersSelectors    Java NIO...

  • Java NIO 概述

    Java NIO 包括以下核心组件: Channels Buffers Selectors Java NIO 中除...

  • Overview

    Java NIO包括以下几个核心的组件: Channels Buffers Selectors Java NIO还...

  • Java NIO 概述

    Java NIO 主要包括一下核心组件: Channels Buffers Selectors Java NIO的...

  • 2. Java NIO 概述

    Java NIO由下面几个核心组件组成: Channel Buffer Selector Java NIO有更多的...

  • Java NIO之Channel

    本文开始讲解Java NIO 的三个核心组件,Channel,Buffer,Selector。先从Channel开...

  • 【NIO】NIO三剑客之一ByteBuffer介绍与使用

    ​谈及Java NIO,最核心的三个组件就是Channel 通道ByteBuffer 读写缓冲区Selector ...

  • 分布式理论架构设计

    IO模型 BIO 同步阻塞 NIO 同步非阻塞 AIO 异步非阻塞 NIO核心组件 Selector Channe...

  • nio核心三组件

    nio核心三组件 Buffer Buffer 是一个特定原始类型的容器。Buffer 是一个原始类型的线性的、有限...

网友评论

      本文标题:nio核心三组件

      本文链接:https://www.haomeiwen.com/subject/tibszltx.html