exchanger层默认实现类是HeaderExchange.
一 client
- DubboProtocol层调用exchanger接口获取client
ExchangeClient client = Exchangers.connect(url, requestHandler)
- 接口返回HeaderExhangeClient
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
1.1 HeaderExhangeClient
- HeaderExhangeClient代理了下层transport层的发送接口
- 执行心跳报文发送
heartbeat
表示心跳间隔,
heartbeat.timeout
表示心跳超时时间,默认3倍心跳间隔。
定时任务调用HeartBeatTask
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
1.2 HeartBeatTask
- channel通信过程中,会分别设置读时间和写时间
- 上次读写时间距离现在超过心跳间隔,则构建心跳报文Request并发送
- 若事件超过心跳超时时间,client则connect重建连接,server端则close断开连接
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
二 server
- DubboProtocol层调用exchanger接口获取server
ExchangeServer server = Exchangers.bind(url, requestHandler)
- 和client层相同,支持心跳报文发送和处理。
三 收发报文
3.1发送报文
3.1.1 报文格式
组装Request格式的报文

* data为上层传入的RpcInvocation类型数据或其他数据
* version为协议版本
* twoWay为是否需要响应报文
* id通过AtomicLong自增获取。
* hearbeat表示心跳报文,event表示心跳报文或其他事件报文
* broken表示报文解析失败,为垃圾报文
3.1.2 通信异步化
- 发送报文前,使用DefaultFuture存储请求信息。
DefaultFuture future = new DefaultFuture(channel, req, timeout);
- 请求响应报文包含请求id,根据id获取请求的DefaultFuture,发送信号唤醒等待响应的线程,并调用回调函数。
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
3.2 收包handler

3.2.1 HeaderExchangeHandler

3.2.2 DecodeHandler
DecodeableRpcInvocation和DecodeableRpcResult支持编解码的消息体,由DecodeHandler调用对应消息体的编解码函数。
3.2.3 HeartbeatHandler
心跳报文处理函数在transporter层封装,处理简单,即尽早处理心跳报文。
- 记录心跳时间
- 对心跳请求报文回发一个心跳响应。
网友评论