netty中使用了IdleStateHandler来进行心跳检测,客户端和服务端保持长连接需要通过一个检测机制来确保链接的有效性,在链接处于空闲状态或者一方宕机又或者网络延迟,在这种情况下就要确认链接是否有效,无效链接就需要客户端和服务端都关闭当前链路,释放文件句柄资源。
IdleStateHandler在rocketmq中的使用实例:
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyServerHandler()
);
}
});
rocketmq中使用了IdleStateHandler来维持长链接的有效性,规定在120秒之内,没有发生read或write事件的时候,就会触发fireUserEventTriggered,触发用户自定义事件的执行(关闭链接)。
IdleStateHandler构造函数:
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
}
this.observeOutput = observeOutput;
//读空闲时间
if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
//写空闲时间
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
//读或写空闲时间
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
}
- readerIdleTime:定义读空闲时间,在这个时间间隔内如果链路没有发生读事件,会触发定时任务的执行。
- writerIdleTime:定义写空闲时间,在这个时间间隔内如果链路没有发生写事件,也会触发定时任务的执行。
- allIdleTime:定义读或写空闲时间,在这个时间间隔内如果链路既没有发生读事件也没有发生写事件,会触发定时任务的执行。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
// channelActive() event has been fired already, which means this.channelActive() will
// not be invoked. We have to initialize here instead.
initialize(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
}
private void initialize(ChannelHandlerContext ctx) {
// Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143
switch (state) {
case 1:
case 2:
return;
}
state = 1;
initOutputChanged(ctx);
lastReadTime = lastWriteTime = ticksInNanos();
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
}
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
//当前channel所在线程池也是一个周期性任务线程池
return ctx.executor().schedule(task, delay, unit);
}
IdleStateHandler也是被作为一个Handle添加到ChannelPipeline中,当其天剑成功的时候就会触发hadnlerAdded事件:他会判断当前channel链路是否已生效并且已经注册到selector监听器上,此时就会执行initialize函数。
- state:作为IdleStateHandler的状态字段,0 - none, 1 - initialized, 2 - destroyed,已初始化或被销毁就不再执行初始化方法
- lastReaTime:当前时间作为最近一次读或写时间
- 如果定义了读或写或者读写空闲时间的时候,就开启对应的IdleTimeoutTask周期性超时任务
这里拿ReaderIdleTimeoutTask来作为例子进行分析,其它的类似:
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
//构造IdleStateHandler时定义的读空闲时间间隔
long nextDelay = readerIdleTimeNanos;
//链路是否处于读事件中,有读事件的时候该值为true
if (!reading) {
//下一次周期任务等待时间 = 读空闲定义时间 - (当前时间-最近一次读事件时间)
nextDelay -= ticksInNanos() - lastReadTime;
}
//nextDelay小于0,说明周期性定时任务该出发执行了
if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
//这里生成一个读空闲事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
//触发用户自定义事件
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
//IdleStateHandler监听到读事件的时候只是会把reading置为true,然后将读事件向下传播
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
}
在给定的时间内链路一直处于读空闲状态的时候,定时任务检测到了之后会触发fireUserEventTriggered事件,这个时候可以通过自定义一个Handler来实现fireUserEventTriggered方法,来对链路读空闲的处理,服务端可以选择关闭链路,客户端可以选择发送一个ping报文给服务端。
网友评论