本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
本篇用以分析心跳、编码、解码相关的代码。
文章内容顺序:
1.心跳
1.1 为什么要有心跳?
1.2 HeaderExchangeClient
1.3 HeaderExchangeClient#startHeatbeatTimer
1.4 HeartBeatTask#run
2.编码
2.1 编码的链路:
2.2 消息头的字段的意义
2.3 ExchangeCodec
2.4 序列化的多种实现
3.解码
3.1 InternalDecoder#messageReceived
3.2 DubboCountCodec#decode
3.3 DubboCodec.decode(channel,buffer)
3.4 DubboCodec#decodeBody
3.5 DecodeableRpcInvocation#decode
3.6 解码的方法调用顺序
首先是心跳相关
1.心跳
在上一篇服务调用的消费端中,我们介绍到HeaderExchangeClient
的构造方法中会有心跳的一些逻辑,在那边一笔带过了,在这篇文章来详细看看。
1.1为什么要有心跳?
心跳间隔,对于长连接,当物理层断开时,比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开
这里仍旧贴一下HeaderExchangeClient的构造方法
1.2HeaderExchangeClient
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
// 创建 HeaderExchangeChannel 对象
this.channel = new HeaderExchangeChannel(client);
// 以下代码均与心跳检测逻辑有关
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
// 开启心跳检测定时器
startHeartbeatTimer();
}
}
此方法最后调用 #startHeatbeatTimer()
方法,发起心跳定时器。直接来看startHeatbeatTimer()
方法的实现吧
1.3HeaderExchangeClient#startHeatbeatTimer
private void startHeatbeatTimer() {
// 停止原有定时任务
stopHeartbeatTimer();
// 发起新的定时任务
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
可以看到这边就是直接调用线程池进行了一个定时任务HeartBeatTask
对象,scheduleWithFixedDelay
方法意为当当前任务执行完毕后再隔多少秒进行下一个任务。
接下来我们来看一下HeartBeatTask的实现。
1.4HeartBeatTask#run
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
// 最后读写的时间,任一超过心跳间隔,发送心跳
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true); // 需要响应
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
// 最后读的时间,超过心跳超时时间
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
// 客户端侧,重新连接服务端
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
// 服务端侧,关闭客户端连接
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
- 【任务一】:最后读或写的时间,任一超过心跳间隔 heartbeat ,发送心跳。
- 【任务二】:最后读的时间,超过心跳超时时间 heartbeatTimeout ,分成两种情况:
客户端侧,重连连接服务端。
服务端侧,关闭客户端连接。
至此,心跳的分析告一段落。
接下来来看看编码的操作。
2.编码
2.1编码的链路

看上面这张图,当运行到NettyChannel#send
的这一行后,就会跳进Netty
的执行逻辑,最后由NettyCodecAdapter
的内部类调用编码类执行编码操作

上图中继承的
OneToOneEncoder
是Netty
的抽象方法,那这个InternalEncoder
是什么时候传进来的呢?
在NettyClient#doOpen()
方法中有如下代码,会在编码器、解码器等一并设置进来,从而使得最后编码、解码逻辑能交由Dubbo
自己的类实现。

再贴一张编码的链路。
NettyChannel#send
->一系列Netty内部的方法
->NettyCodecAdapter内部类(继承了netty抽象类)#encode
->ExchangeCodec#encode
->ExchangeCodec#encodeRequest

可以看到最后是交由
ExchangeCodec
来执行编码的逻辑了。那么这个链路就简单介绍到这,直接来看他是怎么实现的编码吧。
2.2消息头的字段的意义
先简单列举一下消息头的内容,其中的魔数是用来分割处理粘包问题的。

接下来就直接来看看
ExchangeCodec
的实现
2.3 ExchangeCodec
public class ExchangeCodec extends TelnetCodec {
// 消息头长度
protected static final int HEADER_LENGTH = 16;
// 魔数内容
protected static final short MAGIC = (short) 0xdabb;
protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0];
protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1];
protected static final byte FLAG_REQUEST = (byte) 0x80;
protected static final byte FLAG_TWOWAY = (byte) 0x40;
protected static final byte FLAG_EVENT = (byte) 0x20;
protected static final int SERIALIZATION_MASK = 0x1f;
private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class);
public Short getMagicCode() {
return MAGIC;
}
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 对 Request 对象进行编码
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 对 Response 对象进行编码,后面分析
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// 创建消息头字节数组,长度为 16
byte[] header = new byte[HEADER_LENGTH];
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// 设置数据包类型(Request/Response)和序列化器编号
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
// 设置通信方式(单向/双向)
if (req.isTwoWay()) {
header[2] |= FLAG_TWOWAY;
}
// 设置事件标识
if (req.isEvent()) {
header[2] |= FLAG_EVENT;
}
// 设置请求编号,8个字节,从第4个字节开始设置
Bytes.long2bytes(req.getId(), header, 4);
// 获取 buffer 当前的写位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,为消息头预留 16 个字节的空间
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 创建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 对事件数据进行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 对请求数据进行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 获取写入的字节数,也就是消息体长度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 将消息体长度写入到消息头中
Bytes.int2bytes(len, header, 12);
// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
buffer.writerIndex(savedWriteIndex);
// 从 savedWriteIndex 下标处写入消息头
buffer.writeBytes(header);
// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
// 省略其他方法
}
以上就是请求对象的编码过程,该过程首先会通过位运算将消息头写入到 header
数组中。然后对 Request
对象的 data
字段执行序列化操作,序列化后的数据最终会存储到ChannelBuffer
中。序列化操作执行完后,可得到数据序列化后的长度 len
,紧接着将 len
写入到 header
指定位置处。最后再将消息头字节数组 header
写入到 ChannelBuffer
中,整个编码过程就结束了。
这里我们可以再来关注一下encodeEventData
方法(encodeRequestData
也是一样的实现。)
2.4 序列化的多种实现
image.png
通过一系列重载方法,我们可以看到最后调用了out.writeObeject
而这个out
,则是在encodeRequest
方法中通过url传过来的参数设置的,有多种不同的实现。
image.png
image.png
有关序列化的协议,可以简单参照下这篇博文: Dubbo协议及序列化
说完了编码,再来说说解码。
同样的,还是从解码的链路开始说起
3.解码
3.1 InternalDecoder#messageReceived
NettyCodecAdapter
的内部类InternalDecoder#messageReceived
方法
同样的,这个类也与编码的类一样,都是通过netty的pipeline
设置进来的,上文已经介绍过了。

调用了DubboCountCodec#decode
方法
3.2 DubboCountCodec#decode
public final class DubboCountCodec implements Codec2 {
private DubboCodec codec = new DubboCodec();
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 记录当前读位置
int save = buffer.readerIndex();
// 创建 MultiMessage 对象
MultiMessage result = MultiMessage.create();
do {
// 解码
Object obj = codec.decode(channel, buffer);
// 输入不够,重置读进度
if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
buffer.readerIndex(save);
break;
// 解析到消息
} else {
// 添加结果消息
result.addMessage(obj);
// 记录消息长度到隐式参数集合,用于 MonitorFilter 监控
logMessageLength(obj, buffer.readerIndex() - save);
// 记录当前读位置
save = buffer.readerIndex();
}
} while (true);
// 需要更多的输入
if (result.isEmpty()) {
return Codec2.DecodeResult.NEED_MORE_INPUT;
}
// 返回解析到的消息
if (result.size() == 1) {
return result.get(0);
}
return result;
}
//省略其他代码
}
这边的codec
指的就是DubboCodec
。调用的是DubboCodec.decode(channel,buffer)
3.3 DubboCodec.decode(channel,buffer)
public class ExchangeCodec extends TelnetCodec {
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
// 读取 Header 数组
int readable = buffer.readableBytes();
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
buffer.readBytes(header);
// 解码
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 非 Dubbo 协议,目前是 Telnet 命令。
// check magic number.
if (readable > 0 && header[0] != MAGIC_HIGH || readable > 1 && header[1] != MAGIC_LOW) {
// 将 buffer 完全复制到 `header` 数组中。因为,上面的 `#decode(channel, buffer)` 方法,可能未读全
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
// 【TODO 8026 】header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW ?
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 提交给父类( Telnet ) 处理,目前是 Telnet 命令。
return super. decode(channel, buffer, readable, header);
}
// Header 长度不够,返回需要更多的输入
// check length.
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// `[96 - 127]`:Body 的**长度**。通过该长度,读取 Body 。
// get data length.
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 总长度不够,返回需要更多的输入
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// 解析 Header + Body
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
//子类重写的方法
return decodeBody(channel, is, header);
} finally {
// skip 未读完的流,并打印错误日志
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
//省略其他代码
}
这边对于再提一下,实际上这个方法是DubboCodec
里的方法,但是ExchangeCodec
是DubboCodec
的父类,并且在DubboCodec
没有重写这个方法,所以debug
会跳到父类的方法行(因为代码逻辑写在父类里)。
上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet
命令行发出的数据包。接着再对消息体长度,以及可读字节数进行检测。最后调用 decodeBody
方法进行后续的解码工作。
注意在最后的try块中,会调用到DubboCodec的实现——DubboCodec#decodeBody
。注意,从头到尾我们调用的都是DubboCodec
类。
3.4 DubboCodec#decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2];
// 获得 Serialization 对象
byte proto = (byte) (flag & SERIALIZATION_MASK);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
// 获得请求||响应编号
// get request id.
long id = Bytes.bytes2long(header, 4);
// 解析响应
if ((flag & FLAG_REQUEST) == 0) {
// decode response.
Response res = new Response(id);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
// 设置状态
// get status.
byte status = header[3];
res.setStatus(status);
// 正常响应状态
if (status == Response.OK) {
try {
Object data;
// 解码心跳事件
if (res.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解码其它事件
} else if (res.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解码普通响应
} else {
DecodeableRpcResult result;
// 在通信框架(例如,Netty)的 IO 线程,解码
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto);
result.decode();
// 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
} else {
result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto);
}
data = result;
}
// 设置结果
res.setResult(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
// 异常响应状态
} else {
res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
// 解析请求
} else {
// decode request.
Request req = new Request(id);
req.setVersion("2.0.0");
// 是否需要响应
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 若是心跳事件,进行设置
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
// 解码心跳事件
if (req.isHeartbeat()) {
data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
// 解码其它事件
} else if (req.isEvent()) {
data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
// 解码普通请求
} else {
// 在通信框架(例如,Netty)的 IO 线程,解码
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
// 在 Dubbo ThreadPool 线程,解码,使用 DecodeHandler
} else {
inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
req.setBroken(true);
req.setData(t);
}
return req;
}
}
如上,decodeBody
对部分字段进行了解码,并将解码得到的字段封装到 Request 中。随后会调用 DecodeableRpcInvocation#decode
方法进行后续的解码工作。
要么是在本线程内解码,要么是交由work
线程池执行,会在Dubbo的线程模型、handler
讲解如何交给其执行,又是怎么执行的。
再来看一下DecodeableRpcInvocation#decode
方法
3.5 DecodeableRpcInvocation#decode
public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Decodeable {
/**
* 是否已经解码完成
*/
private volatile boolean hasDecoded;
@Override
public void decode() {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 通过反序列化得到 dubbo version,并保存到 attachments 变量中
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion);
// 通过反序列化得到 path,version,并保存到 attachments 变量中
setAttachment(Constants.PATH_KEY, in.readUTF());
setAttachment(Constants.VERSION_KEY, in.readUTF());
// 通过反序列化得到调用方法名
setMethodName(in.readUTF());
try {
Object[] args;
Class<?>[] pts;
// 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
String desc = in.readUTF();
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
// 将 desc 解析为参数类型数组
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 解析运行时参数
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
// 设置参数类型数组
setParameterTypes(pts);
// 通过反序列化得到原 attachment 的内容
Map<String, String> map = (Map<String, String>) in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = getAttachments();
if (attachment == null) {
attachment = new HashMap<String, String>();
}
// 将 map 与当前对象中的 attachment 集合进行融合
attachment.putAll(map);
setAttachments(attachment);
}
// 对 callback 类型的参数进行处理
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
// 设置参数列表
setArguments(args);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
}
可以看到DecodeableRpcInvocation #decode
会先判断是否已经解码完成(这很重要,在交由Dubbo线程池执行的时候也会进到这个方法,如果已经解码过,就不进行下面的流程,如果已经没解码过,那么就会帮助执行解码操作),如果没有解码过,调用decode的重载方法。
重载方法通过反序列化将诸如 path
、version
、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中,最终得到一个具有完整调用信息的 DecodeableRpcInvocation
对象。
3.6解码的方法调用顺序
所以解码调用的顺序为:
NettyCodecAdapter的内部类InternalDecoder#messageReceived
->DubboCountCodec#decode
->DubboCodec#decode(channel,buffer)//父类实现的方法
->DubboCodec#decode(channel,buffer,readable,header)//父类实现的方法
->Dubbo#decodeBody
->DecodeableRpcInvocation#decode//交由本线程或者业务线程池执行
网友评论