在上一篇中介绍到关于jupiter的底层通信模块transport的部分实现。仅仅只是讨论了整个server的初始化以及启动流程。很多细枝末节其实还没有涉及到,例如tcp参数设置,超时机制,编解码器等等实现细节。这些东西将会逐步被消化分解。而本文的主题是jupiter的业务编解码器的实现。
在jupiter中有一张图,这张图清晰地描述了整个server的数据流向:
* *********************************************************************
* I/O Request I/O Response
* │ △
* │
* │
* ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─
* │ │ │
* │
* │ ┌ ─ ─ ─ ─ ─ ─▽─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
* IdleStateChecker#inBound IdleStateChecker#outBound
* │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ └ ─ ─ ─ ─ ─ ─△─ ─ ─ ─ ─ ─ ┘ │
* │ │
* │ │
* │ │
* │ ┌ ─ ─ ─ ─ ─ ─▽─ ─ ─ ─ ─ ─ ┐ │
* AcceptorIdleStateTrigger │
* │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │
* │ │
* │ │
* │ │
* │ ┌ ─ ─ ─ ─ ─ ─▽─ ─ ─ ─ ─ ─ ┐ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
* ProtocolDecoder ProtocolEncoder
* │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ └ ─ ─ ─ ─ ─ ─△─ ─ ─ ─ ─ ─ ┘ │
* │ │
* │ │
* │ │
* │ ┌ ─ ─ ─ ─ ─ ─▽─ ─ ─ ─ ─ ─ ┐ │
* AcceptorHandler │
* │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │
* │ │
* │ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ │
* ▽ │
* │ ─ ─ ▷│ Processor ├ ─ ─▷ │
*
* │ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ │
* ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
*
实际上这个图仅仅是针对Netty api的描述,和rpc没有太多直接的关系。如果换做别的通信框架这个图就没有什么意义了。鉴于Netty依旧是目前Java网络开发中最流行的框架,拿出来讨论也是很有意义的。
这个图中有几个核心的概念:decode、encode、IdleStateChecker和handler。在编写一个网络应用的时候首先必须定义的是通信协议。比如上传下载文件使用ftp,即时聊天使用xmpp,浏览网页使用http...当然自己写一个rpc框架也得定义自己的通信协议。为什么要定义协议这个问题对于开发的老手来说没有必要去解释,不过我还是得啰嗦一下。在网络应用程序中,所有的数据在网络上的传输都是二进制格式,也就是0101010这样的机器码。当然也有人会反驳:不是有基于字符的数据吗?我的理解是在宏观上来说确实存在,那是因为上层的协议已经将代表字符串的01010解码成了可读的字符串。但是在底层的传输,也就是在物理层比如网线、电磁波中的传输依旧还是0101。当然用0101来比喻也是不恰当,更准确的说法应该是电平信号。而协议的作用是将通信双方的内容进行规范。就像以前写书信的时候,开头得有称谓,然后写正文,最后是落款这样一种格式。别人收到后就知道,嗯,这是一封信而不是一篇散文或者自传。当然这个比喻不是很恰当,但是就这样吧,不想废话了。
既然定义了协议,那么就得去处理协议。Netty中提供了很多内置的协议解析的类。通常被称作编解码器。顾名思义也就是将二进制转化为我们想要的数据结构,这样方便统一处理。这张图中的具体体现就是先解码,然后再处理,最后将处理结果编码后发送出去。编解码器将是本文讨论的重点。然而还有别的几个组件如空闲链路检查IdleStateChecker
和业务处理AcceptorHandler
组件,这些放在后面讨论。
boot.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new IdleStateChecker(timer, JConstants.READER_IDLE_TIME_SECONDS, 0, 0),
idleStateTrigger,
CodecConfig.isCodecLowCopy() ? new LowCopyProtocolDecoder() : new ProtocolDecoder(),
encoder,
handler);
}
});
这段代码就是初始化编解码器等组件。可以看到handler作为业务核心处理器放在了最后。这是Netty的机制所决定的。关于Netty的一些设计可以看看这本书作为基础入门。
看到解码器和编码器还有点不一样。解码器是直接new的方式添加到pipeline中的,而编码器是new好了再添加到pipeline中。其实具体差别就在于解码器不能共享。
public class ProtocolDecoder extends ReplayingDecoder<ProtocolDecoder.State> {
// ...
}
@ChannelHandler.Sharable
public class ProtocolEncoder extends MessageToByteEncoder<PayloadHolder> {
// ...
}
所谓的能共享,就是意味着每次都能使用这个编码器而不需要每次用就得实例化出来。而解码器不能被共享也是有原因的。因为解码器解码的数据全部来自于网络请求,网络通常而言是不可靠的,不能保证每次都能发送完整的数据包也有可能需要的数据还没收到。那么如何保证接受到的数据是完整的呢?实际上是没法保证,只能够“假装”是完整的。所以decoder是继承自ReplayingDecoder
。这个类的作用简单通俗来理解就是如果网络上的数据还没发完,我就继续接收,直到收完为止。具体的工作原理可以参考api文档或者源码,这里不再过多探讨。为什么继承这个类后就不能共享呢?这个类是个泛型,参数是一个state的字眼。看到这里大概可以联想到这个类肯定和状态有关系。这样就很容易理解了,有状态的对象通常不能被共享。想象一下,在多线程环境下,线程A将这个状态改为1线程B这时候拿到执行权又把这个状态改为2,然后线程A又要使用这个状态变量了,这时候就不是他刚开始改变的状态了。如此一来,全部乱套了。这些都是并发基础相关的内容,有兴趣可以去谷歌。这里不再过多描述。而encode则没有状态变量,需要编码的数据一定是确定的,不存在解码器中数据不完整的情况。因此使用共享实例没什么问题。具体可以看看其中的代码实现:ProtocolDecoder ProtocolEncoder
解释完这些鸡毛蒜皮的细节,接下来开始分析一下这个协议的定义以及解析。
* **************************************************************************************************
* Protocol
* ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
* 2 │ 1 │ 1 │ 8 │ 4 │
* ├ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┤
* │ │ │ │ │
* │ MAGIC Sign Status Invoke Id Body Size Body Content │
* │ │ │ │ │
* └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
*
* 消息头16个字节定长
* = 2 // magic = (short) 0xbabe
* + 1 // 消息标志位, 低地址4位用来表示消息类型request/response/heartbeat等, 高地址4位用来表示序列化类型
* + 1 // 状态位, 设置请求响应状态
* + 8 // 消息 id, long 类型, 未来jupiter可能将id限制在48位, 留出高地址的16位作为扩展字段
* + 4 // 消息体 body 长度, int 类型
*
作者在代码中将协议格式完全标注出来了:16字节的消息头+消息体。消息头中的MAGIC字段仅仅是为了标识这个数据包是属于jupiter。就像java class文件以0xCAFEBABE
开头一样。没有实际的意义,仅仅做个标识而已。其他的字段也就没什么可说的了。
private static final boolean USE_COMPOSITE_BUF = SystemPropertyUtil.getBoolean("jupiter.io.decoder.composite.buf", false);
public ProtocolDecoder() {
super(State.MAGIC);
if (USE_COMPOSITE_BUF) {
setCumulator(COMPOSITE_CUMULATOR);
}
}
enum State {
MAGIC,
SIGN,
STATUS,
ID,
BODY_SIZE,
BODY
}
解码器的构造函数中直接调用了父类的构造函数,将枚举类型State
传入。这个枚举类型所代表的就是要解析协议数据中的位置(下标)。因此构造器中传入的当然是协议的第一个字段,也就表示从第一个位置开始解析。然后有一个布尔标识,这个变量从系统变量中获取,用来决定是否使用COMPOSITE_CUMULATOR
。默认的CUMULATOR是MERGE_CUMULATOR
.区别可能在性能上有点差距吧。具体差异得去研究源码。
private final JProtocolHeader header = new JProtocolHeader();
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) {
case MAGIC:
checkMagic(in.readShort()); // MAGIC
checkpoint(State.SIGN);
case SIGN:
header.sign(in.readByte()); // 消息标志位
checkpoint(State.STATUS);
case STATUS:
header.status(in.readByte()); // 状态位
checkpoint(State.ID);
case ID:
header.id(in.readLong()); // 消息id
checkpoint(State.BODY_SIZE);
case BODY_SIZE:
header.bodySize(in.readInt()); // 消息体长度
checkpoint(State.BODY);
case BODY:
switch (header.messageCode()) {
case JProtocolHeader.HEARTBEAT:
break;
case JProtocolHeader.REQUEST: {
int length = checkBodySize(header.bodySize());
byte[] bytes = new byte[length];
in.readBytes(bytes);
JRequestPayload request = new JRequestPayload(header.id());
request.timestamp(SystemClock.millisClock().now());
request.bytes(header.serializerCode(), bytes);
out.add(request);
break;
}
case JProtocolHeader.RESPONSE: {
int length = checkBodySize(header.bodySize());
byte[] bytes = new byte[length];
in.readBytes(bytes);
JResponsePayload response = new JResponsePayload(header.id());
response.status(header.status());
response.bytes(header.serializerCode(), bytes);
out.add(response);
break;
}
default:
throw IoSignals.ILLEGAL_SIGN;
}
checkpoint(State.MAGIC);
}
}
正真的解析逻辑全部在decode
方法中。这里的每个case并没有break,道理很简单,每解析一段数据后得接着继续往下解析,如果break掉了后面的数据不全都放弃解析了吗?每当解析到一个位置,都将这个位置上读取的数据放到header
变量中,同时将接下来需要解析的位置记录下来(通过checkpoint(...)
方法)。之所以要记录下来,万一某个位置解析出错,下次就不用从头再来了,直接上次出错的位置接着来就行了,也是为了提高性能。然而解析到body部分的时候,header里面内容都已经全部填充好了。接下来就是根据消息类型来处理body里的内容。如果是心跳包,那么什么都不做直接返回,因为body里肯定是没有数据的。如果是请求包(REQUEST
类型),先从header里读取这个body到底有多长,然后再去读这么长的数据,最后通过JRequestPayload
对象将这个body数据封装起来,同时将消息id和序列化类型code也封装进去了。响应包类型逻辑与之类似。整个解码器解析逻辑就完成了。
与之对应的就是编码器了,编码器逻辑更简单。
@Override
protected void encode(ChannelHandlerContext ctx, PayloadHolder msg, ByteBuf out) throws Exception {
if (msg instanceof JRequestPayload) {
doEncodeRequest((JRequestPayload) msg, out);
} else if (msg instanceof JResponsePayload) {
doEncodeResponse((JResponsePayload) msg, out);
} else {
throw new IllegalArgumentException(Reflects.simpleClassName(msg));
}
}
private void doEncodeRequest(JRequestPayload request, ByteBuf out) {
byte sign = JProtocolHeader.toSign(request.serializerCode(), JProtocolHeader.REQUEST);
long invokeId = request.invokeId();
byte[] bytes = request.bytes();
int length = bytes.length;
out.writeShort(JProtocolHeader.MAGIC)
.writeByte(sign)
.writeByte(0x00)
.writeLong(invokeId)
.writeInt(length)
.writeBytes(bytes);
}
private void doEncodeResponse(JResponsePayload response, ByteBuf out) {
byte sign = JProtocolHeader.toSign(response.serializerCode(), JProtocolHeader.RESPONSE);
byte status = response.status();
long invokeId = response.id();
byte[] bytes = response.bytes();
int length = bytes.length;
out.writeShort(JProtocolHeader.MAGIC)
.writeByte(sign)
.writeByte(status)
.writeLong(invokeId)
.writeInt(length)
.writeBytes(bytes);
}
和解码器相反,编码器就是将对象中的数据按协议中的定义挨个写到buf中。
以上,jupiter中的消息编解码器的实现就全部整理完了。当然还有空闲链路检测部分,因此我决定暂时放弃这部分,太复杂了,等所有逻辑整理完毕后再接着填坑。
网友评论