概述
socket流程.png先来回顾下epoll模式下socket的大致传输流程,其中TCP报文摘除TCP头以后从内核缓冲区拷贝到socket接收缓冲区时有可能会发生粘包半包的问题。针对粘包半包问题netty提供了很多拆箱即用的编解码器,本节先来分析下netty提供的解码器是如何实现的。
解码器体系
Decoder.png- 熟悉的套路netty提供的抽象类
ByteToMessageDecoder
继承ChannelInboundHandlerAdapter
作为一个hanlder,这样就可以在pipeline的事件传播中灵活的使用; -
ByteToMessageDecoder
提供模板方法并暴露抽象的decode方法,netty内置的几个解码器或用户自定义解码器只需要重写decode方法; - netty内置了几个比较常用的解码器
-
FixedLengthFrameDecoder
:固定长度解码器 -
LineBasedFrameDecoder
:行解码器 -
DelimiterBasedFrameDecoder
:分隔符解码器 -
LengthFieldBasedFrameDecoder
:可变长度解码器
-
ByteToMessageDecoder
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//只处理字节缓冲区类型
if (msg instanceof ByteBuf) {
//就是带有回收器的ArrayList
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
//新ByteBuf通过累加器进行累加,累加之后一起处理
cumulation = cumulator.cumulate(ctx.alloc(), first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
//解码
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
//不为空且不可读,释放累加器
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
//读取数据的次数大于阈值,则尝试丢弃已读的数据
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
//是否要去调用ChannelHandlerContext的read()来设置监听读事件,可能没读完; channelReadComplete事件中使用
firedChannelRead |= out.insertSinceRecycled();
//向下传播channelRead事件
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
//其他类型不处理,继续传递
ctx.fireChannelRead(msg);
}
}
- channelRead入口在
AbstractNioByteChannel.read
中,每次从接收缓冲区读取一段ByteBuf通过pipeline.fireChannelRead(byteBuf);
传播过来; -
cumulator.cumulate
新ByteBuf通过累加器进行累加,累加之后一起处理,主要针对,有不能解码成一个消息的数据,需要等待下一次读取到数据拼在一起进行解码; -
callDecode
累加器中字节流进行解码 -
cumulation != null && !cumulation.isReadable()
不为空且不可读(即累加器中ByteBuf已经全部被解码),则释放累加器 -
fireChannelRead
向下传播channelRead事件
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
//累加器是中是空的,并且新来的ByteBuf(in)又是连续的(说明不是复合缓冲区),则直接返回新ByteBuf
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
// 判断累加器是否需要扩容
// 新的可读大于累加器的最大可写 或新可读大于最大可重新(即可以通过重新分配来增加可写)并且又是共享的 或累加器是只读的
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
// 扩容也很简单,就是new一个新的更大的累加器,把原累加器以及新ByteBuf中数据copy过来,之后释放原累加器
return expandCumulation(alloc, cumulation, in);
}
// 追加写入累加器
cumulation.writeBytes(in, in.readerIndex(), required);
// 把in的readerIndex置为writeIndex,相当于把in置为不可读
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
//返回之前释放当前ByteBuf
in.release();
}
}
};
-
expandCumulation
累加器扩容 -
cumulation.writeBytes
追加写入累加器 -
in.readerIndex
把in的readerIndex置为writeIndex,相当于把in置为不可读
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//累加器中有数据可读
while (in.isReadable()) {
int outSize = out.size();
//有数据已经被解码
if (outSize > 0) {
//已经解码的数据往下传递
fireChannelRead(ctx, out, outSize);
//传递后清空out
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
//在继续解码之前,检查是否已删除此context
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//记录累加器内还有多少字节可读
int oldInputLength = in.readableBytes();
//解码
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
//没有解码出数据
if (outSize == out.size()) {
//如果解码后累加器中可读数据 = 之前可读数据,说明没有读取到累加器中内容(说明不符合子类解码器的要求),则直接跳出循环;否则继续解码
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
//解码器解码出了数据,但是累加器中可读数据量并没变,说明解码器有问题,报错
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
//是否只解码一次
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
-
outSize > 0
遍历过程中有数据已经被解码,则fireChannelRead
已经解码的数据往下传递,然后out.clear();
清空已解码数据 -
decodeRemovalReentryProtection
解码 -
outSize == out.size()
没有解码出数据,如果解码后累加器中可读数据 = 之前可读数据,说明没有读取到累加器中内容(说明不符合子类解码器的要求),则直接跳出循环;否则继续解码
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
//解码状态:0:初始 1:正在调用子类解码 2:解码器待删除
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//调用子类进行解码
decode(ctx, in, out);
} finally {
//handleRemoved时decodeState置为2
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
//如果有被设置待删除状态,就马上传播下去处理
fireChannelRead(ctx, out, out.size());
out.clear();
//解码器从pipeline中移除
handlerRemoved(ctx);
}
}
}
-
decode
调用子类进行解码
其他分支逻辑比较简单,下一节分析下几个常用的内置解码器
网友评论