上一节我们研究了NioEventLoop创建过程,概括起来就是,通道channel
获取NioEventLoop
实例,调用execute()
方法,用ThreadPerTaskExecutor
执行器,开启一个线程,去执行NioEventLoop
的run()
方法,同时将任务存储到NioEventLoop
的成员变量taskQueue
队列中,以便后续异步执行,该成员变量在其父类SingleThreadEventExecutor
中。
这一节,我们直接研究NioEventLoop
具体执行的内容是什么。
入口:启动NioEventLoop
时SingleThreadEventExecutor#doStartThread()
执行了以下代码。实际上时获取了NioEventLoop
实例并执行了run()
方法。本节主要看该方法
SingleThreadEventExecutor.this.run();
执行过程主要做了三件事
-
select(wakenUp.getAndSet(false));
: 轮询IO事件 -
processSelectedKeys();
: 处理IO事件 -
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
:执行任务队列里的任务,并根据时间系数ioRatio
来设置其运行时间和processSelectedKeys()
运行时间的占比。
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
//轮询 注册到该 selector 上的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
//设置任务队列的时间系数ioRatio
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//处理IO事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
//处理IO事件
processSelectedKeys();
} finally {
// 以ioTime的执行时间 和 时间系数确定任务队列运行时间
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
-
select(wakenUp.getAndSet(false));
和processSelectedKeys();
比较复杂,接下来另开章节介绍。 -
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
和runAllTasks()
一个有参数一个没参数。
发现runAllTasks()
将一系列定时任务(用户可调用schedule()
方法来添加定时任务)加入到了taskQueue
中,并且执行里面的任务,如果出现了异常,就打印日志,输出到控制台。
protected boolean runAllTasks() {
//保证当前线程式NioEventLoop持有的线程
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//将定时任务队列里的任务全部放到NioEventQueue.taskQueue中
fetchedAll = fetchFromScheduledTaskQueue();
//从taskQueue中取出任务依次执行,直到完成,异常则打日志
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
Runnable task = pollTaskFrom(taskQueue);
if (task == null) {
return false;
}
for (;;) {
safeExecute(task);
task = pollTaskFrom(taskQueue);
if (task == null) {
return true;
}
}
}
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
runAllTasks(long timeoutNanos)
增加了一个超时时间的节点。该节点最终被表示为deadline
,超过这个截止时间,循环执行队列任务的操作会被中断,可能队列里任务不能完全执行完。
protected boolean runAllTasks(long timeoutNanos) {
//定时任务队列的任务全部挪到NioEventLoop.taskQueue中
fetchFromScheduledTaskQueue();
//从taskQueue中取出任务
Runnable task = pollTask();
if (task == null) {
//执行tailQueue中的任务
afterRunningAllTasks();
return false;
}
//计算截止时间
//ScheduledFutureTask.nanoTime():定时任务开始执行到现在的时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
//执行任务
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
//每64次刷新一次:距离任务开始的时间
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
//执行tailQueue中的任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
因此,ioRatio
的作用是以处理IO事件的时长为参考,决定执行taskQueue
中的任务的超时时间。如果设置为100的话,则会一直执行,直到队列为空。ioRatio
的默认值是50
因此是1:1,超时时间和IO事件处理事件是一样的,那么怎么设置这个值呢?当前在处理IO事件,是在bossGroup
里面。在这里设置。
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
bossGroup.setIoRatio(100);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
另外以上的一些列ScheduledFutureTask从哪里来呢?
NioEventLoop
的父类AbstractScheduledEventExecutor
持有一个定时任务队列
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
而NioEventLoop
通过暴露schedule()
方法来允许用户添加定时任务到该任务队列
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
delay = 0;
}
validateScheduled0(delay, unit);
return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
// Use same initial capacity as java.util.PriorityQueue
11);
}
return scheduledTaskQueue;
}
网友评论