基于TCP协议处理网络数据经常面对半包和粘包问题,那么什么是半包问题,什么是粘包问题呢?应用层消息在被发送到网络之前会经过TCP/IP协议栈的包装,每一层协议都有自己的长度限制,如果用户发送的消息大于ip层的长度限制那么这个消息就会被分割成多个ip数包发送出去。发送端发送的消息会被先放到发送缓存中,这样接受端接受到的二进制数据可能包含多个消息,上面的情况就会导致以下两种情况的出现
1)半包问题:接收端接受到的数据只是客户端发送的消息的一部分。如果我们发送的消息比较大,那么这条消息就有可能会被分割成不同的ip数据报,那么接受端一次接受到的数据可能只是原始数据的一部分
2)粘包问题:接受端接受到的数据包含多条消息。发送端一次发送了多条数据,接受端无法从读取到的数据中去解析出发送端发送的消息
针对半包和粘包的问题,我们有以下一些通用的解决方案
1)使用消息分割符来标识出每个消息在二进制数据流中的截止位置
2)在每个消息的头部标识出消息的长度
3)发送消息的长度是固定值
上面这些解决方案对于数据接收端来说都需要一个字节累积器,读取到的字节数据会在字节累积器累积直到能根据字节累积器中累积的字节解析出一条消息,从字节累积器解析出这条消息后,把这条消息对应的二进制数据从字节累积器中清除然后继续解析下一条消息。
在netty中接受端从二进制流中取得消息的过程叫做解码,实现解码功能的类叫做解码器,上面三种半包/粘包的解决方案netty都提供了相应的编解码器去实现,我们分析第二种方案在netty中对应的解码器实现。
LengthFieldBasedFrameDecoder
有类消息在消息中用固定长度的字节来存储消息体的长度,对于这类消息netty定义了LengthFieldBasedFrameDecoder来解码它们
ByteToMessageDecoder
二进制数据流解码成具体消息的基类,它是LengthFieldBasedFrameDecoder的父类,同时ByteToMessageDecoder继承了ChannelInboundHandlerAdapter
当netty从网络IO中读取到一份数据的时候会触发ByteToMessageDecoder.channelRead 方法
ByteToMessageDecoder.channelRead
我们分析下ByteToMessageDecoder.channelRead的方法
//msg 就是通过网络IO读取到的数据
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//ByteToMessageDecoder 处理的数据类型是ByteBuf
if (msg instanceof ByteBuf) {
//CodecOutputList是用来存放将来解析出来的消息
CodecOutputList out = CodecOutputList.newInstance();
try {
first = cumulation == null;
//cumulation上面有说到的字节累积器,它是一个ByteBuf
//cumulator是字节累积算法实现类,在ByteToMessageDecoder中它的实现是MERGE_CUMULATOR
//cumulator.cumulate把读取到的msg写入到cumulation中
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
//callDecode 就是把字节累积器交给具体的解码器实现去解码
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
//如果cumulation中的数据读完了,那么把字节累积器给释放了
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
//如果连续读取的次数大于等于discardAfterReads(默认是16),那么执行discardSomeReadBytes
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
//解析到的消息数量
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
//使用解析到的消息触发channelRead事件
fireChannelRead(ctx, out, size);
//回收CodecOutputList对象
out.recycle();
}
} else {
//如果msg不是ByteBuf类型,把数据交给pipeline链上的下一个handler处理
ctx.fireChannelRead(msg);
}
}
我分析下读取到的ByteBuf中的字节是如何被添加到字节累积器cumulation中的
//cumulation是一个ByteBuf,alloc用来分配它所需要的空间的,
//in 本次读取到的ByteBuf,读取到的数据需要添加到cumulation中
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
//如果cumulation是初始默认的EMPTY_BUFFER,那么直接返回in作为cumulation,同时把EMPTY_BUFFER释放
cumulation.release();
return in;
}
try {
//获取本次网络IO读取到消息的大小
final int required = in.readableBytes();
if (required > cumulation.maxWritableBytes() ||
(required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
//如果cumulation中的可用空间小于required的大小,那么需要给cumulation扩容
//根据cumulation目前存储的数据和新来数据的总和去向alloc申请一个新的ByteBuf,
//cumulation中的数据会被搬移到新申请的ByteBuf中,然后把老的cumulation释放掉,新申请的ByteBuf会被赋值给cumulation
return expandCumulation(alloc, cumulation, in);
}
//把新读取到的数据写入到cumulation中
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
//释放in
in.release();
}
}
}
上面介绍了最新读取到的ByteBuf是如何被添加到字节累积器中的,那么如何从字节累积器中解码出消息呢?
callDecode是解码的入口方法
callDecode
// in 就是字节累积器,out用来存放解析出来的消息
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//一直不断的从字节累积器中解码消息,直到字节累积器中没有字节可读了
while (in.isReadable()) {
int outSize = out.size();
if (outSize > 0) {
//如果解码出了消息那么触发channelRead事件
fireChannelRead(ctx, out, outSize);
//触发完ChannelRead事件后,清空消息容器
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
//如果本解码器对应的context从pipeline上删除了,那么解码停止
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//获取字节累积器中已经存放的数据大小
int oldInputLength = in.readableBytes();
//解码器解码方法decode在这个地方中调用
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
//字节累积器存储的字节在上次解码后没有任何字节被解码那么退出解码过程
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
//代码执行到这里说明从字节累积器中解析出了消息,但是字节累积器中没有任何数据别解码所以报错
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
继续看调用链上的decodeRemovalReentryProtection方法
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
//初始化解码状态为STATE_CALLING_CHILD_DECODE表示正在解码
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//调用解码实现方法decode
decode(ctx, in, out);
} finally {
//如果在解码进行中的时候,解码handler对应的context在pipeline上被删除了,那么decodeState在handlerRemoved方法中被设置为STATE_HANDLER_REMOVED_PENDING
//那么在这种情况下removePending就会是true
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
//如果removePending为true,那么触发ChannelRead事件
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
进入解码的核心方法decode,我们以LengthFieldBasedFrameDecoder.decode来分析
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//通过decode方法解析出消息
Object decoded = decode(ctx, in);
if (decoded != null) {
//如果消息不是null,添加到消息容器中
out.add(decoded);
}
}
//解码具体实现方法
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
//对于LengthFieldBasedFrameDecoder来说我们会设置单个消息的最大长度
//如果从标识消息体长度的字段中解析出的真实长度值大于设置的最大长度,那么需要丢弃这个消息
if (discardingTooLongFrame) {
discardingTooLongFrame(in);
}
//如果字节累积器中存储的字节长度小于设置的消息体大小标识字段占用的字节长度,那么直接返回null
if (in.readableBytes() < lengthFieldEndOffset) {
return null;
}
//我们在LengthFieldBasedFrameDecoder还可以设置一个lengthFieldOffset参数用来表示消息大小标识字段在整个消息中的偏移量
//算出消息大小标识字段在字节累积器中的开始位置
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
//lengthFieldLength 表示消息大小标识字段占用多少字节
//getUnadjustedFrameLength获取消息体的长度
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
//lengthFieldEndOffset = lengthFieldOffset+lengthFieldLength
//lengthAdjustment是调整这个消息长度的一个值,不同的消息转换成字节数组的时候有不同的策略,
//有的会把头部转化成字节数据发送给客户端,有一些则不会,但是消息长度字段中有的会把头部信息的长度算在总长度中,有些又不会
//那么这个时候就需要开发者根据实际的情况通过设置lengthAdjustment来调和这种情况
//根据lengthAdjustment和lengthFieldEndOffset计算出frameLength的大小
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
//如果frameLength大于maxFrameLength,需要丢弃这个消息
if (frameLength > maxFrameLength) {
exceededFrameLength(in, frameLength);
return null;
}
// never overflows because it's less than maxFrameLength
int frameLengthInt = (int) frameLength;
//如果字节累加器中的数据小于frameLengthInt那么这次不做消息解码,因为这个时候字节累积器中存储的数据不足以解码出一条消息
if (in.readableBytes() < frameLengthInt) {
return null;
}
//initialBytesToStrip是开发者设置的在读取一个消息之前,先跳过字节累加器中的前initialBytesToStrip个字节,这个也是和消息转换成字节的不同方式有关
if (initialBytesToStrip > frameLengthInt) {
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
//计算出消息体真正的长度
int actualFrameLength = frameLengthInt - initialBytesToStrip;
//从字节累积器中解码出一个消息
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
//更新字节累积器下次读取数据的开始位置
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
上面就是LengthFieldBasedFrameDecoder解码的过程。
当LengthFieldBasedFrameDecoder读取的消息长度大于设置的maxLength,LengthFieldBasedFrameDecoder会把这个消息丢弃,我解析下这个过程的细节
exceededFrameLength
private void exceededFrameLength(ByteBuf in, long frameLength) {
//discard等于需要丢弃的数据量的减去现在字节累积器中存的数据量。
long discard = frameLength - in.readableBytes();
//tooLongFrameLength设置为frameLength,等下解析discardingTooLongFrame方法会用到这个参数
tooLongFrameLength = frameLength;
if (discard < 0) {
//如果discard <0表示字节累积器中包含了所有需要丢弃的长度为frameLength的数据,直接通过ByteBuf.skip去丢弃这些数据
// buffer contains more bytes then the frameLength so we can discard all now
in.skipBytes((int) frameLength);
} else {
// Enter the discard mode and discard everything received so far.
//若discard大于0表示字节累积器中存的数据只是丢弃数据的一部分
//还有一部分需要丢弃的数据可能还在网络上传输,字节累积器还没收集到
//设置discardingTooLongFrame为true,那么下次执行decode方法的时候就会根据这个属性判断是不是需要继续从字节累积器中丢弃数据
discardingTooLongFrame = true;
//bytesToDiscard记录下次decode执行的时候需要继续从字节累积器中丢弃多少字节数据
bytesToDiscard = discard;
//字节累积器把自己目前存的数据全部丢弃
in.skipBytes(in.readableBytes());
}
failIfNecessary(true);
}
decode方法第一行代码就是判断discardingTooLongFrame==true,通过上面exceededFrameLength方法的分析我们知道在开始执行解码前我们可能需要从字节累积器中先丢弃一些数据
discardingTooLongFrame
private void discardingTooLongFrame(ByteBuf in) {
long bytesToDiscard = this.bytesToDiscard;
//获得本次从字节累积器中需要丢弃的字节数
int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
//继续丢弃localBytesToDiscard个字节的数据
in.skipBytes(localBytesToDiscard);
bytesToDiscard -= localBytesToDiscard;
//更新bytesToDiscard
this.bytesToDiscard = bytesToDiscard;
failIfNecessary(false);
}
继续分析failIfNecessary方法
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
if (bytesToDiscard == 0) {
//bytesToDiscard等于0表示需要丢弃的数据都已经被丢弃了
// Reset to the initial state and tell the handlers that
// the frame was too large.
long tooLongFrameLength = this.tooLongFrameLength;
//重置tooLongFrameLength和discardingTooLongFrame状态
this.tooLongFrameLength = 0;
discardingTooLongFrame = false;
if (!failFast || firstDetectionOfTooLongFrame) {
fail(tooLongFrameLength);
}
} else {
// Keep discarding and notify handlers if necessary.
if (failFast && firstDetectionOfTooLongFrame) {
//若满足条件抛出异常,会触发handler的exceptionCaught方法
fail(tooLongFrameLength);
}
}
}
网友评论