EventLoop类继承图
上文提到了,NioServerSocketChannel将OP_Accept事件注册到bossGroup的EventLoop的selector上,而EventLoop是一个继承自java.util.concurrent.ScheduledExecutorService类可执行定时任务的Executor类。
其完整的继承类图如下:
diagram.png
EventLoop的run方法源码
EventLoop不仅可以处理IO线程类,同时还可以执行一个定时或非定时的任务。EventLoop的run方法如下:
protected void run() {
//创建之后就会被调用
//之后会不断轮询
for (;;) {
try {
//判断是否有任务,有任务,返回continue,没任务需要处理,则调用SELECT
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;//IO事件处理的时间
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
ioRadio表示IO事件执行的时间和任务执行的时间的比值,默认为1:1,我们在创建NioEventLoopGroup的时候可以设置这个值。NioEventLoopGroup.setIoRatio()。
通过源码可以看出,run方法内在不断的进行轮询,首先处理是否有已经触发的io事件,有则进行执行,而在finally块中进行runAllTasks(),其中一个带参数的runAllTasks表示,在指定的时间内返回无论任务是否执行完成。而不带参数的runAllTasks则没有时间限制。
下面分别对IO事件processSelectedKeys()和任务处理:runAllTasks()方法进行分析:
EventLoop处理IO事件
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
//进行事件的判断和执行
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
//....省略部分代码
try {
int readyOps = k.readyOps();
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
根据以上代码可以看到,当读取到的事件是OP_Accept或者OP_Read事件时,调用unsafe.read()事件进行处理,unsafe是SocketChannel内部专门处理IO事件的类,不同的SocketChannel拥有不同的unsafe实例,在此我们只讨论我们用到的socketChannel:NioServerSocketChannel、NioSocketChannel,下面是它们的类图。
image2018-6-4 10_58_40.png
其中AbstractNioChannle中提供了内部NioUnsafe接口,在子类AbstractNioByteChannle和AbstractNioMessageChannle分别进行了实现,所以当我们处理IO事件并调用Unsafe.read()时,NioSocketChannel和NioServerSocketChannel调用的是不同的unsafe进行处理。
- 其中NioServerSocketChannel由于处理的是Accept事件,所以 unsafe.read的处理主要是接收并创建NioSocketChannel然后fireChannelRead事件,而这个事件最终会被ServerBootstrapAcceptor中的ChannelRead所处理,channelRead中,将当前创建的NioSocketChannel注册到childGroup中。源码如下:
//服务端处理OP_ACCEPT事件public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {//注册Channel到childGroup
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- NioSocketChannel处理的事件是Read事件,所以主要读取缓冲区的内容到Bytebuf,然后fireChannelRead事件,进行在pipeline上传播。
关于Pipeline上传播,在下节中我们会讨论,NioServerSocketChannel和NioSocketChannel的pipeline的ChannelHandler链
EventLoop执行Task
EventLoop除了执行io事件外,本身也会执行一些任务和定时任务,如下面的runAllTasks即为eventloop执行task的源码:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
//每执行64个任务,就判断一次事件是否超时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
根据源码可以看到,首先EventLoop会将将要执行的定时任务(拿到还未执行的deadlineTime小于当前时间的所有定时任务),加入当前taskQueue中,并对taskQueue进行轮询处理,其中如果设置了处理的task的时间限制,则每处理64个task就会判断一下是否超时,如果超时则退出执行。
之前提到EventLoop内的任务分为两种,一种是可立即执行的taskQueue,另一种是定时执行的scheduledTaskQueue,当我们在handler中提交普通任务和定时任务时会分别加入这两个队列。
网友评论