Netty 权威指南笔记(五):ByteBuf 源码解读
功能介绍
Java 本身提供了 ByteBuffer 类,为什么 Netty 还要搞一个 ByteBuf 类呢?因为 ByteBuffer 类有着许多缺点:
- ByteBuffer 长度固定,无法动态伸缩。
- ByteBuffer 只有一个位置指针 position,读写的时候需要手工调用 flip 和 rewind 方法进行模式转换,操作繁琐,容易出错。
- 功能太少,缺少一些高级特性。
为了弥补这些不足,Netty 提供了自己的缓冲区类实现 ByteBuf。有什么特点呢?
- 两个位置指针协助缓冲区的读写操作:readerIndex、writerIndex,读写之间不需要调整指针位置,大大简化了读写操作。
- 可以动态扩容。
- 当有部分内容已经读取完成时,可以通过 discard 操作对缓冲区进行整理,在不重新申请内存的情况下,增大可写字节数目。
- 支持标记和回滚的功能。
- 支持在 ByteBuf 中查找某个字符串。
- 派生出另一个 ByteBuf:duplicate、copy、slice。
- 转化成标准的 ByteBuffer,这是因为在使用 NIO 进行网络读写时,操作的对象还是 JDK 标准的 ByteBuffer。
- 随机读写。
源码分析
继承关系
ByteBuf 的主要功能类继承关系如下图所示:
ByteBuf 类图.png从内存分配的角度看,ByteBuf 可以分为两类:
- 堆内存字节缓冲区 HeapByteBuf:优点是内存分配和回收速度快,可以被 JVM 自动回收。缺点是,如果进行 Socket 的 I/O 读写,需要额外做一次内存复制,在堆内存缓冲区和内核 Channel 之间进行复制,性能会有一定程度下降。
- 直接内存字节缓冲区 DirectByteBuf:非堆内存,直接在堆外进行分配。相比于堆内存,内存分配和回收稍慢,但是可以减少复制,提升性能。
两种内存,各有利弊。Netty 最佳实践表明:在 I/O 通信线程的读写缓冲区使用 DirectByteBuf,后端业务消息的编解码模块使用 HeapByteBuf,这样组合可以达到性能最优。
从内存回收的角度看,ByteBuf 也分为两类:基于对象池的 ByteBuf 和普通 ByteBuf。两者区别在于基于对象池的 ByteBuf 可以重用 ByteBuf 对象,它自己维护了一个内存池,可以循环利用创建的 ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁 GC。内存池的缺点是管理和维护比较复杂,使用时需要更加谨慎。
下面我们对一些关键类进行分析和解读。
AbstractByteBuf
AbstractByteBuf 都做了哪些事儿呢?我们先看一下其主要的成员变量:
- 读写指针。
- 用于标记回滚的 marked 读写指针。
- 最大容量 maxCapacity,用于进行内存保护。
- 与本 ByteBuf 大小端属性相反的 ByteBuf:SwappedByteBuf。
我们发现这里没有真正存储数据的数据结构,例如 byte 数组或 DirectByteBuffer,原因是这里还不知道子类是要基于堆内存还是直接内存。
public abstract class AbstractByteBuf extends ByteBuf {
static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
private SwappedByteBuf swappedBuf;
}
接下来我们看看读操作 readBytes 方法,AbstractByteBuf 类做了什么呢?
- 在 checkReadableBytes 方法中,检查入参有效性。
- 修改读指针 readerIndex。
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
// getBytes 方法未在 AbstractByteBuf 中实现
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
从当前 ByteBuf 中复制数据到 dst 是在 getBytes 方法中,该方法未在 AbstractByteBuf 中实现,也是因为此时具体如何存储数据尚不确定。
下面我们看一下写操作 writeBytes 方法,AbstractByteBuf 负责实现了哪些操作呢?
- 有效性检查,如果引用计数 refCnt 为 0,表示该 ByteBuf 已经被回收,不能再写入。
- 输入参数有效性检查:要写入的数据量不能小于 0,写入之后总数据量也不能大于最大容量。
- 当容量不足时,如果尚未超过最大容量,则进行扩容。
- 修改写指针。
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
// setBytes 交给子类实现。
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
protected final void ensureAccessible() {
if (refCnt() == 0) {
throw new IllegalReferenceCountException(0);
}
}
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// 具体扩容操作由子类实现。
capacity(newCapacity);
return this;
}
在读写操作中,AbstractByteBuf 主要负责参数校验、读写指针修改,以及写操作时的扩容计算。
除此之外,AbstractByteBuf 还提供了以下功能:
- 操作索引:修改读写指针、mark & reset。
- 重用缓冲区:discardReadBytes。
- 丢弃部分数据:skipBytes。因为丢弃时,只需要修改读指针即可,与数据具体如何存储无关。
总结:在 AbstractByteBuf 中实现的是各个子类中通用的功能。
AbstractReferenceCountedByteBuf
从类名可以看出来,该类主要提供引用计数的功能,类似于 JVM 内存回收的对象引用计数器,用于跟踪对象的分配和回收,实现手动控制内存回收。
首先,我们看一下其成员变量:
- refCnt:记录对象引用次数。
- refCntUpdater:用于对 refCnt 进行原子更新。
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static {
AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}
private volatile int refCnt = 1;
接下来,我们看一下增加引用计数的 retain 方法。该方法是用 CAS 操作对 refCnt 进行加 1。另外,refCnt 值为 0 或 Integer.MAX_VALUE 值不能再操作,会抛出异常。
@Override
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
另一个 release 方法表示释放资源,会将引用计数 refCnt 减 1,如果当前 refCnt 等于 1,减 1 之后等于 0,表示对象已经没有被引用,可以被回收了,会调用 deallocate 方法释放内存。
@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
在 UnpooledHeapByteBuf 中,释放内存仅仅是把 array 数组置为 null,剩下的内存回收工作交由 JVM 来完成。
// in UnpooledHeapByteBuf.java
private byte[] array;
@Override
protected void deallocate() {
array = null;
}
在 UnpooledDirectByteBuf 中,则是调用 PlatformDependent.freeDirectBuffer 来释放直接内存。
// in UnpooledDirectByteBuf.java
@Override
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
}
this.buffer = null;
if (!doNotFree) {
freeDirect(buffer);
}
}
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
UnpooledHeapByteBuf
UnpooledHeapByteBuf 是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池实现,意味着每次 I/O 读写都会创建一个新的 UnpooledHeapByteBuf 对象,频繁进行内存的分配和释放对性能会有一定的影响,但是相对堆外内存的申请和释放,成本稍低。
相比于 PooledHeapByteBuf,不需要自己管理内存池,不容易出现内存管理方面的问题,更容易使用和维护。因此,在满足性能的情况下,推荐使用 UnpooledHeapByteBuf。
首先看一下 UnpooledHeapByteBuf 的成员变量:
- 负责内存分配的 ByteBufAllocator。
- 缓冲区实现 byte 数组。
- 从 ByteBuf 到 NIO 的 ByteBuffer 的转换对象 tmpNioBuf。
private final ByteBufAllocator alloc;
private byte[] array;
private ByteBuffer tmpNioBuf;
在将 AbstractByteBuf 的时候,我们提到 getBytes、capacity 等方法是由子类来实现的,这里我们先看看 getBytes 的实现,从代码中可以看出来,是直接调用 System.arraycopy 进行的数组复制。
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.length);
System.arraycopy(array, index, dst, dstIndex, length);
return this;
}
接下来看一下动态伸缩的 capacity 方法,主要做了以下几件事:
- 参数校验,newCapacity 不能小于 0,大于 maxCapacity。
- 如果 maxCapacity 大于 oldCapacity 表示扩容,直接申请新的 byte 数组,进行内存复制即可。
- 如果 maxCapacity 小于 oldCapacity 就是缩容了,同样申请 byte 数组。不同的是,需要根据读指针 readerIndex 与 newCapacity 的大小来决定是否需要进行内存复制。当 readerIndex 小于 newCapacity 时,需要复制内存,否则不需要。
- 设置合适的读写指针位置。
- 更新缓冲区字节数组引用 array 的值。
@Override
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity()));
}
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
return this;
}
从 ByteBuf 到 ByteBuffer 的转换,主要是使用了 ByteBuffer 的 wrap 方法:
@Override
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
PooledByteBuf
PooledByteBuf 是 ByteBuf 的内存池实现,应用自己实现的内存池管理策略,一般和操作系统的内存管理策略差不多,往往会更简单些。PooledByteBuf 内存池的分配和释放,主要通过 PoolArena 来实现。比如在 capacity 方法中,最终会使用 arena 的 reallocate 方法来重新分配内存。
public final ByteBuf capacity(int newCapacity) {
ensureAccessible();
// If the request capacity does not require reallocation, just update the length of the memory.
if (chunk.unpooled) {
if (newCapacity == length) {
return this;
}
} else {
if (newCapacity > length) {
if (newCapacity <= maxLength) {
length = newCapacity;
return this;
}
} else if (newCapacity < length) {
if (newCapacity > maxLength >>> 1) {
if (maxLength <= 512) {
if (newCapacity > maxLength - 16) {
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
} else { // > 512 (i.e. >= 1024)
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
} else {
return this;
}
}
// 最终使用 arena 的 reallocate 方法来重新分配内存。
chunk.arena.reallocate(this, newCapacity, true);
return this;
}
PoolArena 是由多个 PoolChunk 组成的大块内存区域。
abstract class PoolArena<T> {
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
}
// PoolChunkList 是 PoolChunk 组成的链表
final class PoolChunkList<T> {
private final PoolArena<T> arena;
private final PoolChunkList<T> nextList;
PoolChunkList<T> prevList;
private final int minUsage;
private final int maxUsage;
private PoolChunk<T> head;
}
每个 PoolChunk 由多个 PoolSubpage 组成。
final class PoolChunk<T> {
final PoolArena<T> arena;
final T memory;
final boolean unpooled;
private final byte[] memoryMap;
private final byte[] depthMap;
private final PoolSubpage<T>[] subpages;
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;
private final int pageSize;
private final int pageShifts;
private final int maxOrder;
private final int chunkSize;
private final int log2ChunkSize;
private final int maxSubpageAllocs;
/** Used to mark memory as unusable */
private final byte unusable;
private int freeBytes; // 当前 chunk 空闲字节数目
PoolChunkList<T> parent; // 父节点
PoolChunk<T> prev; // 链表前一个节点
PoolChunk<T> next; // 链表后一个节点
}
PoolSubpage 负责管理一个 Page 的内存,通过 bitmap 中的每一位来标记每一块儿内存的占用状态。
final class PoolSubpage<T> {
final PoolChunk<T> chunk;
private final int memoryMapIdx; // 当前page在chunk中的id
private final int runOffset; // 当前page在chunk.memory的偏移量
private final int pageSize; // page大小
private final long[] bitmap; // 通过对每一个二进制位的标记来修改一段内存的占用状态
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;
}
PooledDirectByteBuf
PooledDirectByteBuf 基于内存池实现,与 UnPooledDirectByteBuf 的唯一区别就是,缓冲区的分配和销毁策略不同。不仅缓冲区所需内存使用内存池分配管理,PooledDirectByteBuf 对象本身,也使用 Recycler 管理。 比如 PooledDirectByteBuf 创建示例调用的是 Recycler 的 get 方法。
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
buf.maxCapacity(maxCapacity);
return buf;
}
}
Recycler 是一个轻量级的对象池,一个对象池最核心的方法是从池中获取对象和回收对象到池中,分别对应其 get 和 recycle 方法。
public abstract class Recycler<T> {
public final T get() {
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
public final boolean recycle(T o, Handle<T> handle) {
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}
h.recycle(o);
return true;
}
}
PooledDirectByteBuf 中的 copy 方法用于复制一个新的字节缓冲区实例,该方法首先调用 PooledByteBufAllocator 的 directBuffer 来生成新的 ByteBuf,然后复制数据。
@Override
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}
directBuffer 是在抽象类 AbstractByteBufAllocator 中实现的,进行参数校验之后调用 newDirectBuffer 来获取 ByteBuf,该方法由子类来实现。
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
在内存池版本 PooledByteBufAllocator 的实现中,判断如果内存池 directArena 可用,则从中获取,否则自行 new 一个。
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
而在非内存池版本 UnpooledByteBufAllocator 中,则是直接 new 一个。
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
辅助类
- 内存分配相关: ByteBufAllocator 及其子类 UnpooledByteBufAllocator、PooledByteBufAllocator。
- 组合视图:CompositeByteBuf。
- 工具类:ByteBufUtil。
网友评论