美文网首页Java程序员
Netty源码(三)NioEventLoop三部曲

Netty源码(三)NioEventLoop三部曲

作者: 挪威的senlin | 来源:发表于2020-05-26 23:44 被阅读0次

    前言

    本文将会具体分析NioEventLoop中的thread,它的启动时机,以及所履行的职责。还会分析一些netty的实现细节,比如解决NIO的bug和一些优化等。

    thread启动

    之前说到NioEventLoop是由一个thread处理I/O事件和提交的任务。先看一下这个thread启动的流程。

    execute 简化流程

    private void execute(Runnable task, boolean immediate) {
           //是当前线程调用,直接加入队列
            boolean inEventLoop = inEventLoop();
            addTask(task);
            if (!inEventLoop) {
                //启动线程
                startThread();
             // ......
            }
    
            if (!addTaskWakesUp && immediate) {
                wakeup(inEventLoop);
            }
        }
    

    可以看出启动thread是一个延迟加载的过程,在执行第一个任务的时候才会启动thread。跟进去看startThread()

        private void startThread() {
          //判断线程状态是否已启动
            if (state == ST_NOT_STARTED) {
               //CAS设置线程状态为已启动
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    boolean success = false;
                    try {
                      //真正去启动线程
                        doStartThread();
                        success = true;
                    } finally {
                        if (!success) {
                            STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                        }
                    }
                }
            }
        }
    

    doStartThread

    private void doStartThread() {
            assert thread == null;
            //调用传入参数的executor的execute方法,
            //executor会新建一个线程去执行任务
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //将执行该任务的线程赋值给thread 
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        //执行任务
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                    // ......
                }
        }
    

    前文分析了executor为ThreadPerTaskExecutor,执行execute方法时候为新建一个线程去执行任务,NioEventLoop的thread就是在此时赋值。
    thread的启动流程简化为,首先thread启动是一个懒加载的过程,在第一次执行任务才会启动。在启动的过程中,会有一个CAS的状态判断当前线程是否已经被启动,如果thread没有启动,则通过传入的executor对象去创建thread对象,并执行SingleThreadEventExecutor.this.run()这个方法。

    下面分析SingleThreadEventExecutor.this.run()这个方法,

        /**
         * Run the tasks in the {@link #taskQueue}
         */
        protected abstract void run();
    

    可以看见是一个抽象方法,然后找到文本分析的NioEventLoop对于run的实现,这里做一个将代码做一个简化,只有主要流程

        protected void run() {
            int selectCnt = 0;
            for (; ; ) {
                //1、检测IO事件
                select();
                try {
                  //2、处理准备就绪的IO事件
                    processSelectedKeys();
                } finally {
                    // 3、执行队列里的任务
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
    
            }
        }
    

    NioEventLoop的职责只有三个,1、检测IO事件 ;2、处理准备就绪的IO事件;3、执行队列里的任务,用一个死循环去不断执行这三件事情。如之前画的图所示:


    run

    接下来就着重分析这三个步骤。

    select

    select步骤的核心是调用通过NIO中的selector的select()方法,返回selector上所监听到IO事件。

                        case SelectStrategy.SELECT:
                            // 获取当前任务队列的延迟时间
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                curDeadlineNanos = NONE; // nothing on the calendar
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                //当前任务队列为空,监听IO事件
                                if (!hasTasks()) {
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                // This update is just to help block unnecessary selector wakeups
                                // so use of lazySet is ok (no race condition)
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                            // fall through
                        default:
                        }
    

    select方法

        private int select(long deadlineNanos) throws IOException {
            if (deadlineNanos == NONE) {
                return selector.select();
            }
            // Timeout will only be 0 if deadline is within 5 microsecs
            long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
            return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
        }
    

    流程整体比较简单,如果时间参数deadlineNanos为NONE,就调用selector.select()方法,这个方法会一直阻塞直到有IO事件返回。否则再判断deadlineNanos是否小于等于0,如果是调用selectNow()会立即返回当前selector上准备就绪的IO事件,否则调用selector.select(timeoutMillis)方法,会在指定时间内返回,不管是否有IO事件发生。然后跟select()方法,找到实现类io.netty.channel.nio.SelectedSelectionKeySetSelector,

        public int select() throws IOException {
            selectionKeys.reset();
            return delegate.select();
        }
    

    一共有两步操作,第一步是将之前的selectionKeys清空,检测到就绪的IO事件都会放入selectionKeys中,这里表示新的一轮IO循环开始,所以要将之前的清空(selectionKeys后续会在详细介绍)。第二步是调用NIO中的Selector对象的select(),将最后底层的IO实现委托给它。

    processSelectedKeys

    processSelectedKeys这一步将会处理监测到的IO事件,比如连接、读写的IO操作。

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    

    这里有个细节,处理优化过后的selectedKeys还是处理原生的selectedKeys。所谓优化的selectedKeys就是将原生的selectedKeys的HashSet替换成数组实现,提高空间利用率和遍历的效率,待会儿会详细将到是怎么替换的selectedKeys。

    然后跟进去看processSelectedKeysOptimized()的具体实现:

        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

    整体流程就是在遍历selectedKeys,将绑定在SelectionKey上的Channel取下来,然后做对应的IO操作,最后再判断是否需要重置selectedKeys。下面我会逐步分析里面的细节

    第一: selectedKeys.keys[i] = null;

    将SelectionKey取出之后把数组这个位置的地方置为null。为什么这么做?https://github.com/netty/netty/issues/2363描述的很清楚,简单来说就是我们并不会去清空selectedKeys数组,这就会导致在Channel关闭之后,依然会保持SelectionKey的强引用。

    selectedKeys.jpg
    如上图所示,假如数组原有长度为2,一次高峰期的IO事件导致数组扩容到8,之后新的IO事件的数量又达不到之前数组的位置,为导致上图坐标[6]、[7]位置会长时间持有已经关闭的Channel的引用,所以这里将其置为null,有助于GC。

    第二: processSelectedKey

                
                final Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } 
    

    首先是将SelectionKey绑定的属性取下来,判断是否是AbstractNioChannel的类型。这里可以追踪一下netty是什么时候将AbstractNioChannel设置进去的。在AbstractNioChannel的doRegister方法

      //最后一个参数就是att
      selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
    

    其Channel注册到底层jdk的组件中,然后将AbstractNioChannel作为参数传递进去,后续轮询出IO事件之后,再将AbstractNioChannel取出做后续操作。
    具体处理IO事件
    processSelectedKey(SelectionKey k, AbstractNioChannel ch)
    这里贴一点核心流程,主要是判断当前Channel的操作类型,是连接还是读、写

               int readyOps = k.readyOps();
                //连接事件
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
               //写事件
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                //读事件
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
    

    这里面的内部流程就不具体分析了,大致分为两个部分bossGroup监听的连接事件,将接受到的Channel转交给workGroup,然后workGroup处理读写事件,然后将事件通过ChannelPipeline将事件传播出去。具体细节可以看AbstractNioMessageChannel和AbstractNioByteChannel的read()方法,后续可能会具体分析这里的代码。

    第三: needsToSelectAgain

    最后一个步骤,重新设置selectedKeys

                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
                    selectAgain();
                    i = -1;
                }
    

    什么时候需要重新select?找到needsToSelectAgain被设置为true的地方,只有唯一的一处cancel

        void cancel(SelectionKey key) {
            key.cancel();
            cancelledKeys ++;
            if (cancelledKeys >= CLEANUP_INTERVAL) {
                cancelledKeys = 0;
                needsToSelectAgain = true;
            }
        }
    

    然后看cancel被调用的地方doDeregister

        protected void doDeregister() throws Exception {
            eventLoop().cancel(selectionKey());
        }
    

    由上面的两部分代码分析可以知道,channel的关闭是通过移除在selector上的注册实现的,同时会把cancelledKeys加一 。当达到了阈值CLEANUP_INTERVAL(默认256)后将cancelledKeys重置为0、needsToSelectAgain 为true。
    当needsToSelectAgain 为true之后,有两个步骤:
    1.selectedKeys清空 -> selectedKeys.reset(i + 1);

        void reset(int start) {
            Arrays.fill(keys, start, size, null);
            size = 0;
        }
    
    1. 再次填充selectedKeys ->selectAgain
        private void selectAgain() {
            needsToSelectAgain = false;
            try {
                selector.selectNow();
            } catch (Throwable t) {
                logger.warn("Failed to update SelectionKeys.", t);
            }
        }
    

    至于为什么需要重新去填充selectedKeys,可能是需要保持selectedKeys里面的Channel都随时保持的是活跃的。

    processSelectedKeys到这就分析完了,总共分为三步

    1. 遍历selectedKeys
    2. 处理IO事件
    3. 是否需要重置selectedKeys

    ranTasks

    现在分析thread的最后一步工作ranTasks,执行队列里的任务。
    1. 任务类型
    NioEventLoop里的任务类型分为两部分,一个是由taskQueue(MpscUnboundedArrayQueue)存放普通的任务,还有一个scheduledTaskQueue存放定时任务的队列。之前分析过EventLoop继承自ScheduledExecutorService,所以也需要提供执行定时任务的功能,而这里的定时任务是通过PriorityQueue来实现的。(定时任务的实现方式有很多,优先队列只是其中一种)ranTasks执行的任务其实就是两部分的内容,一个是普通队列中的任务和定时队列中的任务。
    2. ioRatio
    在分析执行细节之前,在提一个很重要的参数ioRatio,代表设置事件循环中I/O所需时间的百分比,意思就是在一次循环中,处理IO事件的时间与处理队列任务所占时间做一个百分比的分配,范围是1到100,当设置为100时,这个参数就失效了,默认参数为50。下面代码就是对ioRatio的使用

                    //等于100的时候,参数失效,不再平衡IO事件所占时间的比例
                    if (ioRatio == 100) {
                        try {
                            if (strategy > 0) {
                                processSelectedKeys();
                            }
                        } finally {
                            // Ensure we always run tasks.
                            ranTasks = runAllTasks();
                        }
                    } else if (strategy > 0) {
                        //开始执行IO事件的时间
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // 获得IO执行总共耗时
                            final long ioTime = System.nanoTime() - ioStartTime;
                            //按照ioRatio计算出将花费多少时间执行ranTasks 
                            ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
    

    3. runAllTasks

        protected boolean runAllTasks(long timeoutNanos) {
            //将scheduledTaskQueue队列中的任务转移到taskQueue中
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            //任务为空结束
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
            //计算本次执行任务最迟的时间
            final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                //执行任务
                safeExecute(task);
    
                runTasks ++;
    
                //每执行64个任务之后判断时间是否超出,若超出结束循环
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
                //没有任务结束循环
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    主要流程为

    1. scheduledTaskQueue队列中的任务转移到taskQueue中;
    2. 安全的执行任务(其实就是将任务try catch,以免任务执行发生异常,影响其他任务执行);
        protected static void safeExecute(Runnable task) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception. Task: {}", task, t);
            }
        }
    
    1. 每执行64个任务之后判断执行时间是否超出deadline,这里采用64个任务为一个批次,没有每次任务执行去判断,也是对性能的一个优化;
    2. 执行afterRunningAllTasks方法,其实就是执行tailTasks队列中的任务,然后记录一下最后的执行时间this.lastExecutionTime = lastExecutionTime;

    一些细节

    selectedKeySet

    前面提到过netty将NIO中Selector的selectedKeys替换,这里分析一下为什么需要替换和么去替换的selectedKeys。

    1. 为什么替换

    NIO原生的selectedKeys使用的是HashSet,而NioEventLoop将其替换成了SelectedSelectionKeySet

    //SelectorImpl
    protected Set<SelectionKey> selectedKeys = new HashSet();
    //NioEventLoop
    private SelectedSelectionKeySet selectedKeys;
    

    SelectedSelectionKeySet构造函数

        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
    

    SelectedSelectionKeySet使用的是数组存储元素,而HashSet是基于HashMap去存储数据,采用数组使得空间利用率和遍历的效率有所提高。

    2.怎么替换

    要在运行时替换掉类的属性,很明显是通过反射来做到的。

    • 获取sun.nio.ch.SelectorImpl Class对象
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
    
    • 创建selectedKeySet
    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
    • 设置属性
        //获取属性
        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
        ......
        //将selectedKeySet设置到属性中
        selectedKeysField.set(unwrappedSelector, selectedKeySet);
        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
    

    NIO空轮询bug

    NIO有一个很出名的bug就是epoll空轮询的bug,这会导致CPU占有率到100%,java也并没有修复这个bug,netty采用了一个很巧妙的方法来绕过这个bug。
    主要思想就是,通过检测发生空轮询的次数,当超过一定的阈值之后,netty将会重新创建一个selector,并将之前selector上的channel转移到新的selector上。通过重新创建selector的方式来解决NIO空轮询的bug。

    unexpectedSelectorWakeup

            //空轮询的次数超过阈值,默认为512
            if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                //重新构建selector
                rebuildSelector();
                return true;
            }
    

    跟进去找到具体的实现方法rebuildSelector0

            final Selector oldSelector = selector;
            final SelectorTuple newSelectorTuple;
            try {
                //创建新的selector
                newSelectorTuple = openSelector();
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }
            // Register all channels to the new Selector.
            int nChannels = 0;
            for (SelectionKey key: oldSelector.keys()) {
            //将旧的selector上的channel全部注册到新的selector上
            }
            //赋值
            selector = newSelectorTuple.selector;
            unwrappedSelector = newSelectorTuple.unwrappedSelector;
            try {
                // 关闭旧的selector
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }
    

    总结

    本文分析NioEventLoop中所对应的唯一的thread,启动是一个懒加载的过程,当第一次任务执行的时候才会初始化。后续thread开始循环处理三件事件

    1. 检测IO事件 ;
    2. 处理准备就绪的IO事件;
    3. 执行队列里的任务

    本文也对具体的代码进行了分析,还有一些netty对NIO的优化和bug处理,当然netty的精妙之处远不止本文分析的这些,更多的还需要自己去探索和学习。

    相关文章

      网友评论

        本文标题:Netty源码(三)NioEventLoop三部曲

        本文链接:https://www.haomeiwen.com/subject/hnveghtx.html