什么是 ByteBuf ?
- Java NIO 提供了 ByteBuffer 类作为字节的缓冲区,但是 ByteBuffer 的使用比较复杂,尤其是需要通过 flip() 方法对读写进行切换
- 因此 Netty 重新设计了一个字节缓冲区 ByteBuf,其有以下特点:
- 可拓展性
- 定义读索引和写索引,因此读写模式不需要 flip() 切换
- 零拷贝
- 实现了 ReferenceCounted 支持引用计数
- 支持池化
- 方法可以链式调用
- 自动容量扩展
- 更好的性能
ByteBuf 结构
- ByteBuf 包含了读指针(readerIndex)、写指针(writerIndex) ,这样的目的是为了解决 NIO 中 ByteBuffer 读写需要调用 flip() 方法进行读写切换,设计成两个索引的话读写互不影响,可同时进行
ByteBuf 结构
ByteBuf 注意点
- ByteBuf 可以通过 get 方法对每一个字节进行读操作,但所有以 get... 的方法只会读取相应字节,不会移动读指针,而所有以 read...、skip... 的方法都会读取或跳过指定字节并移动读指针
- ByteBuf 可以通过 set 方法对每一个字节进行写操作,但所有 set... 方法只会更新指定位置的索引,不会移动写指针,而所有 write... 的方法会在当前 writerIndex 写入具体的字节,并移动写指针
ByteBuf 划分
- ByteBuf 的子类非常多,可以使用两种方式对 ByteBuf 进行分类
按底层实现
-
HeapByteBuf 堆缓冲区:
- 其底层实现为 Java 堆内的字节数组,堆缓冲区与普通堆对象类似,位于 JVM 堆内存区,可由 GC 回收,其申请和释放效率较高。常规 JAVA 程序使用建议使用该缓冲区
-
DirectByteBuf 直接缓冲区:
- DirectByteBuf 的底层实现为操作系统内核空间的字节数组,直接缓冲区的字节数组位于 JVM 堆外的 Native 堆,由操作系统管理申请和释放
- 而 DirectByteBuf 的引用由 JVM 管理。直接缓冲区由操作系统管理
- 一方面,申请和释放效率都低于堆缓冲区
- 另一方面,却可以大大提高 I/O 效率。由于进行 I/O 操作时,常规下用户空间的数据(JAVA 即堆缓冲区)需要拷贝到内核空间(直接缓冲区),然后内核空间写到网络 SOCKET 或者文件中。如果在用户空间取得直接缓冲区,可直接向内核空间写数据,减少了一次拷贝,可大大提高 I/O 效率,这也是常说的零拷贝
-
Composite Buffer 复合缓冲区
- 复合缓冲区是以上两种方式组合实现,这也是一种零拷贝技术
- 想象将两个缓冲区合并为一个的场景
- 一般情况下,需要将后一个缓冲区的数据拷贝到前一个缓冲区
- 而使用组合缓冲区则可以直接保存两个缓冲区,因为其内部实现组合两个缓冲区并保证用户如同操作一个普通缓冲区一样操作该组合缓冲区,从而减少拷贝操作
按是否使用对象池
-
UnpooledByteBuf
- 不使用对象池的缓冲区,不需要创建大量缓冲区对象时建议使用该类缓冲区
-
PooledByteBuf
- 对象池缓冲区,当对象释放后会归还给对象池,所以可循环使用
- 当需要大量且频繁创建缓冲区时,建议使用该类缓冲区
- Netty4.1 默认使用对象池缓冲区,4.0 默认使用非对象池缓冲区
ByteBuf 创建
-
官方建议通过 Unpooled 的辅助方法来创建 ByteBuf,它们都是静态方法,可以通过分配新空间或包装或复制现有字节数组,字节缓冲区和字符串来创建新的 ByteBuf,下面是这些方法的定义:
// 在堆上分配一个ByteBuf,并指定初始容量和最大容量
public static ByteBuf buffer(int initialCapacity, int maxCapacity) {
return ALLOC.heapBuffer(initialCapacity, maxCapacity);
}
// 在堆外分配一个ByteBuf,并指定初始容量和最大容量
public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return ALLOC.directBuffer(initialCapacity, maxCapacity);
}
// 使用包装的方式,将一个byte[]包装成一个ByteBuf后返回
public static ByteBuf wrappedBuffer(byte[] array) {
if (array.length == 0) {
return EMPTY_BUFFER;
}
return new UnpooledHeapByteBuf(ALLOC, array, array.length);
}
// 返回一个组合ByteBuf,并指定组合的个数
public static CompositeByteBuf compositeBuffer(int maxNumComponents){
return new CompositeByteBuf(ALLOC, false, maxNumComponents);
}
- 以上所有的方法实际上是通过一个叫 ALLOC 的静态变量进行了调用,来实现具体的 ByteBuf 创建,而 ALLOC 实际上是一个 ByteBufAllocator
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
- ByteBufAllocator 是一个专门负责 ByteBuf 分配的接口,该接口有几种实现:AbstractByteBufAllocator、PooledByteBufAllocator、PreferHeapByteBufAllocator、UnpooledByteBufAllocator。对应的 Unpooled 实现类就是 UnpooledByteBufAllocator
ByteBuf 关键类源码分析
ByteBuf 类结构
ByteBuf
- 它虽被定义为抽象类,但其中并未实现任何方法,它扩展了 ReferenceCounted 实现引用计数,该类最重要的方法如下:
ByteBuf capacity(int newCapacity); // 设置缓冲区容量
ByteBuf order(ByteOrder endianness); // 设置缓冲区字节序
ByteBuf readerIndex(int readerIndex); // 设置缓冲区读索引
ByteBuf writerIndex(int writerIndex); // 设置缓冲区写索引
ByteBuf setIndex(int readerIndex, int writerIndex); // 设置读写索引
ByteBuf markReaderIndex(); // 标记读索引,写索引可类比
ByteBuf resetReaderIndex(); // 重置为标记的读索引
ByteBuf skipBytes(int length); // 略过指定字节(增加读索引)
ByteBuf clear(); // 读写索引都置0
int readableBytes(); // 可读的字节数
boolean isReadable(); // 是否可读
boolean isReadable(int size); // 指定的字节数是否可读
boolean hasArray(); // 判断底层实现是否为字节数组
byte[] array(); // 返回底层实现的字节数组
int arrayOffset(); // 底层字节数组的首字节位置
boolean isDirect(); // 判断底层实现是否为直接缓冲区
boolean hasMemoryAddress(); // 底层直接ByteBuffer是否有内存地址
long memoryAddress(); // 直接ByteBuffer的首字节内存地址
int indexOf(int fromIndex, int toIndex, byte value); // 查找首个特定字节的绝对位置
int bytesBefore(int index, int length, byte value); // 查找首个特定字节的相对位置,相对读索引
ByteBuf copy(); // 拷贝一个缓冲区,copy() 生成的ByteBuf完全独立于原ByteBuf
ByteBuf slice(); // slice() 和 duplicate() 生成的ByteBuf与原ByteBuf共享相同的底层实现,只是各自维护独立的索引和标记,使用这两个方法时,特别需要注意结合使用场景确定是否调用retain()增加引用计数
String toString(); // JAVA中Object的标准重载方法,返回ByteBuf的JAVA描述
String toString(Charset charset); // 返回使用指定编码集编码的缓冲区字节数据的字符形式
- 此处,Netty 使用了高聚合的设计模式,ByteBuf 全部都是抽象方法,它将子类可能使用到的方法都集中到了基类,再加上工厂模式生成 ByteBuf,给程序员带来了极大便利,不用接触具体的子类,只需要使用顶层的抽象类进行操作
AbstractByteBuf
- 抽象基类 AbstractByteBuf 中定义了 ByteBuf 的通用操作,比如读写索引以及标记索引的维护、容量扩增以及废弃字节丢弃等等
- 该类的私有变量
int readerIndex; // 读索引
int writerIndex; // 写索引
private int markedReaderIndex; // 标记读索引
private int markedWriterIndex; // 标记写索引
private int maxCapacity; // 最大容量
- 计算容量扩增的方法 calculateNewCapacity(minNewCapacity) 其中参数表示扩增所需的最小容量,通过源码可分析出:
- ByteBuf 最小的容量为 64b
- 当所需的最小容量在 64b 和 4mb 之间时翻倍扩容
- 当所需的最小容量超过 4mb 时每次扩容增加 4mb
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4MB的阈值
if (minNewCapacity == threshold) {
return threshold;
}
// 所需的最小容量超过阈值4MB,每次增加4MB
if (minNewCapacity > threshold) {
int newCapacity = (minNewCapacity / threshold) * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity; // 超过最大容量不再扩增
} else {
newCapacity += threshold; // 增加4MB
}
return newCapacity;
}
// 此时所需的最小容量小于阈值4MB,容量翻倍
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1; // 使用移位运算表示*2
}
return Math.min(newCapacity, maxCapacity);
}
- 丢弃已读字节的方法 discardReadBytes():
public ByteBuf discardReadBytes() {
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
// 将readerIndex之后的数据移动到从0开始
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex; // 写索引减少readerIndex
adjustMarkers(readerIndex); // 标记索引对应调整
readerIndex = 0; // 读索引置0
} else {
// 读写索引相同时等同于clear操作
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
- 频繁调用 discardReadBytes() 将导致数据的频繁前移,使性能损失。由此,提供了另一个方法 discardSomeReadBytes() 当读索引超过容量的一半时,才会进行数据前移,核心实现如下:
if (readerIndex >= capacity() >>> 1) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
}
- 如果并不想丢弃字节,只期望读索引前移,可使用方法 skipBytes():
public ByteBuf skipBytes(int length) {
checkReadableBytes(length);
readerIndex += length;
return this;
}
- 以 getInt() 和 readInt() 为例,分析常用的数据获取方法,readInt 将增加读索引,getInt 则不会对索引产生任何影响。数据设置方法 setInt() 和 writeInt() 的实现可对应类比
public int getInt(int index) {
checkIndex(index, 4); // 索引正确性检查
return _getInt(index);
}
protected abstract int _getInt(int index);
public int readInt() {
checkReadableBytes0(4); // 检查索引
int v = _getInt(readerIndex);
readerIndex += 4; // 读索引增加
return v;
}
AbstractReferenceCountedByteBuf
- 该抽象类实现引用计数相关的功能,当需要使用一个对象时,计数加 1,不再使用时计数减 1
- 通过 retain() 函数增加计数,release() 函数减少计数,当计数器减为 0 时触发 deallocate() 释放缓冲区内存
CompositeByteBuf
- Netty 通过该类实现了多个 ByteBuf 的组合且不需要进行对象的拷贝,其内部维护了一个 ComponentList 类型的变量 components,ComponentList 是一个继承 ArrayList 的内部类,其代码如下:
private static final class ComponentList extends ArrayList<Component> {
ComponentList(int initialCapacity) {
super(initialCapacity);
}
@Override
public void removeRange(int fromIndex, int toIndex) {
super.removeRange(fromIndex, toIndex);
}
}
- ComponentList 是一个保存 Component 的 List,而 Component 是一个内部类,它内部保存了一个 final 类型的 ByteBuf 对象
private static final class Component {
final ByteBuf buf;
final int length;
int offset;
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
// We should not get a NPE here. If so, it must be a bug.
buf.release();
}
}
- 往 CompositeByteBuf 中添加 ByteBuf 时,实际上是将 ByteBuf 封装成一个 Component 然后将他添加到 components 中,如下列代码所示:
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
assert buffer != null;
boolean wasAdded = false;
try {
checkComponentIndex(cIndex);
int readableBytes = buffer.readableBytes();
// No need to consolidate - just add a component to the list.
//将ByteBuf封装成一个Component
@SuppressWarnings("deprecation")
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
if (cIndex == components.size()) {
wasAdded = components.add(c);
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
components.add(cIndex, c);
wasAdded = true;
if (readableBytes != 0) {
updateComponentOffsets(cIndex);
}
}
if (increaseWriterIndex) {
writerIndex(writerIndex() + buffer.readableBytes());
}
return cIndex;
} finally {
if (!wasAdded) {
buffer.release();
}
}
}
ByteBuf 池化
- 对于频繁使用的对象或创建比较耗时的对象,那么为了优化系统的性能,通常会对这些对象进行池化,例如我们所知的线程池、数据库连接池、字符串常量池等
- Netty 中 ByteBuf 也是被频繁使用的一种对象,而 Netty 对 ByteBuf 也实现了池化,引用计数就是实现池化的关键点
- ByteBuf 类实现了 ReferenceCounted 接口,该接口标记一个类是一个需要用引用计数来管理的类
public interface ReferenceCounted {
// 返回当前对象的引用计数值,如果是0则表示当前对象已经被释放了
int refCnt();
// 引用计数加1
ReferenceCounted retain();
// 引用计数加increment
ReferenceCounted retain(int increment);
// 引用计数减1
boolean release();
// 引用计数减decrement,如果当前引用计数为0,则释放当前对象,如果释放成功则返回true
boolean release(int decrement);
}
- 每一个使用引用计数的对象,都会维护一个自身的引用计数,当对象被创建时,引用计数为 1,通过 retain() 增加引用计数,release() 减少引用计数,如果引用计数为 0 则释放当前对象
- 在 ByteBuf 的各个子类中它们会自己决定如何释放对象,如果是池化的 ByteBuf 那么就会返回到池子中,如果不是池化的则销毁底层的字节数组引用或者释放对应的堆外内存
- 引用计数的 ByteBuf 是通过 AbstractReferenceCountedByteBuf 的 release() 方法实现,而 release() 方法实际调用了 release0() 方法,让我们看一下具体的方法实现:
private boolean release0(int decrement) {
// AtomicIntegerFieldUpdater类的getAndAdd方法返回的是对象原来的值,然后再进行add操作
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
// 如果oldRef==decrement,则说明该对象的引用计数正好被释放完,则可以进行对象的释放操作,也即调用deallocate()方法
if (oldRef == decrement) {
deallocate();
return true;
// 如果引用计数的原值小于要释放的值,或者decrement小于0,则会抛出引用计数出错的异常IllegalReferenceCountException
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
// 此处会将引用计数的值再增加回来
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, decrement);
}
return false;
}
// 引用计数对象的释放方法是一个抽象方法,由各个子类具体实现
protected abstract void deallocate();
-
下面看看各个 ByteBuf 的实现类是如何处理对象释放的
@Override
protected void deallocate() {
freeArray(array);
// 将byte[]的引用释放
array = null;
}
- 未池化的 UnpooledDirectByteBuf
@Override
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
}
this.buffer = null;
// 释放堆外Buffer
if (!doNotFree) {
freeDirect(buffer);
}
}
- 池化的 PooledHeapByteBuf 与 池化的 PooledDirectByteBuf
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
// 将该ByteBuf循环使用,即放回到池中去
recycle();
}
}
private void recycle() {
recyclerHandle.recycle(this);
}
static final class DefaultHandle<T> implements Handle<T> {
private Stack<?> stack;
// 该变量就是用来保存回收的ByteBuf对象的
private Object value;
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
// 把handle的对象push到栈中去
stack.push(this);
}
}
- 可以看出释放池化的 ByteBuf 对象就是将对象重新回收的一个过程
引言
- 从上面我们已经了解了 ByteBuf 的分配是通过 ByteBufAllocator 接口进行分配的,而该接口有两种实现:
- PooledByteBufAllocator
- UnpooledByteBufAllocator
- Netty 默认使用了 PooledByteBufAllocator 对分配的 ByteBuf 进行池化
- 但是我们可以通过 ChannelConfig 或者在 ServerBootStrap 引导程序中指定一个分配器来更改默认的设置
网友评论