开篇
- 这篇文章的目的是理解下Dubbo 在consumer和provider两侧的心跳检测机制。
- consumer侧心跳检测的以Connection作为维度进行检测,每一个连接生成一个心跳检测任务。
- provider侧心跳检测以server端口监听作为维度进行检测,每个端口监听生成一个心跳检测任务。
- consumer侧的心跳检测任务的初始化在HeaderExchangeClient当中。
- provider侧的心跳检测任务的初始化在HeaderExchangeServer当中。
- provider和consumer在处理发送接收请求过程中会记录读写的时间戳,根据该时间戳判断是否超过心跳检测时间。
HeaderExchangeClient
public class HeaderExchangeClient implements ExchangeClient {
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);
private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));
private final Client client;
private final ExchangeChannel channel;
// heartbeat timer
private ScheduledFuture<?> heartbeatTimer;
// heartbeat(ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
public HeaderExchangeClient(Client client, boolean needHeartbeat) {
if (client == null) {
throw new IllegalArgumentException("client == null");
}
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
if (needHeartbeat) {
startHeartbeatTimer();
}
}
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.<Channel>singletonList(HeaderExchangeClient.this);
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
}
- HeaderExchangeClient的startHeartbeatTimer()方法会创建heartbeatTimer对象。
- heartbeatTimer对象是通过线程池执行的HeartBeatTask对象。
- 具体的任务心跳检测在HeartBeatTask当中实现。
HeaderExchangeServer
public class HeaderExchangeServer implements ExchangeServer {
protected final Logger logger = LoggerFactory.getLogger(getClass());
private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
new NamedThreadFactory(
"dubbo-remoting-server-heartbeat",
true));
private final Server server;
// heartbeat timer
private ScheduledFuture<?> heartbeatTimer;
// heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
private int heartbeat;
private int heartbeatTimeout;
private AtomicBoolean closed = new AtomicBoolean(false);
public HeaderExchangeServer(Server server) {
if (server == null) {
throw new IllegalArgumentException("server == null");
}
this.server = server;
this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}
startHeartbeatTimer();
}
private void startHeartbeatTimer() {
stopHeartbeatTimer();
if (heartbeat > 0) {
heartbeatTimer = scheduled.scheduleWithFixedDelay(
new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
@Override
public Collection<Channel> getChannels() {
return Collections.unmodifiableCollection(
HeaderExchangeServer.this.getChannels());
}
}, heartbeat, heartbeatTimeout),
heartbeat, heartbeat, TimeUnit.MILLISECONDS);
}
}
}
- HeaderExchangeServer的startHeartbeatTimer()方法会创建heartbeatTimer对象。
- heartbeatTimer对象是通过线程池执行的HeartBeatTask对象。
- 具体的任务心跳检测在HeartBeatTask当中实现。
HeartBeatTask
final class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);
private ChannelProvider channelProvider;
private int heartbeat;
private int heartbeatTimeout;
HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) {
this.channelProvider = provider;
this.heartbeat = heartbeat;
this.heartbeatTimeout = heartbeatTimeout;
}
@Override
public void run() {
try {
long now = System.currentTimeMillis();
for (Channel channel : channelProvider.getChannels()) {
if (channel.isClosed()) {
continue;
}
try {
Long lastRead = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
Long lastWrite = (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
if ((lastRead != null && now - lastRead > heartbeat)
|| (lastWrite != null && now - lastWrite > heartbeat)) {
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(Request.HEARTBEAT_EVENT);
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
}
}
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
} catch (Exception e) {
//do nothing
}
} else {
channel.close();
}
}
} catch (Throwable t) {
logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);
}
}
} catch (Throwable t) {
logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);
}
}
interface ChannelProvider {
Collection<Channel> getChannels();
}
}
- HeartBeatTask会根据channel的lastRead或lastWrite的时间戳来判断需要重发心跳或者重新建立连接。
- 针对需要重发心跳的逻辑就是简单的发送HEARTBEAT_EVENT心跳类型的报文即可。
- lastRead和lastWrite时间戳会在HeartbeatHandler的读写事件中被更新。
- Dubbo的心跳报文会在需要发送的时候才会发送,因为复用了正常报文的收发来修改上次读写时间戳。
HeartbeatHandler
public class HeartbeatHandler extends AbstractChannelHandlerDelegate {
private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class);
public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP";
public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP";
public HeartbeatHandler(ChannelHandler handler) {
super(handler);
}
@Override
public void connected(Channel channel) throws RemotingException {
setReadTimestamp(channel);
setWriteTimestamp(channel);
handler.connected(channel);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
clearReadTimestamp(channel);
clearWriteTimestamp(channel);
handler.disconnected(channel);
}
@Override
public void sent(Channel channel, Object message) throws RemotingException {
setWriteTimestamp(channel);
handler.sent(channel, message);
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
setReadTimestamp(channel);
if (isHeartbeatRequest(message)) {
Request req = (Request) message;
if (req.isTwoWay()) {
Response res = new Response(req.getId(), req.getVersion());
res.setEvent(Response.HEARTBEAT_EVENT);
channel.send(res);
if (logger.isInfoEnabled()) {
int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
if (logger.isDebugEnabled()) {
logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period"
+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));
}
}
}
return;
}
if (isHeartbeatResponse(message)) {
if (logger.isDebugEnabled()) {
logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());
}
return;
}
handler.received(channel, message);
}
private void setReadTimestamp(Channel channel) {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
}
private void setWriteTimestamp(Channel channel) {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
}
}
- HeartbeatHandler的任何读写事件和连接事件的过程中都会刷新lastRead和lastWrite的值,保证心跳检测正常。
- HeartbeatHandler的received操作当中会直接解析心跳报文并发送心跳响应。
Handler的封装关系
Handler的封装关系
网友评论