不论我们在前面学习NIO的ByteBuffer,还是现在Netty当中的ByteBuf,其都有使用直接内存的方式。
在Netty当中,我们使用完直接内存,需要去手动进行释放,而不应该等待GC去进行回收,以减少发生内存溢出的风险。
一、ByteBuf的种类
关于其种类,有很多种,我们根据前面提到的池化机制,将其主要分为两大类,每一类当当中又分为堆内存和直接内存:
- UnpooledHeapByteBuf:非池化堆内存ByteBuf,受JVM内存管理,可以等待GC回收。
- UnpooledDirectByteBuf:非池化直接内存ByteBuf,不收JVM管理,虽然可以受GC回收,但不是及时的,可能会发生内存溢出,需要手动进行回收。
- PooledByteBuf:池化ByteBuf,这种有更复杂的回收范式,后面通过源码分析,具体查看其实现细节。
- PooledHeapByteBuf:池化堆内存ByteBuf
- PooledDirectByteBuf:池化直接内存ByteBuf
二、直接内存回收原理
在前面的文章中,我们简单聊到过ByteBuf的结构:
public abstract class ByteBuf implements ReferenceCounted
如上所示,其实现了ReferenceCounted的接口,接口翻译过来叫做“引用计数”。
相信学过jvm GC的同学应该有所了解“引用计数法”,当一个对象有引用时,我们就对计数器加1,反之就减1,但是引用计数法无法处理环形垃圾,所以后面提出了“根可达算法”,简单提一下,需要了解细节的朋友可以看我的专题【JVM】。
此处的引用计数,用于ByteBuf的直接内存回收,我们看下其主要的方法:
public interface ReferenceCounted {
/**
* 返回当前对象的引用计数
*/
int refCnt();
/**
* 将引用计数增加1
*/
ReferenceCounted retain();
/**
* 按指定的increment增加引用计数
*/
ReferenceCounted retain(int increment);
/**
* 将引用计数减少1,并在引用计数达到0解除分配此对象
*/
boolean release();
/**
* 将引用计数减少指定的decrement ,如果引用计数达到0则取消分配此对象。
*/
boolean release(int decrement);
}
所有的ByteBuf都会实现这个接口,当一个新的ReferenceCounted被实例化时,它以1的引用计数开始。 retain()增加引用计数,而release()减少引用计数。 如果引用计数减少到0 ,对象将被释放,并且访问释放的对象通常会导致访问冲突。
通过下面的代码简单试用一下:
public static void main(String[] args) {
ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
//打印当前的引用计数
System.out.println("初始化后的引用计数" + byteBuf.refCnt());
//释放引用计数
byteBuf.release();
//打印当前的引用计数
System.out.println("释放后的引用计数" + byteBuf.refCnt());
//调用byteBuf
try {
byteBuf.writeInt(888);
} catch (Exception e) {
System.out.println("释放后调用异常:" + e);
}
//增加引用计数
try {
byteBuf.retain();
} catch (Exception e) {
System.out.println("释放后增加引用计数异常:" + e);
}
// 重新分配
byteBuf = ByteBufAllocator.DEFAULT.buffer();
//调用byteBuf
byteBuf.writeInt(888);
System.out.println("重新分配后的引用计数" + byteBuf.refCnt());
}
结果:
初始化后的引用计数1
释放后的引用计数0
释放后调用异常:io.netty.util.IllegalReferenceCountException: refCnt: 0
释放后增加引用计数异常:io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
重新分配后的引用计数1
当引用计数变为0后,整个内存就释放了,再次使用会抛出异常,重新尝试增加引用计数也会跑出异常,只能进行重新分配。
三、内存释放使用方式
3.1 手动释放
前面简单了解了关于内存释放的内容,那么我们应该如何使用呢?是不是可以向我们习惯的java代码一样,在finally当中调用呢?
try {
} finally {
byteBuf.release();
}
直接给出结论,是不行的。
前面我们介绍时候就说过,会有几率造成内存溢出的,即使不会发生也会造成内存的浪费。
前面的文章当中,我们学习了Pipeline和Handler。通常我们会将一个byteBuf传递给另一个channelHandler去处理,是存在一个传递性的。这里面存在两种情况:
- 假设一共有5个channelHandler,在第二个当中,将byteBuf转换成了java对象,然后将对象传递给第三个channelHandler,此时byteBuf就没有用了,所以此时就应该释放。
- 一直以byteBuf传递,直到最后一个channelHandler才进行释放。
总结一句话:最后谁用完了,谁就负责释放。
建议:如果确定这个buf在最后时刻用完了,而又无法确定当前有多少个引用计数,使用如下两种方式释放:
- 循环调用release(),知道返回true。
- 通过refCnt()获取当前的引用计数,然后调用release(int refCnt)释放。
3.2 tail和head自动释放
还记得前面将Pipeline和Handler时,提到了关于head和tail的概念,除了我们自己添加的Handler以外,会默认有一个头和尾的处理器。
在这两个处理器当中,也会有自动回收内存的保底能力,但是前提是要求我们将byteBuf传递到head或tail当中才行,对于中途就转换类型的,仍然需要我们自己去释放资源。
前面我们还学习过入站处理器和出栈处理器,其中入站处理器传递内容需要使用channelRead()方法,而在出站处理器传递参数需要使用write方法,这将作为我们跟踪代码的标记。
下面我们简单跟踪下源码,看看是如何实现的内存释放。
我们跟踪pipeline的addLast方法,跟踪到了AbstractChannelHandlerContext这个抽象类,其有两个实现类:
刚好对应我们的head和tail处理器。
3.2.1 TailContext
首先看tail处理器,实现了ChannelInboundHandler,即入站处理器,进行入站首尾工作。
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
找到channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
继续跟踪onUnhandledInboundMessage
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
发现其中的引用计数工具类,调用了release方法:
ReferenceCountUtil.release(msg);
判断msg是否是实现了ReferenceCounted ?是就进行是否,否则返回false。
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
3.2.1 HeadContext
查看HeadContext,实现了ChannelOutboundHandler,即出站处理器,进行出站首尾工作。
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler
找到其write方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
this.unsafe.write(msg, promise);
}
继续跟踪write:
public final void write(Object msg, ChannelPromise promise) {
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.this.initialCloseCause));
ReferenceCountUtil.release(msg);
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var6) {
this.safeSetFailure(promise, var6);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
}
在上面的代码中,仍然发现了
ReferenceCountUtil.release(msg)
其他代码此文暂时不做讲解了。
无论是head,还是tail,都需要将buf传递过来,才能进行释放。
本文暂时介绍这些,后面继续,有帮助的话点个赞吧。
网友评论