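1. Overview
The SOFABolt heartbeat mechanism is built on Netty, so before analyzing SOFABolt's heartbeat we first look at how Netty implements idle detection.
2. Netty heartbeat implementation
Netty provides the IdleStateHandler class to support heartbeat checks. Its constructor takes the following parameters: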
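/**
 * Creates a new instance firing {@link IdleStateEvent}s.
 *
 * @param readerIdleTime read idle timeout (0 disables the check)
 * @param writerIdleTime write idle timeout (0 disables the check)
 * @param allIdleTime    timeout for neither a read nor a write (0 disables the check)
 * @param unit           time unit of the three timeouts above
 */
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)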
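Because the three timeouts are interpreted in the given unit, the same number can mean very different intervals. The following JDK-only sketch shows the conversion to nanoseconds. The class name IdleTimeoutUnits and the helper toIdleNanos are hypothetical, and the 1 ms lower bound is an assumption about how IdleStateHandler normalizes small values, not something shown in this article.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Netty: models how an idle timeout
// given as (time, unit) could be normalized to nanoseconds.
final class IdleTimeoutUnits {
    // Assumed lower bound of 1 ms for any enabled timeout.
    static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    static long toIdleNanos(long time, TimeUnit unit) {
        if (time <= 0) {
            return 0; // a non-positive value disables the check
        }
        return Math.max(unit.toNanos(time), MIN_TIMEOUT_NANOS);
    }

    public static void main(String[] args) {
        System.out.println(toIdleNanos(5, TimeUnit.SECONDS));      // 5000000000
        System.out.println(toIdleNanos(5, TimeUnit.MILLISECONDS)); // 5000000
        System.out.println(toIdleNanos(0, TimeUnit.SECONDS));      // 0
    }
}
```

A value of 5 with TimeUnit.MILLISECONDS is therefore a thousand times shorter than 5 with TimeUnit.SECONDS, which matters when choosing the unit for a heartbeat interval.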
This class is a ChannelHandler and must be added to the ChannelPipeline, for example:
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel channel) {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("decoder", codec.newDecoder());
        pipeline.addLast("encoder", codec.newEncoder());
        if (idleSwitch) {
            // Reader idle timeout of 5 seconds; writer and all-idle checks are disabled (0).
            pipeline.addLast("idleStateHandler", new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("serverIdleHandler", serverIdleHandler);
        }
        pipeline.addLast("connectionEventHandler", connectionEventHandler);
        pipeline.addLast("handler", rpcHandler);
        createConnection(channel);
    }
});
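An IdleStateHandler is added to the channel pipeline. Its first argument, readerIdleTime, is 5; for the 5-second timeout intended here the unit must be TimeUnit.SECONDS. The handler schedules a task that checks when data was last read on the channel. If nothing has been read for 5 seconds, it fires an IdleStateEvent, which reaches the userEventTriggered method of the handlers that follow it in the pipeline. Let us look at the channelRead method of IdleStateHandler.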
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}
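This method only sets the reading flag and resets the first-idle-event flags, then passes the message on to the next handler. The last read time itself is recorded when the read completes, in channelReadComplete. Next, look at the channelActive method.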
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired. If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);
    super.channelActive(ctx);
}
Once the client and server have established a connection, channelActive is called. The channelActive method of IdleStateHandler calls initialize() to set up idle detection for the connection:
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    initOutputChanged(ctx);
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        // Schedule the task that handles read idle timeouts
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        // Schedule the task that handles write idle timeouts
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        // Schedule the task that handles read/write (all) idle timeouts
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
The code above schedules the timer tasks that drive idle detection. What exactly does the ReaderIdleTimeoutTask do?
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }
    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;
        try {
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
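If no read is in progress, the task subtracts the time elapsed since the last read from the configured read timeout. If the result is zero or negative, the channel has been read-idle for at least the timeout: the task reschedules itself with the full timeout and fires a READER_IDLE event through channelIdle. Otherwise a read happened in the meantime, and the task reschedules itself for the remaining time only.
That covers the read timeout. The write timeout and the all-idle timeout follow the same pattern and are left to the reader.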
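The rescheduling arithmetic described above can be tried out without Netty. The sketch below is a simplified model, not Netty code: the class ReaderIdleCheck and its method names are hypothetical, time is passed in explicitly instead of being read from a clock, and it assumes the last read time is updated when a read completes.

```java
// Hypothetical, simplified model of the read-idle check; time values are in
// arbitrary ticks supplied by the caller so the behavior is deterministic.
final class ReaderIdleCheck {
    private final long idleTimeout;
    private long lastReadTime;
    private boolean reading;

    ReaderIdleCheck(long idleTimeout, long startTime) {
        this.idleTimeout = idleTimeout;
        this.lastReadTime = startTime;
    }

    // Models channelRead: a read is in progress.
    void onReadStart() {
        reading = true;
    }

    // Models the end of a read: remember when it finished.
    void onReadComplete(long now) {
        reading = false;
        lastReadTime = now;
    }

    // Remaining delay before the channel counts as read-idle.
    // While a read is in progress the full timeout is returned.
    long nextDelay(long now) {
        long nextDelay = idleTimeout;
        if (!reading) {
            nextDelay -= now - lastReadTime;
        }
        return nextDelay;
    }

    // True when an idle event would be fired at this point in time.
    boolean isIdle(long now) {
        return nextDelay(now) <= 0;
    }

    public static void main(String[] args) {
        ReaderIdleCheck check = new ReaderIdleCheck(5000, 0);
        System.out.println(check.nextDelay(3000)); // 2000: recheck after the remainder
        check.onReadStart();
        check.onReadComplete(4000);
        System.out.println(check.nextDelay(6000)); // 3000: the read pushed the deadline back
        System.out.println(check.isIdle(9000));    // true: 5000 ticks since the last read
    }
}
```

Rescheduling for the remaining time, rather than polling at a fixed interval, means the idle event fires close to the moment the timeout actually elapses.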
3. SOFABolt heartbeat implementation
SOFABolt handles connection heartbeats through HeartbeatHandler:
public class HeartbeatHandler extends ChannelDuplexHandler {
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
When the event is an IdleStateEvent, the handler immediately triggers heartbeat processing, which is implemented in the heartbeatTriggered method:
public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
    Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
    final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
    // Close the connection once the number of unanswered heartbeats reaches the limit
    if (heartbeatTimes >= maxCount) {
        try {
            conn.close();
            logger.error(
                "Heartbeat failed for {} times, close the connection from client side: {} ",
                heartbeatTimes, RemotingUtil.parseRemoteAddress(ctx.channel()));
        } catch (Exception e) {
            logger.warn("Exception caught when closing connection in SharableHandler.", e);
        }
    } else {
        boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
        if (!heartbeatSwitch) {
            return;
        }
        final HeartbeatCommand heartbeat = new HeartbeatCommand();
        // Register the callback listener that handles the heartbeat response
        final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
            new InvokeCallbackListener() {
                @Override
                public void onResponse(InvokeFuture future) {
                    ResponseCommand response;
                    try {
                        response = (ResponseCommand) future.waitResponse(0);
                    } catch (InterruptedException e) {
                        logger.error("Heartbeat ack process error! Id={}, from remoteAddr={}",
                            heartbeat.getId(), RemotingUtil.parseRemoteAddress(ctx.channel()),
                            e);
                        return;
                    }
                    if (response != null
                        && response.getResponseStatus() == ResponseStatus.SUCCESS) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Heartbeat ack received! Id={}, from remoteAddr={}",
                                response.getId(),
                                RemotingUtil.parseRemoteAddress(ctx.channel()));
                        }
                        // The heartbeat was acknowledged successfully: reset the failure count to 0
                        ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
                    } else {
                        if (response == null) {
                            logger.error("Heartbeat timeout! The address is {}",
                                RemotingUtil.parseRemoteAddress(ctx.channel()));
                        } else {
                            logger.error(
                                "Heartbeat exception caught! Error code={}, The address is {}",
                                response.getResponseStatus(),
                                RemotingUtil.parseRemoteAddress(ctx.channel()));
                        }
                        // The heartbeat failed or timed out: increment the failure count
                        Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                        ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                    }
                }

                @Override
                public String getRemoteAddress() {
                    return ctx.channel().remoteAddress().toString();
                }
            }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
        final int heartbeatId = heartbeat.getId();
        conn.addInvokeFuture(future);
        if (logger.isDebugEnabled()) {
            logger.debug("Send heartbeat, successive count={}, Id={}, to remoteAddr={}",
                heartbeatTimes, heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
        }
        // Send the heartbeat request
        ctx.writeAndFlush(heartbeat).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Send heartbeat done! Id={}, to remoteAddr={}",
                            heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
                    }
                } else {
                    logger.error("Send heartbeat failed! Id={}, to remoteAddr={}", heartbeatId,
                        RemotingUtil.parseRemoteAddress(ctx.channel()));
                }
            }
        });
        // Handle heartbeat timeout: if no response has arrived by then, complete the future with a timeout response
        TimerHolder.getTimer().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                if (future != null) {
                    future.putResponse(commandFactory.createTimeoutResponse(conn
                        .getRemoteAddress()));
                    future.tryAsyncExecuteInvokeCallbackAbnormally();
                }
            }
        }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
    }
}
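Stripped of logging and networking, the method above is a small counter-driven state machine: each idle event either sends a heartbeat or closes the connection, and each response either resets or increments the failure count. The sketch below models only that counting logic in plain Java. The class HeartbeatCounter, its Action enum, and its method names are hypothetical, and the heartbeat switch check is left out for brevity.

```java
// Hypothetical model of the failure counting in heartbeatTriggered;
// it contains no networking and is not part of any library.
final class HeartbeatCounter {
    enum Action { SEND_HEARTBEAT, CLOSE_CONNECTION }

    private final int maxCount;
    private int failures;

    HeartbeatCounter(int maxCount) {
        this.maxCount = maxCount;
    }

    // Called on every idle event: decide whether to probe or give up.
    Action onIdle() {
        return failures >= maxCount ? Action.CLOSE_CONNECTION : Action.SEND_HEARTBEAT;
    }

    // Called when a heartbeat completes; a timeout counts as a failure.
    void onResponse(boolean success) {
        if (success) {
            failures = 0; // one successful ack clears all earlier failures
        } else {
            failures++;
        }
    }

    int failures() {
        return failures;
    }

    public static void main(String[] args) {
        HeartbeatCounter counter = new HeartbeatCounter(3);
        counter.onResponse(false);
        counter.onResponse(false);
        System.out.println(counter.onIdle()); // SEND_HEARTBEAT: 2 failures, limit is 3
        counter.onResponse(false);
        System.out.println(counter.onIdle()); // CLOSE_CONNECTION: limit reached
    }
}
```

Because a single successful acknowledgement resets the count, only consecutive failures lead to the connection being closed.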