美文网首页
Netty-EventLoop执行流程分析

Netty-EventLoop执行流程分析

作者: 王凯_6a8b | 来源:发表于2019-01-24 17:44 被阅读0次

EventLoop类继承图

上文提到了,NioServerSocketChannel将OP_Accept事件注册到bossGroup的EventLoop的selector上,而EventLoop是一个继承自java.util.concurrent.ScheduledExecutorService类可执行定时任务的Executor类。

其完整的继承类图如下:


diagram.png

EventLoop的run方法源码

EventLoop不仅可以处理IO线程类,同时还可以执行一个定时或非定时的任务。EventLoop的run方法如下:

protected void run() {
    //创建之后就会被调用
    //之后会不断轮询
    for (;;) {
        try {
            //判断是否有任务,有任务,返回continue,没任务需要处理,则调用SELECT
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;//IO事件处理的时间
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

ioRadio表示IO事件执行的时间和任务执行的时间的比值,默认为1:1,我们在创建NioEventLoopGroup的时候可以设置这个值。NioEventLoopGroup.setIoRatio()。
通过源码可以看出,run方法内在不断的进行轮询,首先处理是否有已经触发的io事件,有则进行执行,而在finally块中进行runAllTasks(),其中一个带参数的runAllTasks表示,在指定的时间内返回无论任务是否执行完成。而不带参数的runAllTasks则没有时间限制。

下面分别对IO事件processSelectedKeys()和任务处理:runAllTasks()方法进行分析:
EventLoop处理IO事件

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        // null out entry in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.keys[i] = null;
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (needsToSelectAgain) {
            // null out entries in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.reset(i + 1);
            selectAgain();
            i = -1;
        }
    }
}
//进行事件的判断和执行
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //....省略部分代码
    try {
        int readyOps = k.readyOps();
        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

根据以上代码可以看到,当读取到的事件是OP_Accept或者OP_Read事件时,调用unsafe.read()事件进行处理,unsafe是SocketChannel内部专门处理IO事件的类,不同的SocketChannel拥有不同的unsafe实例,在此我们只讨论我们用到的socketChannel:NioServerSocketChannel、NioSocketChannel,下面是它们的类图。


image2018-6-4 10_58_40.png

其中AbstractNioChannle中提供了内部NioUnsafe接口,在子类AbstractNioByteChannle和AbstractNioMessageChannle分别进行了实现,所以当我们处理IO事件并调用Unsafe.read()时,NioSocketChannel和NioServerSocketChannel调用的是不同的unsafe进行处理。

  • 其中NioServerSocketChannel由于处理的是Accept事件,所以 unsafe.read的处理主要是接收并创建NioSocketChannel然后fireChannelRead事件,而这个事件最终会被ServerBootstrapAcceptor中的ChannelRead所处理,channelRead中,将当前创建的NioSocketChannel注册到childGroup中。源码如下:
//服务端处理OP_ACCEPT事件public void channelRead(ChannelHandlerContext ctx, Object msg) {
 final Channel child = (Channel) msg;
 child.pipeline().addLast(childHandler);
 setChannelOptions(child, childOptions, logger);
 for (Entry<AttributeKey<?>, Object> e: childAttrs) {
 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
 }
 try {//注册Channel到childGroup
 childGroup.register(child).addListener(new ChannelFutureListener() {
 @Override
 public void operationComplete(ChannelFuture future) throws Exception {
 if (!future.isSuccess()) {
 forceClose(child, future.cause());
 }
 }
 });
 } catch (Throwable t) {
 forceClose(child, t);
 }
}
  • NioSocketChannel处理的事件是Read事件,所以主要读取缓冲区的内容到Bytebuf,然后fireChannelRead事件,进行在pipeline上传播。
    关于Pipeline上传播,在下节中我们会讨论,NioServerSocketChannel和NioSocketChannel的pipeline的ChannelHandler链

EventLoop执行Task

EventLoop除了执行io事件外,本身也会执行一些任务和定时任务,如下面的runAllTasks即为eventloop执行task的源码:

protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (;;) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception.", t);
        }

        runTasks ++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        //每执行64个任务,就判断一次事件是否超时
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    this.lastExecutionTime = lastExecutionTime;
    return true;
}

根据源码可以看到,首先EventLoop会将将要执行的定时任务(拿到还未执行的deadlineTime小于当前时间的所有定时任务),加入当前taskQueue中,并对taskQueue进行轮询处理,其中如果设置了处理的task的时间限制,则每处理64个task就会判断一下是否超时,如果超时则退出执行。

之前提到EventLoop内的任务分为两种,一种是可立即执行的taskQueue,另一种是定时执行的scheduledTaskQueue,当我们在handler中提交普通任务和定时任务时会分别加入这两个队列。

相关文章

网友评论

      本文标题:Netty-EventLoop执行流程分析

      本文链接:https://www.haomeiwen.com/subject/rujujqtx.html