1、 ReplayingDecoder
- ReplayingDecoder S 是指一个枚举,如果不需要指定Void即可
- ReplayingDecoder 不需要判断(ByteBuf)中的数量是否足够
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {
![](https://img.haomeiwen.com/i14586304/e7bbb01e7d712b49.png)
ReplayingDecoder
- 可见ByteToMessageDecoder的子类。类定义中的泛型 S 是一个用于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等方法中会用到。在简单解码时也可以用java.lang.Void来占位。
- 与ByteToMessageDecoder不同,该类可以在接收到所需要长度的字节之后再调用
decode
方法,而不用一遍又一遍的手动检查流中的字节长度
- 从源码上看ReplayingDecoder重写了ByteToMessageDecoder的
callDecode()
方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
replayable.setCumulation(in);
try {
while (in.isReadable()) {
int oldReaderIndex = checkpoint = in.readerIndex();
int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();//清除
//在继续解码之前,检查这个处理程序是否已被删除。
//如果它被移除,继续在缓冲区上操作是不安全的。
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
S oldState = state;
int oldInputLength = in.readableBytes();
try {
//解码移除再入保护
decodeRemovalReentryProtection(ctx, replayable, out);
//在继续循环之前,检查是否删除了这个处理程序。
//如果它被移除,继续在缓冲区上操作是不安全的。
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() must consume the inbound " +
"data or change its state if it did not decode anything.");
} else {
//以前的数据已被丢弃或导致状态转换。
//也许它还在继续读。
continue;
}
}
} catch (Signal replay) {
replay.expect(REPLAY);
//在继续循环之前,检查是否删除了此处理程序。
//如果它被移除,继续在缓冲区上操作是不安全的。
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
//返回到这个检查点(或是旧的位置)和重试
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
in.readerIndex(checkpoint);
} else {
}
break;
}
if (oldReaderIndex == in.readerIndex() && oldState == state) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) + ".decode() method must consume the inbound data " +
"or change its state if it decoded something.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
2、ReplayingDecoder如何工作?
- ReplayingDecoder传递一个专门的ByteBuf实现,当缓冲区中没有足够的数据时,这个实现会抛出某种类型的错误。在上面的IntegerHeaderFrameDecoder中,您只是假设在调用
buf.readInt()
时,缓冲区中有4个或更多字节。如果缓冲区中确实有4个字节,它将像您期望的那样返回整数报头。否则,将引发错误并将控制返回到ReplayingDecoder。如果ReplayingDecoder捕捉到错误,那么它会将缓冲区的readerIndex倒回“初始”位置(即缓冲区的开始),并在缓冲区接收到更多数据时再次调用decode(..)
方法。
- 请注意,ReplayingDecoder总是抛出相同的缓存错误实例,以避免每次抛出时创建新错误并填充其堆栈跟踪的开销。
3、ReplayingDecoder如何提高性能
- 幸运的是,使用
checkpoint()
方法可以显著提高复杂解码器实现的性能。checkpoint()
方法更新缓冲区的“初始”位置,以便重新播放解码器将缓冲区的readerIndex
回滚到调用checkpoint点()
方法的最后位置。
4、ReplayingDecoder使用枚举Enum调用 checkpoint(T)
- 即使您可以只使用
checkpoint()
方法并自己管理解码器的状态,但是管理解码器状态的最简单方法是创建一个表示解码器当前状态的Enum类型,并在状态发生变化时调用checkpoint(T)
方法。根据要解码的消息的复杂性,可以有任意多个状态:
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
5、ReplayingDecoder调用没有参数的checkpoint()方法
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {}
6、 ReplayingDecoder用管道中的另一个解码器替换一个解码器
- 如果您要编写一个协议多路复用器,您可能需要用另一个重放解码器(ByteToMessageDecoder或MessageToMessageDecoder,实际的协议解码器)替换ReplayingDecoder(协议检测器)。仅仅通过调用ChannelPipeline是不可能实现这一点的。替换(ChannelHandler, String, ChannelHandler),但需要一些额外的步骤:
public class FirstDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
...
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
}
7、开发过程中编写解码器与编码器的建议(注意点)
- 建议要么解码器,要么编码器,遵循单一原则,不会那么困扰,方便开发与维护
网友评论