下面来分析Netty的解码器ByteToMessageDecoder,同样,ByteToMessageDecoder也是继承自ChannelInboundHandlerAdapter,内部有2个累加器:MERGE_CUMULATOR和COMPOSITE_CUMULATOR,MERGE_CUMULATOR是基于内存的复制,COMPOSITE_CUMULATOR则是维护了一个复杂的索引,可以减少内存的复制。
MERGE_CUMULATOR
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() // 超过空间大小,需要扩容
|| cumulation.refCnt() > 1 // 引用大于 1 ,说明用户使用了 slice().retain() 或 duplicate().retain() 使refCnt增加并且大于 1 ,
// 此时扩容返回一个新的累积区ByteBuf,方便用户对老的累积区ByteBuf进行后续处理。
|| cumulation.isReadOnly()) { // 只读,不可累加,所以需要改成可写
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain() or if its read-only.
// 扩容,返回新的 buffer
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
// 使用老的 buffer
buffer = cumulation;
}
// 写入 in 到 buffer 中
buffer.writeBytes(in);
// 释放输入 in
in.release();
// 返回 buffer
return buffer;
}
};
COMPOSITE_CUMULATOR
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
// 和 MERGE_CUMULATOR 的情况类似
if (cumulation.refCnt() > 1) {
// Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
// use slice().retain() or duplicate().retain().
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
buffer.writeBytes(in);
in.release();
} else {
CompositeByteBuf composite;
// 原来是 CompositeByteBuf 类型,直接使用
if (cumulation instanceof CompositeByteBuf) {
composite = (CompositeByteBuf) cumulation;
// 原来不是 CompositeByteBuf 类型,创建,并添加到其中
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE);
composite.addComponent(true, cumulation);
}
// 添加 in 到 composite 中
composite.addComponent(true, in);
// 赋值给 buffer
buffer = composite;
}
// 返回 buffer
return buffer;
}
};
然后在channelRead方法里进行解码
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
// 创建 CodecOutputList 对象
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
// 判断是否首次
first = cumulation == null;
// 若首次,直接使用读取的 data
if (first) {
cumulation = data;
// 若非首次,将读取的 data ,累积到 cumulation 中
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 执行解码
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e; // 抛出异常
} catch (Exception e) {
throw new DecoderException(e); // 封装成 DecoderException 异常,抛出
} finally {
// cumulation 中所有数据被读取完,直接释放全部
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0; // 重置 numReads 次数
cumulation.release(); // 释放 cumulation
cumulation = null; // 置空 cumulation
// 读取次数到达 discardAfterReads 上限,释放部分的已读
} else if (++ numReads >= discardAfterReads) {
numReads = 0; // 重置 numReads 次数
discardSomeReadBytes(); // 释放部分的已读
}
// 解码消息的数量
int size = out.size();
// 是否解码到消息
decodeWasNull = !out.insertSinceRecycled();
// 触发 Channel Read 事件。可能是多条消息
fireChannelRead(ctx, out, size);
// 回收 CodecOutputList 对象
out.recycle();
}
} else {
// 触发 Channel Read 事件
ctx.fireChannelRead(msg);
}
}
执行解码业务逻辑
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 循环读取,直到不可读
while (in.isReadable()) {
// 记录
int outSize = out.size();
// out 非空,说明上一次解码有解码到消息
if (outSize > 0) {
// 触发 Channel Read 事件。可能是多条消息
fireChannelRead(ctx, out, outSize);
// 清空
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 记录当前可读字节数
int oldInputLength = in.readableBytes();
// 执行解码。如果 Handler 准备移除,在解码完成后,进行移除。
decodeRemovalReentryProtection(ctx, in, out);
// 用户主动删除该 Handler ,继续操作 in 是不安全的
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
if (ctx.isRemoved()) {
break;
}
// 整列判断 `out.size() == 0` 比较合适。因为,如果 `outSize > 0` 那段,已经清理了 out 。
if (outSize == out.size()) {
// 如果未读取任何字节,结束循环
if (oldInputLength == in.readableBytes()) {
break;
// 如果可读字节发生变化,继续读取
} else {
continue;
}
}
// 如果解码了消息,但是可读字节数未变,抛出 DecoderException 异常。说明,有问题。
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message.");
}
// 如果开启 singleDecode ,表示只解析一次,结束循环
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
ByteToMessageDecoder的分析就到这里了。
网友评论