美文网首页Netty
Netty基础-NIO(一)

Netty基础-NIO(一)

作者: 石头耳东 | 来源:发表于2022-06-04 20:32 被阅读0次

    零、本文纲要

    一、NIO三大组件

    1. Channel
    2. Buffer
    3. Selector

    二、Buffer

    1. 基础依赖
    2. ByteBuffer使用
    3. ByteBuffer结构
    4. ByteBuffer常见方法

    三、Buffer使用模拟

    1. 情景模拟
    2. 模拟还原数据

    一、NIO三大组件

    NIO,non-blocking io 非阻塞 IO

    Channel / Buffer / Selector

    1. Channel

    双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel;
    与stream对比,stream是单向的,要么输入要么输出。

    常见的Channel:
    FileChannel / DatagramChannel / SocketChannel / ServerSocketChannel

    2. Buffer

    用来缓冲读写数据。

    常见的Buffer:
    ByteBuffer(MappedByteBuffer/DirectByteBuffer/HeapByteBuffer) /
    ShortBuffer / IntBuffer / LongBuffer / FloatBuffer / DoubleBuffer / CharBuffer

    3. Selector

    ① 多线程处理多个Socket连接

    单个Thread对应单个Socket

    内存占用高 / 线程上下文切换成本高 / 仅适合【连接数少】的场景

    ② 线程池处理多个Socket连接

    单个Thread可以处理多个Socket

    阻塞模式下线程只能处理一个Socket / 仅适合【短连接】的场景

    ③ selector配合线程处理多个Socket

    selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。
    适合连接数特别多,但流量低的场景(low traffic)。

    调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理。

    二、Buffer

    0. 基础依赖

    netty-all           4.1.39.Final
    lombok              1.16.18
    gson                2.8.5
    guava               19.0
    logback-classic     1.2.3
    protobuf-java       3.11.3
    

    1. ByteBuffer使用

    a、向 buffer 写入数据,例如调用 channel.read(buffer)
    b、调用 flip() 切换至读模式
    c、从 buffer 读取数据,例如调用 buffer.get()
    d、调用 clear() 或 compact() 切换至写模式
    e、重复 1~4 步骤

    try (RandomAccessFile file = new RandomAccessFile("src/main/resources/data.txt", "rw")) {
        FileChannel channel = file.getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(16);
        do {
            //1. 向 buffer 写入
            int len = channel.read(buffer);
            log.debug("读到的字节数:{}", len);
            if (len == -1) {
                break;
            }
            //2. 切换 buffer 读模式
            buffer.flip();
            while (buffer.hasRemaining()) {
                log.debug("{}", (char) buffer.get());
            }
            //3. 切换 buffer 写模式
            buffer.clear();
        } while (true);
    } catch (IOException e) {
        log.info(e.getMessage());
    }
    

    2. ByteBuffer结构

    // Creates a new buffer with the given mark, position, limit, capacity,
    // backing array, and array offset
    ByteBuffer(int mark, int pos, int lim, int cap,   // package-private
                 byte[] hb, int offset)
    {
        super(mark, pos, lim, cap);
        this.hb = hb;
        this.offset = offset;
    }
    
    mark            标记位
    position        当前位
    limit           界限位
    capacity        容量
    backing array   支撑数组
    array offset    数组偏移
    

    3. ByteBuffer常见方法

    ① allocate方法

    用来给ByteBuffer分配空间

    public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }
    
    HeapByteBuffer(int cap, int lim) {...} //此时容量对应limit写上线
    
    allocate方法.png

    ② channel#read方法 / buffer#put方法

    向 buffer 写入数据

    FileChannelImpl#read → IOUtil#readIntoNativeBuffer

    public final ByteBuffer put(byte[] src) {
        return put(src, 0, src.length);
    }
    
    public ByteBuffer put(byte[] src, int offset, int length) {
        checkBounds(offset, length, src.length);
        if (length > remaining())
            throw new BufferOverflowException();
        int end = offset + length;
        for (int i = offset; i < end; i++)
            this.put(src[i]);
        return this;
    }
    
    channel的read方法.png

    ③ filp方法

    切换至【读模式】,重置position、limit,可从buffer中读取数据

    public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }
    
    filp方法.png

    注意:
    a、filp方法将 写limit定位到读limit,position重置为0,进而读取内容。
    b、另外此时mark也会被清除。

    ④ hasRemaining方法

    判断是否仍有剩余数据

    public final boolean hasRemaining() {
        return position < limit;
    }
    

    ⑤ buffer#get方法 / channel#write

    HeapByteBuffer#get → Buffer#nextGetIndex
    FileChannel#write(ByteBuffer[] srcs)

    get方法注意点:
    a、会使 position 读指针向后走;
    b、可以使用 rewind 方法,使 position 重置,而limit不变,用来重复度;
    c、调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针。

    get方法.png
    public final Buffer rewind() {
        position = 0;
        mark = -1;
        return this;
    }
    

    注意:rewind方法会重置mark标记。

    对比filp与rewind:后者 rewind 没有改变 limit指针 所指向的读上限。

    ⑥ clear方法

    public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }
    
    clear方法.png

    注意:clear方法并没有清除内容,而是改变了指针的指向,提升了效率。

    ⑦ compact方法

    HeapByteBuffer#compact

    compact方法.png

    注意:compact方法允许我们未读完,而且可以在未读的后一个位置重新开始写。

    ⑧ mark方法 & reset方法

    public final Buffer mark() {
        mark = position;
        return this;
    }
    
    public final Buffer reset() {
        int m = mark;
        if (m < 0)
            throw new InvalidMarkException();
        position = m;
        return this;
    }
    
    mark方法 & reset方法.png

    注意:mark方法与reset方法允许我们在任意mark位置重新读,rewind方法是从头开始。

    ⑨ 字符串 与 buffer 互相转换

    ByteBuffer buffer = StandardCharsets.UTF_8.encode("StrToBuffer");
    CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);
    

    三、Buffer使用模拟

    1. 情景模拟

    网络通信:
    a、客户端发送多条数据给服务端,数据间使用"\n"分隔;
    b、数据接收时为了提升效率,数据会被服务端重新组合。

    模拟数据为:
    a、Hello, NIO.\n
    b、I`m Stone.\n
    c、How are you?\n

    此时,服务器将数据重组,出现ByteBuffer (黏包,半包),如下:
    a、Hello, NIO.\nI`m Stone.\nHo 【24bytes】
    c、w are you?\n 【11bytes】

    2. 模拟还原数据

    省略了buffer动态扩容与收缩的业务逻辑,实际使用时,框架内一般会有代码实现。

    @Slf4j
    public class BufferDemo01 {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(32);
            //1. 接收到第一组数据
            //1.1 模拟接收到第一组数据
            buffer.put("Hello, NIO.\nI`m Stone.\nHo".getBytes(StandardCharsets.UTF_8));
            //1.2 处理第一组数据
            split(buffer);
            //2. 接收到第二组数据
            //2.1 模拟接收到第二组数据
            buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
            //2.2 处理第二组数据
            split(buffer);
        }
    
        public static void split(ByteBuffer buffer) {
            //1. 切换至 读模式
            buffer.flip();
            //2. 记录当前 读上限
            int originLimit = buffer.limit();
            //3. 处理当前数据
            for (int i = 0; i < originLimit; i++) {
                //3.1 如果读取到的数据是规定的 分隔符"\n"
                if (buffer.get(i) == '\n') {
                    log.debug("当前分隔符所在的位置:{},buffer.position():{}。", i, buffer.position());
                    ByteBuffer message = ByteBuffer.allocate(i + 1 - buffer.position());
                    buffer.limit(i + 1); //3.2 调整当前读上限为 message 容量
                    message.put(buffer); //3.3 从 buffer 读,向 message 写
                    //debugAll(message); //该方法是打印当前 message 的方法
                    buffer.limit(originLimit); //3.4 调整当前读上限为原先读originLimit
                }
            }
            //4. 如果当前数据有剩余,则将当前数据拼接至下组数据
            buffer.compact();
        }
    }
    
    输出内容.png

    四、结尾

    以上即为Netty基础-NIO(一)的全部内容,感谢阅读。

    相关文章

      网友评论

        本文标题:Netty基础-NIO(一)

        本文链接:https://www.haomeiwen.com/subject/vzokmrtx.html