介绍:
对于netty这种高性能的网络通信框架来说,支持的连接数还是很可观的;当同时存在数百万连接的时候,对于这些连接进行一定监控,适当的去关闭一些超时的连接,还是可以节省很多资源的。
netty利用IdleStateHandler实现心跳机制;超过配置的指定时间,还没有收到读或者写事件,就可以关闭对应的Channel,这个需要自定义一个Handler来重写userEventTriggered()方法,在里面去监听IdleStateEvent事件,然后关闭对应的连接或者做其他的处理。
下面具体描述下实现心跳的具体逻辑...
心跳:
从大的方面来说是封装一个AbstractIdleTask任务,放入到PriorityQueue<ScheduledFutureTask<?>>队列里面,?指的就是AbstractIdleTask的子类实现;具体要等待多久执行这个Task的代码逻辑是在ReaderIdleTimeoutTask、WriterIdleTimeoutTask、AllIdleTimeoutTask这三个任务类里面的run方法实现;代码实现如下所示:
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;//@1 设置的常量值
if (!reading) {
nextDelay -= ticksInNanos() - lastReadTime;//@2 距离上一次读的时间和常量值比较
}
if (nextDelay <= 0) {//@3 超过设置的时间
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false;
try {
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {//@4 还没超过设置的时间
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
然后让线程去从队列里面取任务,执行...
IdleStateHandler在加入到Pipeline的时候,会触发调用handlerAdded()方法,内部紧接着会调用initialize()方法,在这个初始化方法的调用链路里面,会把task加入到Queue里面。如下图所示:
超时设置.png
像readerIdleTimeNanos、writerIdleTimeNanos、allIdleTimeNanos参数都是在构造函数里面赋值的...
在AbstractScheduledEventExecutor类里面scheduleFromEventLoop()方法里面实现放进Queue的,如图:
image.png
此处应该有一个类图:
IdleStateHandler.png
在NioEventLoop的run方法里面,有一句代码:
finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
这个方法就是从一些队列里面获取任务(很多种任务,相当于一个大杂烩),比如:scheduledTaskQueue、Queue<Runnable> tailTasks,然后执行;此处也应该有个类图,有点乱,梳理下...
NioEventLoop.png
这个时候,突然有种感觉:面向接口和抽象类编程,挺灵活...
我这个netty版本(4.1.53.Final),用的是这种时间差来做定时的,新版本好像用的是时间轮...时间轮这个算法的设计思想也有很多值得借鉴的地方.
总结:
生活中总会有各种各样的绊脚石,跨过去,你就会发现前面的路会更加平坦...
网友评论