什么是 ByteBuf ?
- Java NIO 提供了 ByteBuffer 类作为字节的缓冲区,但是 ByteBuffer 的使用比较复杂,尤其是需要通过 flip() 方法对读写进行切换
- 因此 Netty 重新设计了一个字节缓冲区 ByteBuf,其有以下特点:
- 可拓展性
- 定义读索引和写索引,因此读写模式不需要 flip() 切换
- 零拷贝
- 实现了 ReferenceCounted 支持引用计数
- 支持池化
- 方法可以链式调用
- 自动容量扩展
- 更好的性能
ByteBuf 结构
- ByteBuf 包含了读指针(readerIndex)、写指针(writerIndex) ,这样的目的是为了解决 NIO 中 ByteBuffer 读写需要调用 flip() 方法进行读写切换,设计成两个索引的话读写互不影响,可同时进行

- ByteBuf 包含了三部分:
-
readable bytes:
- 表示未读字节,如果可读字节已耗尽,再次尝试从 ByteBuf 中读取内容则会抛出 IndexOutOfBoundsException,因此在读数据之前最好通 isReadable() 进行判断
while(byteBuf.isReadable()){ // 读内容 }
-
writable bytes:
- 表示剩余可写的字节空间,ByteBuf 默认最大容量为 Integer.MAX_VALUE,如果没有可写空间仍往 ByteBuf 中写数据会抛出 IndexOutOfBoundsException 异常,因此在写数据之前最好通过 isWriteable() 进行判断
while(byteBuf.isWriteable()){ // 写内容 }
-
discardable bytes:
- 表示已读的字节,可以被丢弃 ,下标为 0 ~ readerIndex 的字节就被视为 discardable bytes
-
readable bytes:
ByteBuf 注意点
- ByteBuf 可以通过 get 方法对每一个字节进行读操作,但所有以 get... 的方法只会读取相应字节,不会移动读指针,而所有以 read...、skip... 的方法都会读取或跳过指定字节并移动读指针
- ByteBuf 可以通过 set 方法对每一个字节进行写操作,但所有 set... 方法只会更新指定位置的索引,不会移动写指针,而所有 write... 的方法会在当前 writerIndex 写入具体的字节,并移动写指针
ByteBuf 缓冲区种类
-
HeapByteBuf 堆缓冲区:
- 堆缓冲区位于 JVM 堆内存,由 GC 回收,其申请和释放效率较高
-
DirectByteBuf 直接缓冲区:
- 直接缓冲区位于 JVM 堆外的操作系统内核空间,由操作系统管理申请和释放
- 直接缓冲区申请和释放效率都低于堆缓冲区
- 但直接缓冲区可以大大提高 I/O 效率。由于进行 I/O 操作时,常规下用户空间的数据(JAVA 即堆缓冲区)需要拷贝到内核空间(直接缓冲区),然后内核空间写到网络 SOCKET 或者文件中。如果在用户空间取得直接缓冲区,可直接向内核空间写数据,减少了一次拷贝,可大大提高 I/O 效率,也就是所说的零拷贝技术
- 直接缓冲区位于 JVM 堆外的操作系统内核空间,由操作系统管理申请和释放
-
Composite Buffer 复合缓冲区
- 复合缓冲区类似于一个 ByteBuf 的组合视图(类似于列表),通过它可以将多个 ByteBuf 组合起来当做一个连续的缓冲区进行操作
堆缓冲区与直接缓冲区的运用
- 通常,底层 I/O 处理线程使用直接缓冲区,可以减少一次 I/O 的复制,直接写入 Socket 或文件中
- 业务消息的编解码使用堆缓冲区,分配效率更高
ByteBuf 创建
- 可以通过 Unpooled 工厂类来创建非池化 ByteBuf:
// 分配一个堆缓冲区
public static ByteBuf buffer(int initialCapacity, int maxCapacity) {
return ALLOC.heapBuffer(initialCapacity, maxCapacity);
}
// 分配一个直接缓冲区
public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
return ALLOC.directBuffer(initialCapacity, maxCapacity);
}
// 将多个 bytebuf 包装成一个复合缓冲区
public static ByteBuf wrappedBuffer(byte[] array) {
if (array.length == 0) {
return EMPTY_BUFFER;
}
return new UnpooledHeapByteBuf(ALLOC, array, array.length);
}
// 返回一个复合缓冲区,并指定 bytebuf 的个数
public static CompositeByteBuf compositeBuffer(int maxNumComponents){
return new CompositeByteBuf(ALLOC, false, maxNumComponents);
}
- 以上所有的方法实际上是通过 ALLOC 静态变量进行了调用,来实现具体的 ByteBuf 创建,而 ALLOC 实际上是一个 ByteBufAllocator
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
ByteBufAllocator 是一个专门负责分配 ByteBuf 的接口,该接口有几种实现:
- AbstractByteBufAllocator
- PooledByteBufAllocator
- PreferHeapByteBufAllocator
- UnpooledByteBufAllocator
非池化的实现是 UnpooledByteBufAllocator,池化的实现是 PooledByteBufAllocator
ByteBuf 池化
- 对于频繁使用的对象或创建比较耗时的对象,那么为了优化系统的性能,通常会对这些对象进行池化,例如我们所知的线程池、数据库连接池、字符串常量池等
- Netty 中 ByteBuf 也是被频繁使用的一种对象,而 Netty 对 ByteBuf 也实现了池化
- ByteBuf 类实现了 ReferenceCounted 接口,该接口标记一个类是一个需要用引用计数来管理的类,引用计数是实现池化的关键点
public interface ReferenceCounted {
// 返回当前对象的引用计数值,如果是0则表示当前对象已经被释放了
int refCnt();
// 引用计数加 1
ReferenceCounted retain();
// 引用计数+increment
ReferenceCounted retain(int increment);
// 引用计数减 1
boolean release();
// 引用计数减-decrement,如果当前引用计数为0,则释放当前对象,如果释放成功则返回true
boolean release(int decrement);
}
- 每个 ByteBuf 对象都会维护一个自身的引用计数,当对象被创建时,引用计数为 1,通过 retain() 增加引用计数,release() 减少引用计数,如果引用计数为 0 则释放当前对象
- 如果是池化的 ByteBuf 那么就会返回到池中
- 如果不是池化的 ByteBuf 则销毁底层的字节数组引用或者释放对应的堆内存
如何使用池化 ByteBuf
- 需要在创建客户端或者服务端的时候在引导辅助类中进行配置,使用内存池
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
- 使用 PooledByteBufAllocator 分配池化 ByteBuf
// 分配池化堆缓冲区
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(1024,2048);
// 分配池化直接缓冲区
ByteBuf directBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(1024,2048);
CompositeByteBuf
- Netty 提供了复合缓冲区(CompositeByteBuf),它就好似一个 ByteBuf 视图(类似于列表),可以将多个独立 ByteBuf 组合为一个逻辑上的 ByteBuf 进行操作
CompositeByteBuf 的运用场景
- 假设有一份协议的数据,它由头部和消息体组成,而头部和消息体分别存放在两个 ByteBuf 中,即:
ByteBuf header = ...
ByteBuf body = ...
- 我们在处理代码中,通常希望将 header 和 body 合并为一个 ByteBuf 方便处理,那么通常的做法是:
ByteBuf allBuf = Unpooled.buffer(header.readableBytes() + body.readableBytes());
allBuf.writeBytes(header);
allBuf.writeBytes(body);
- 可以看到,我们将 header 和 body 都拷贝到了新的 allBuf 中,这无形的增加了两次额外的数据拷贝操作。下面看一下 CompositeByteBuf 是如何实现这样的需求的:
ByteBuf header = ...
ByteBuf body = ...
CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
compositeByteBuf.addComponents(true, header, body);
- 其中第一个参数是 true, 表示当添加新的 ByteBuf 时, 自动递增 CompositeByteBuf 的 writeIndex,如果我们调用的是
compositeByteBuf.addComponents(header, body);
- 那么 compositeByteBuf 的 writeIndex 仍然是0, 因此此时我们就不可能从 compositeByteBuf 中读取到数据, 这一点要特别注意

wrap 操作
- 假设我们有一个 byte 数组,希望把它转换为 ByteBuf 对象以便后续操作,那么传统的做法时将 byte 数组写入到 ByteBuf 中,即:
byte[] bytes = ...
ByteBuf byteBuf = Unpooled.buffer();
byteBuf.writeBytes(bytes);
- 显然这种方式也是有一个额外的拷贝操作,我们可以使用 Unpooled 的相关方法,包装这个 byte 数组来生成一个新的 ByteBuf 实例,而不需要进行拷贝操作
byte[] bytes = ...
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
slice 操作
- slice 操作和 wrap 操作正好相反,Unpooled.wrappedBuffer 可以将多个 ByteBuf 合并成一个,而 slice 操作可以将一个 ByteBuf 切分为多个共享一个内存区域的 ByteBuf 对象,ByteBuf 提供了两个 slice 操作方法
public ByteBuf slice();
public ByteBuf slice(int index, int length);
- 不带参数的 slice() 方法等同于 buf.slice(buf.readerIndex(), buf.readableBytes()),即返回 buf 中可读部分的切片,而 slice(int index, int length) 方法可以设置不同的参数来获取到 buf 的不同区域的切片
ByteBuf byteBuf = ...
ByteBuf header = byteBuf.slice(0, 5);
ByteBuf body = byteBuf.slice(5, 10);
- slice 方法产生的 header 和 body 的过程都是没有拷贝操作的,header 和 body 对象内部其实是共享了 ByteBuf 的存储空间的不同部分而已

ByteBuf 源码分析
- 它虽被定义为抽象类,但其中并未实现任何方法,它扩展了 ReferenceCounted 实现引用计数,该类最重要的方法如下:
ByteBuf capacity(int newCapacity); // 设置缓冲区容量
ByteBuf order(ByteOrder endianness); // 设置缓冲区字节序
ByteBuf readerIndex(int readerIndex); // 设置缓冲区读索引
ByteBuf writerIndex(int writerIndex); // 设置缓冲区写索引
ByteBuf setIndex(int readerIndex, int writerIndex); // 设置读写索引
ByteBuf markReaderIndex(); // 标记读索引,写索引可类比
ByteBuf resetReaderIndex(); // 重置为标记的读索引
ByteBuf skipBytes(int length); // 略过指定字节(增加读索引)
ByteBuf clear(); // 读写索引都置0
int readableBytes(); // 可读的字节数
boolean isReadable(); // 是否可读
boolean isReadable(int size); // 指定的字节数是否可读
boolean hasArray(); // 判断底层实现是否为字节数组
byte[] array(); // 返回底层实现的字节数组
int arrayOffset(); // 底层字节数组的首字节位置
boolean isDirect(); // 判断底层实现是否为直接缓冲区
boolean hasMemoryAddress(); // 底层直接ByteBuffer是否有内存地址
long memoryAddress(); // 直接ByteBuffer的首字节内存地址
int indexOf(int fromIndex, int toIndex, byte value); // 查找首个特定字节的绝对位置
int bytesBefore(int index, int length, byte value); // 查找首个特定字节的相对位置,相对读索引
ByteBuf copy(); // 拷贝一个缓冲区,copy() 生成的ByteBuf完全独立于原ByteBuf
ByteBuf slice(); // slice() 和 duplicate() 生成的ByteBuf与原ByteBuf共享相同的底层实现,只是各自维护独立的索引和标记,使用这两个方法时,特别需要注意结合使用场景确定是否调用retain()增加引用计数
String toString(); // JAVA中Object的标准重载方法,返回ByteBuf的JAVA描述
String toString(Charset charset); // 返回使用指定编码集编码的缓冲区字节数据的字符形式
- 此处,Netty 使用了高聚合的设计模式,ByteBuf 全部都是抽象方法,它将子类可能使用到的方法都集中到了基类,再加上工厂模式生成 ByteBuf,给程序员带来了极大便利,不用接触具体的子类,只需要使用顶层的抽象类进行操作
AbstractByteBuf
- 抽象基类 AbstractByteBuf 中定义了 ByteBuf 的通用操作,比如读写索引以及标记索引的维护、容量扩增以及废弃字节丢弃等等
int readerIndex; // 读索引
int writerIndex; // 写索引
private int markedReaderIndex; // 标记读索引
private int markedWriterIndex; // 标记写索引
private int maxCapacity; // 最大容量
- 计算容量扩增的方法 calculateNewCapacity(minNewCapacity) 其中参数表示扩增所需的最小容量,通过源码可分析出:
- ByteBuf 最小的容量为 64b
- 当所需的最小容量在 64b 和 4mb 之间时翻倍扩容
- 当所需的最小容量超过 4mb 时每次扩容增加 4mb
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4MB的阈值
if (minNewCapacity == threshold) {
return threshold;
}
// 所需的最小容量超过阈值4MB,每次增加4MB
if (minNewCapacity > threshold) {
int newCapacity = (minNewCapacity / threshold) * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity; // 超过最大容量不再扩增
} else {
newCapacity += threshold; // 增加4MB
}
return newCapacity;
}
// 此时所需的最小容量小于阈值4MB,容量翻倍
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1; // 使用移位运算表示*2
}
return Math.min(newCapacity, maxCapacity);
}
- 丢弃已读字节的方法 discardReadBytes():
public ByteBuf discardReadBytes() {
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
// 将readerIndex之后的数据移动到从0开始
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex; // 写索引减少readerIndex
adjustMarkers(readerIndex); // 标记索引对应调整
readerIndex = 0; // 读索引置0
} else {
// 读写索引相同时等同于clear操作
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
- 频繁调用 discardReadBytes() 将导致数据的频繁前移,使性能损失。由此,提供了另一个方法 discardSomeReadBytes() 当读索引超过容量的一半时,才会进行数据前移,核心实现如下:
if (readerIndex >= capacity() >>> 1) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
}
- 如果并不想丢弃字节,只期望读索引前移,可使用方法 skipBytes():
public ByteBuf skipBytes(int length) {
checkReadableBytes(length);
readerIndex += length;
return this;
}
- 以 getInt() 和 readInt() 为例,分析常用的数据获取方法,readInt 将增加读索引,getInt 则不会对索引产生任何影响。数据设置方法 setInt() 和 writeInt() 的实现可对应类比
public int getInt(int index) {
checkIndex(index, 4); // 索引正确性检查
return _getInt(index);
}
protected abstract int _getInt(int index);
public int readInt() {
checkReadableBytes0(4); // 检查索引
int v = _getInt(readerIndex);
readerIndex += 4; // 读索引增加
return v;
}
AbstractReferenceCountedByteBuf
- 该抽象类实现了 ByteBuf 引用计数相关的功能,每当调用 ByteBuf 的 release() 减少引用计数时,实际上是调用了 release0() 方法,下面是该方法的实现:
private boolean release0(int decrement) {
// AtomicIntegerFieldUpdater类的getAndAdd方法返回的是对象原来的值,然后再进行add操作
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
// 如果oldRef==decrement,则说明该对象的引用计数正好被释放完,则可以进行对象的释放操作,也即调用deallocate()方法
if (oldRef == decrement) {
deallocate();
return true;
// 如果引用计数的原值小于要释放的值,或者decrement小于0,则会抛出引用计数出错的异常IllegalReferenceCountException
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
// 此处会将引用计数的值再增加回来
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, decrement);
}
return false;
}
// 引用计数对象的释放方法是一个抽象方法,由各个子类具体实现
protected abstract void deallocate();
-
下面看看各个 ByteBuf 的实现类是如何处理对象释放的
- 未池化的 UnpooledHeapByteBuf
@Override protected void deallocate() { freeArray(array); // 将byte[]的引用释放 array = null; }
- 未池化的 UnpooledDirectByteBuf
@Override protected void deallocate() { ByteBuffer buffer = this.buffer; if (buffer == null) { return; } this.buffer = null; // 释放堆外Buffer if (!doNotFree) { freeDirect(buffer); } }
- 池化的 PooledHeapByteBuf 与 池化的 PooledDirectByteBuf
@Override protected final void deallocate() { if (handle >= 0) { final long handle = this.handle; this.handle = -1; memory = null; tmpNioBuf = null; chunk.arena.free(chunk, handle, maxLength, cache); chunk = null; // 将该ByteBuf循环使用,即放回到池中去 recycle(); } } private void recycle() { recyclerHandle.recycle(this); } static final class DefaultHandle<T> implements Handle<T> { private Stack<?> stack; // 该变量就是用来保存回收的ByteBuf对象的 private Object value; DefaultHandle(Stack<?> stack) { this.stack = stack; } @Override public void recycle(Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle"); } // 把handle的对象push到栈中去 stack.push(this); } }
CompositeByteBuf
- Netty 通过该类实现了多个 ByteBuf 的组合且不需要进行对象的拷贝,其内部维护了一个 ComponentList 类型的变量 components,ComponentList 是一个继承 ArrayList 的内部类,其代码如下:
private static final class ComponentList extends ArrayList<Component> {
ComponentList(int initialCapacity) {
super(initialCapacity);
}
@Override
public void removeRange(int fromIndex, int toIndex) {
super.removeRange(fromIndex, toIndex);
}
}
- ComponentList 是一个保存 Component 的 List,而 Component 是一个内部类,它内部保存了一个 final 类型的 ByteBuf 对象
private static final class Component {
final ByteBuf buf;
final int length;
int offset;
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
// We should not get a NPE here. If so, it must be a bug.
buf.release();
}
}
- 往 CompositeByteBuf 中添加 ByteBuf 时,实际上是将 ByteBuf 封装成一个 Component 然后将他添加到 components 中,如下列代码所示:
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
assert buffer != null;
boolean wasAdded = false;
try {
checkComponentIndex(cIndex);
int readableBytes = buffer.readableBytes();
// No need to consolidate - just add a component to the list.
//将ByteBuf封装成一个Component
@SuppressWarnings("deprecation")
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
if (cIndex == components.size()) {
wasAdded = components.add(c);
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
components.add(cIndex, c);
wasAdded = true;
if (readableBytes != 0) {
updateComponentOffsets(cIndex);
}
}
if (increaseWriterIndex) {
writerIndex(writerIndex() + buffer.readableBytes());
}
return cIndex;
} finally {
if (!wasAdded) {
buffer.release();
}
}
}
网友评论