美文网首页
Netty-EventLoop执行流程分析

Netty-EventLoop执行流程分析

作者: 王凯_6a8b | 来源:发表于2019-01-24 17:44 被阅读0次

    EventLoop类继承图

    上文提到了,NioServerSocketChannel将OP_Accept事件注册到bossGroup的EventLoop的selector上,而EventLoop是一个继承自java.util.concurrent.ScheduledExecutorService类可执行定时任务的Executor类。

    其完整的继承类图如下:


    diagram.png

    EventLoop的run方法源码

    EventLoop不仅可以处理IO线程类,同时还可以执行一个定时或非定时的任务。EventLoop的run方法如下:

    protected void run() {
        //创建之后就会被调用
        //之后会不断轮询
        for (;;) {
            try {
                //判断是否有任务,有任务,返回continue,没任务需要处理,则调用SELECT
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;//IO事件处理的时间
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    ioRadio表示IO事件执行的时间和任务执行的时间的比值,默认为1:1,我们在创建NioEventLoopGroup的时候可以设置这个值。NioEventLoopGroup.setIoRatio()。
    通过源码可以看出,run方法内在不断的进行轮询,首先处理是否有已经触发的io事件,有则进行执行,而在finally块中进行runAllTasks(),其中一个带参数的runAllTasks表示,在指定的时间内返回无论任务是否执行完成。而不带参数的runAllTasks则没有时间限制。

    下面分别对IO事件processSelectedKeys()和任务处理:runAllTasks()方法进行分析:
    EventLoop处理IO事件

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }
        }
    }
    //进行事件的判断和执行
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        //....省略部分代码
        try {
            int readyOps = k.readyOps();
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    根据以上代码可以看到,当读取到的事件是OP_Accept或者OP_Read事件时,调用unsafe.read()事件进行处理,unsafe是SocketChannel内部专门处理IO事件的类,不同的SocketChannel拥有不同的unsafe实例,在此我们只讨论我们用到的socketChannel:NioServerSocketChannel、NioSocketChannel,下面是它们的类图。


    image2018-6-4 10_58_40.png

    其中AbstractNioChannle中提供了内部NioUnsafe接口,在子类AbstractNioByteChannle和AbstractNioMessageChannle分别进行了实现,所以当我们处理IO事件并调用Unsafe.read()时,NioSocketChannel和NioServerSocketChannel调用的是不同的unsafe进行处理。

    • 其中NioServerSocketChannel由于处理的是Accept事件,所以 unsafe.read的处理主要是接收并创建NioSocketChannel然后fireChannelRead事件,而这个事件最终会被ServerBootstrapAcceptor中的ChannelRead所处理,channelRead中,将当前创建的NioSocketChannel注册到childGroup中。源码如下:
    //服务端处理OP_ACCEPT事件public void channelRead(ChannelHandlerContext ctx, Object msg) {
     final Channel child = (Channel) msg;
     child.pipeline().addLast(childHandler);
     setChannelOptions(child, childOptions, logger);
     for (Entry<AttributeKey<?>, Object> e: childAttrs) {
     child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
     }
     try {//注册Channel到childGroup
     childGroup.register(child).addListener(new ChannelFutureListener() {
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
     if (!future.isSuccess()) {
     forceClose(child, future.cause());
     }
     }
     });
     } catch (Throwable t) {
     forceClose(child, t);
     }
    }
    
    • NioSocketChannel处理的事件是Read事件,所以主要读取缓冲区的内容到Bytebuf,然后fireChannelRead事件,进行在pipeline上传播。
      关于Pipeline上传播,在下节中我们会讨论,NioServerSocketChannel和NioSocketChannel的pipeline的ChannelHandler链

    EventLoop执行Task

    EventLoop除了执行io事件外,本身也会执行一些任务和定时任务,如下面的runAllTasks即为eventloop执行task的源码:

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }
    
        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }
    
            runTasks ++;
    
            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            //每执行64个任务,就判断一次事件是否超时
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
    
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }
    
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
    

    根据源码可以看到,首先EventLoop会将将要执行的定时任务(拿到还未执行的deadlineTime小于当前时间的所有定时任务),加入当前taskQueue中,并对taskQueue进行轮询处理,其中如果设置了处理的task的时间限制,则每处理64个task就会判断一下是否超时,如果超时则退出执行。

    之前提到EventLoop内的任务分为两种,一种是可立即执行的taskQueue,另一种是定时执行的scheduledTaskQueue,当我们在handler中提交普通任务和定时任务时会分别加入这两个队列。

    相关文章

      网友评论

          本文标题:Netty-EventLoop执行流程分析

          本文链接:https://www.haomeiwen.com/subject/rujujqtx.html