美文网首页IT@程序员猿媛
netty源码分析(9)-NioEventLoop执行过程之se

netty源码分析(9)-NioEventLoop执行过程之se

作者: Jorgezhong | 来源:发表于2019-02-20 18:33 被阅读29次

    上一节我们研究了NioEventLoop的执行过程,但是select(wakenUp.getAndSet(false));processSelectedKeys();没有分析。

    这一节研究select(),改方法干的事情其实是轮询检查IO事件。有三个阶段

    1. 第一个阶段,计算超时时间并判断是否超时,超时则进行一次非阻塞式的select,中断轮询检测;没有超时,同样也会判断taskQueue和tailQueue中是否有有任务,有任务也进行同样的操作
    2. 第二个阶段,未超时,且任务队列为空,进行阻塞式的select,直到检测到一个已注册的IO事件发生,阻塞时间默认为1秒钟。
    3. 第三个阶段,该阶段主要的作用就是防止空轮询bug,导致cpu暂用率过高导致宕机的情况。
    
        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                //当前时间 + 截止时间 (NioEventLoop底层维持了一个定时任务队列,按照任务的截止时间正序排序)
                //delayNanos(currentTimeNanos) : 计算当前定时任务队列第一个任务的截止时间
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                //轮询
                for (;;) {
    
                    //计算超时时间
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    //判断是否超时
                    if (timeoutMillis <= 0) {
                        //超时,判断有没有select
                        if (selectCnt == 0) {
                            //没有select,进行非阻塞的select,异步检测IO事件。不阻塞
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
    
                    //未超时
                    //判断当前任务队列(taskQueue)和异步任务队列(tailQueue)是否有任务
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        //有任务,进行一次非阻塞式的select
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    //第二个阶段
                    //未超时,且任务队列为空的话,进行阻塞式的select,直到检测到一个已注册的IO事件发生,timoutMillis是阻塞最大时间,默认1000,
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    //第三个阶段
                    //防止jdk空轮询的bug
    
                    long time = System.nanoTime();
                    //判断执行了一次阻塞式select后,当前时间和开始时间是否大于超时时间。(大于是很正常的,小于的话,说明没有执行发生了空轮询)
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The code exists in an extra method to ensure the method is not too big to inline as this
                        // branch is not very likely to get hit very frequently.
                        // SELECTOR_AUTO_REBUILD_THRESHOLD的值为512
                        //selectCut >= 512 也就是空轮询测试大于等于512次的话,空轮询处理发生,调用改方法避免下一次发生
                        selector = selectRebuildSelector(selectCnt);
                        selectCnt = 1;
                        break;
                    }
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
    
    • 重点说一下防止空轮询bug,在第三阶段,代码time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos

    time:当前时间
    timeoutMillis:阻塞式select超时时间
    currentTimeNanos:本方法开始时间

    因此满足该条件就是,经过了阻塞式的select检测IO事件,但是并没有任何事件发生,尽力了。这时候select的计数加一,累计到阈值SELECTOR_AUTO_REBUILD_THRESHOLD的时候则满足了空轮询的条件,该阈值在本类静态代码块中初始化为512,经过512次什么事情都没有做,表表示满足空轮询的条件。

    //selectCut >= 512 也就是空轮询测试大于等于512次的话,空轮询处理发生,调用改方法避免下一次发生
    selector = selectRebuildSelector(selectCnt);
    
    
        private Selector selectRebuildSelector(int selectCnt) throws IOException {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            logger.warn(
                    "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                    selectCnt, selector);
            //充值Selector
            rebuildSelector();
            Selector selector = this.selector;
            //非阻塞select 检测IO事件
            // Select again to populate selectedKeys.
            selector.selectNow();
            return selector;
        }
    
        public void rebuildSelector() {
            if (!inEventLoop()) {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        rebuildSelector0();
                    }
                });
                return;
            }
            rebuildSelector0();
        }
    

    通过分析源码,发现空轮询bug的解决策略就是重置selector,重新打开一个selector,依照旧的selector重新注册所有的IO事件到原有通道上,生成新的selectionKey,并刷新该值,同时刷新的还有和selector相关的属性值,最后把旧的给关闭掉。这样不会无限死循环,仅仅会替换掉原有的属性值而已,做了一些动作。

        private void rebuildSelector0() {
            final Selector oldSelector = selector;
            final SelectorTuple newSelectorTuple;
    
            if (oldSelector == null) {
                return;
            }
    
            try {
                //重新创建一个selector
                newSelectorTuple = openSelector();
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }
    
            // Register all channels to the new Selector.
            int nChannels = 0;
            //轮询旧操作的所有的key和attachment 
            //attachment 其实就是netty包装了的channel
            for (SelectionKey key: oldSelector.keys()) {
                //获取channel
                Object a = key.attachment();
                try {
                    if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
                        continue;
                    }
                    //注册key注册的事件
                    int interestOps = key.interestOps();
                    //取消之前key的事件
                    key.cancel();
                    //将事件和channel注册到新创建的selector上
                    SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
                    if (a instanceof AbstractNioChannel) {
                        // Update SelectionKey
                        //替换新的selectionKey
                        ((AbstractNioChannel) a).selectionKey = newKey;
                    }
                    nChannels ++;
                } catch (Exception e) {
                    logger.warn("Failed to re-register a Channel to the new Selector.", e);
                    if (a instanceof AbstractNioChannel) {
                        AbstractNioChannel ch = (AbstractNioChannel) a;
                        ch.unsafe().close(ch.unsafe().voidPromise());
                    } else {
                        @SuppressWarnings("unchecked")
                        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                        invokeChannelUnregistered(task, key, e);
                    }
                }
            }
            //重置新的selector
            selector = newSelectorTuple.selector;
            unwrappedSelector = newSelectorTuple.unwrappedSelector;
    
            try {
                // time to close the old selector as everything else is registered to the new one
                //关闭旧的selector
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }
    
            if (logger.isInfoEnabled()) {
                logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
            }
        }
    

    相关文章

      网友评论

        本文标题:netty源码分析(9)-NioEventLoop执行过程之se

        本文链接:https://www.haomeiwen.com/subject/dftkyqtx.html