下面我们来分析一下Netty里的又一个关键组件NioEventLoop。
其实NioEventLoop就是轮询注册在Selector上的相关事件,在AbstractBootstrap里有一个属性是
/**
* EventLoopGroup 对象
*/
volatile EventLoopGroup group;
在ServerBootStrap里有一个属性是
/**
* 子 Channel 的 EventLoopGroup 对象
*/
private volatile EventLoopGroup childGroup;
NioEventLoopGroup就是线程组,包含了NioEventLoop。
默认是cpu个数的2倍
/**
* 默认 EventLoop 线程数
*/
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
下面就是NioEventLoop了,先来看看NioEventLoop有哪些东西,NioEventLoop包含了一个Selector和一个TaskQueue,具体结构为
NioEventLoop结构图
来看下NioEventLoop都的类结构图
NioEventLoop
看起来好像很复杂的样子,其实最主要的就是SingleThreadEventExecutor和NioEventLoop这2个类,先来看下SingleThreadEventExecutor的runAllTasks方法
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false; // 是否执行过任务
do {
// 从定时任务获得到时间的任务
fetchedAll = fetchFromScheduledTaskQueue();
// 执行任务队列中的所有任务
if (runAllTasksFrom(taskQueue)) {
// 若有任务执行,则标记为 true
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
// 如果执行过任务,则设置最后执行时间
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 执行所有任务完成的后续方法
afterRunningAllTasks();
return ranAtLeastOne;
}
你可能会问,这只是处理非io任务的,那selector是怎么处理事件的呢。别急,这就要看NioEventLoop这个类了。在NioEventLoop的processSelectedKeys方法处理了io事件,包括connect,accept,read,write,具体为processSelectedKey方法
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 如果 SelectionKey 是不合法的,则关闭 Channel
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
// 获得就绪的 IO 事件的 ops
int readyOps = k.readyOps();
// OP_CONNECT 事件就绪
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// 移除对 OP_CONNECT 感兴趣
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 完成连接
unsafe.finishConnect();
}
// OP_WRITE 事件就绪
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
// 向 Channel 写入数据
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
NioEventLoop还有一个定时任务,为AbstractScheduledEventExecutor,这个类有一个定时任务队列,负责执行定时任务
/**
* 定时任务队列
*/
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
最后会执行schedule方法
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
然后就是NioEventLoop执行run方法,开始轮询各种io事件了。NioEventLoop的分析就到这里了。
网友评论