美文网首页
Netty源码之ByteBuf(二)

Netty源码之ByteBuf(二)

作者: 0爱上1 | 来源:发表于2019-05-24 13:45 被阅读0次

引言

在上一篇文章Netty源码之ByteBuf(一)
我们简单介绍了ByteBuf的基础知识并根据源码分析了ByteBuf的创建过程,本文我们会继续学习ByteBuf,并从使用读写原理两方面来归纳整理


使用篇

使用篇会介绍在日常开发中,ByteBuf都是如何使用的

先回顾一下ByteBuf结构图

ByteBuf.png

使用ByteBuf代码示例

主要通过读写操作方法,观察buf结构中读写索引变化情况

  • 写操作

  // 实例化一个堆内字节缓冲区
  ByteBuf buf = Unpooled.buffer();
  
  System.out.println("写操作前==========================");
  System.out.println("readerIndex: " + buf.readerIndex());
  System.out.println("buffer readable bytes is " + buf.readableBytes());
  System.out.println("writerIndex: " + buf.writerIndex());
  System.out.println("buffer writable bytes is " + buf.writableBytes());
  System.out.println("buffer capacity is " + buf.capacity());

  buf.writeByte(new Random().nextInt());

  System.out.println("写操作后==========================");
  System.out.println("readerIndex: " + buf.readerIndex());
  System.out.println("buffer readable bytes is " + buf.readableBytes());
  System.out.println("writerIndex: " + buf.writerIndex());
  System.out.println("buffer writable bytes is " + buf.writableBytes());
  System.out.println("buffer capacity is " + buf.capacity());

控制台输出结果如下


写操作前==========================
readerIndex: 0
buffer readable bytes is 0
writerIndex: 0
buffer writable bytes is 256
buffer capacity is 256
写操作后==========================
readerIndex: 0
buffer readable bytes is 1
writerIndex: 1
buffer writable bytes is 255
buffer capacity is 256

通过以上结果,我们得出当调用buf的写相关操作方法时,写索引(writerIndex)会根据写入的字节数量而增加。与此同时,buf的可读字节数 (readableBytes) 值也会随写入的字节数量而增加,可写字节数(writableBytes)值随之减少

  • 读操作

基于以上写入的字节,我们执行读操作,观察读写索引以及相关属性的变化情况

代码示例如下:


System.out.println("读操作前==========================");
System.out.println("readerIndex: " + buf.readerIndex());
System.out.println("buffer readable bytes is " + buf.readableBytes());
System.out.println("writerIndex: " + buf.writerIndex());
System.out.println("buffer writable bytes is " + buf.writableBytes());
System.out.println("buffer capacity is " + buf.capacity());

byte readByte = buf.readByte();
System.out.println("readByte is " + readByte);

System.out.println("读操作后==========================");
System.out.println("readerIndex: " + buf.readerIndex());
System.out.println("buffer readable bytes is " + buf.readableBytes());
System.out.println("writerIndex: " + buf.writerIndex());
System.out.println("buffer writable bytes is " + buf.writableBytes());
System.out.println("buffer capacity is " + buf.capacity());

控制台输出如下:


读操作前==========================
readerIndex: 0
buffer readable bytes is 1
writerIndex: 1
buffer writable bytes is 255
buffer capacity is 256
readByte is 47
读操作后==========================
readerIndex: 1
buffer readable bytes is 0
writerIndex: 1
buffer writable bytes is 255
buffer capacity is 256

可以看到当我们进行读操作(readByte())调用时,读索引(readerIndex)会随读取字节数量增加,于此同时可读取字节数(readableBytes)随之减少,可写字节数保持不变

那如果buffer中没有可读字节的情况下,继续调用读取方法会发生什么呢?


Exception in thread "main" java.lang.IndexOutOfBoundsException: readerIndex(1) + length(1) exceeds writerIndex(1)

可以看到,当buffer中没有可读字节时,继续调用读取字节方法会抛出IndexOutOfBoundsException下标越界异常,那么实际开发中如何避免出现异常呢?

实际开发过程中,我们不可能这样不假思索的调用读写方法,都会进行相应的判断后再执行,类似的代码示例如下


while (buf.isReadable()) {
    System.out.println("------buffer中有至少一个字节可取,执行单字节读取操作-------";
    byte readByte1 = buf.readByte();
}

while (buf.isReadable(100)) {
    System.out.println("------buffer中有至少100个字节可取,执行一次读取100个字节操作-------";
    ByteBuf readByte = buf.readBytes(100);
}

原理篇

  • 读原理

\color{red}{ByteBuf}抽象类

/**
* Transfers this buffer's data to a newly created buffer starting at
* the current {@code readerIndex} and increases the {@code readerIndex}
* by the number of the transferred bytes (= {@code length}).
* The returned buffer's {@code readerIndex} and {@code writerIndex} are
* {@code 0} and {@code length} respectively.
*
* @param length the number of bytes to transfer
*
* @return the newly created buffer which contains the transferred bytes
*
* @throws IndexOutOfBoundsException
*         if {@code length} is greater than {@code this.readableBytes}
*/
public abstract ByteBuf readBytes(int length);

大概的意思就是将调用者的数据(从当前readerIndex开始,并增加指定length数量值)传输到一个新创建的buffer中

通俗点来说就分两步:

  1. 是从调用者buffer中读取length长度的数据

  2. 写入到新创建的buffer中并返回这个新创建的buffer,注意这里新创建buffer在被写入数据前,其readerIndex和writerIndex值都为0

需要注意的是参数length长度如果超过了调用者buffer的可读字节数,会抛出IndexOutOfBoundsException异常

下面我们看该抽象方法的具体实现

\color{red}{AbstractByteBuf}抽象类

AbstractByteBuf抽象类继承了ByteBuf抽象类,并实现了大部分父类中的抽象方法


@Override
public ByteBuf readBytes(int length) {
    // 1. 校验参数非负以及是否越界
    checkReadableBytes(length);
    // 2. 如果length是0,就返回空的buffer
    if (length == 0) {
        return Unpooled.EMPTY_BUFFER;
    }
    // 3. 分配一个新的buffer, 初始容量就是length,最大容量是调用者的最大容量
    ByteBuf buf = alloc().buffer(length, maxCapacity);
    // 4. 从原buffer中读取length长度的数据并写入新创建的buf中
    buf.writeBytes(this, readerIndex, length);
    // 5. 增加原buffer的读索引值
    readerIndex += length;
    // 6. 返回新创建的buf
    return buf;
}

这里我们重点分析一下以上方法中的第3.4.步

alloc()方法返回创建原始buffer的分配器ByteBufAllocator

再利用这个分配器去分配一个新的buffer,分配的过程上一篇文章已经分析过来,不清楚的回去看一下就ok了

ByteBuf抽象类中定义的方法


/**
* Transfers the specified source buffer's data to this buffer starting at
* the current {@code writerIndex} and increases the {@code writerIndex}
* by the number of the transferred bytes (= {@code length}).
* If {@code this.writableBytes} is less than {@code length}, {@link #ensureWritable(int)}
* will be called in an attempt to expand capacity to accommodate.
*
* @param srcIndex the first index of the source
* @param length   the number of bytes to transfer
*
* @throws IndexOutOfBoundsException
*         if the specified {@code srcIndex} is less than {@code 0}, or
*         if {@code srcIndex + length} is greater than {@code src.capacity}
*/
public abstract ByteBuf writeBytes(ByteBuf src, int srcIndex, int length);

将src源buffer中的数据(从srcIndex开始,length长度)读取出来并写入当前调用者buf中

这里依然是AbstractByteBuf抽象类实现了该方法


@Override
public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
    // 1. 确保可写
    ensureWritable(length);
    // 2. 开始写入src源buf中的数据,从writerIndex开始写
    setBytes(writerIndex, src, srcIndex, length);
    // 3. 增加writerIndex值
    writerIndex += length;
    return this;
}

setBytes方法也是在ByteBuf抽象类中定义的


/**
* Transfers the specified source buffer's data to this buffer starting at
* the specified absolute {@code index}.
* This method does not modify {@code readerIndex} or {@code writerIndex}
* of both the source (i.e. {@code this}) and the destination.
*
* @param srcIndex the first index of the source
* @param length   the number of bytes to transfer
*
* @throws IndexOutOfBoundsException
*         if the specified {@code index} is less than {@code 0},
*         if the specified {@code srcIndex} is less than {@code 0},
*         if {@code index + length} is greater than
*            {@code this.capacity}, or
*         if {@code srcIndex + length} is greater than
*            {@code src.capacity}
*/
public abstract ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length);

需要注意的是这个方法涉及到了真正的转移数据,读取以及写入,所以AbstractByteBuf抽象类没有实现该方法,而是交由各个实现类ByteBuf自己实现具体的读写字节操作

假设新创建的buf是以直接内存方式创建的(InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf

我们看其父类UnpooledUnsafeDirectByteBuf实现的setBytes方法

\color{red}{UnpooledUnsafeDirectByteBuf}


@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
    UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
    return this;
}

可以看到这里借助了UnsafeByteBufUtil工具类,参数也发生了变化,我们先分析一下参数

源参数:

数量4个

  • index 代表当前即将被写入(设置)数据的buf的writerIndex索引

  • src 代表的是提供写入数据的源buf

  • srcIndex 代表的是提供数据源buf的readerIndex索引

  • length 代表的是从提供数据源buf里读取多少数量的数据

变化后参数:

数量5个

  • this 代表被写入数据的buf指针

  • addr(index) 代表的是即将被写入数据的buf的实际可写内存地址


long addr(int index) {
  return memoryAddress + index;
}

另外三个参数不变

下面我们看UnsafeByteBufUtil工具类的setBytes方法做了什么


static void setBytes(AbstractByteBuf buf, long addr, int index, ByteBuf src, int srcIndex, int length) {
    buf.checkIndex(index, length);
    checkNotNull(src, "src");
    if (isOutOfBounds(srcIndex, length, src.capacity())) {
        throw new IndexOutOfBoundsException("srcIndex: " + srcIndex);
    }
    // 1. 重点在这里
    if (length != 0) {
        if (src.hasMemoryAddress()) {
            // 1.1. 
            PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr, length);
        } else if (src.hasArray()) {
            // 1.2. 
            PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr, length);
        } else {
            // 1.3. 
            src.getBytes(srcIndex, buf, index, length);
        }
    }
}

1.1.

提供数据的源buf有底层内存地址,则调用PlatformDependent.copyMemory(src.memoryAddress() + srcIndex, addr, length)完成内存拷贝

直接取到源buf的内存地址+readerIndex最为源地址,最后利用了Unsafe类完成内存拷贝

UNSAFE.copyMemory(srcAddr, dstAddr, length);

1.2.

提供数据的源buf没有底层内存地址,有后备字节数组,则调用
PlatformDependent.copyMemory(src.array(), src.arrayOffset() + srcIndex, addr, length)完成内存拷贝

获取源buf持有的数组,初始偏移量+readerIndex作为参数

\color{red}{PlatformDependent}


public static void copyMemory(byte[] src, int srcIndex, long dstAddr, long length) {
    PlatformDependent0.copyMemory(src, BYTE_ARRAY_BASE_OFFSET + srcIndex, null, dstAddr, length);
}

而内部最终调用了Unsafe类的copyMemory方法

public native void copyMemory(Object var1, long var2, Object var4, long var5, long var7);

1.3.

以上都不满足,源buf是一个复合类型,则调用具体的实现完成内存拷贝,我们这里先不考虑这种情况

至此我们知道了对于ByteBuf读操作的原理,其实说白了都是需要进行内存拷贝,就看怎么个拷贝法,是堆内与堆内之间的拷贝还是堆内与堆外之间的拷贝,其最终都利用Unsafe类完成

相关文章

网友评论

      本文标题:Netty源码之ByteBuf(二)

      本文链接:https://www.haomeiwen.com/subject/ifhbzqtx.html