美文网首页
Netty源码分析-NioEventLoop

Netty源码分析-NioEventLoop

作者: yunkai_zhang | 来源:发表于2017-08-08 22:27 被阅读0次

    NioEventLoop是实现Reactor模型的非常重要的一个类。它是一个Loop循环线程,Loop的核心可以看下它的run()方法:1)执行Selector的select;2)执行一些task。下边的这张图片可以简单的描述Loop循环的操作。

    Loop循环

    1.私有变量及构造函数

    简单看一下NioEventLoop的私有变量以及它的构造函数:

    public final class NioEventLoop extends SingleThreadEventLoop {
        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
        private final SelectorProvider provider;
        private final AtomicBoolean wakenUp = new AtomicBoolean();
        private final SelectStrategy selectStrategy;
        private volatile int ioRatio = 50;
        private int cancelledKeys;
        private boolean needsToSelectAgain;
    
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    

    具体分析一下,selectorunwrappedSelectorselectedKeys通过在构造函数中调用openSelector()产生,简单说一下这会跟netty的select优化有关系,后边会单独分析一下。provider提供的是select的方式,类型是java.nio.channels.spi.SelectorProvider,实现方式有KQueueSelectorProvider /PollSelectorProvider等。selectStrategyioRatio会在下边讲解run()方法时进行介绍。
    下面摘抄了一下openSelector()方法的实现。首先可以看到:用户可以通过io.netty.noKeySetOptimization开关决定是否启用该优化项,默认是关闭的,此时selector和unwrappedSelector 都指向同一个Selector对象。如果开启优化,那么会在该方法中实例化selectorunwrappedSelector、`selectedKeys这些实例变量。

        private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return new SelectorTuple(unwrappedSelector);
            }
    
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
            //如果开启了优化开关,需要通过反射的方式从Selector实例中获取selectedKeys和publicSelectedKeys,
            //将上述两个成员变量设置为可写,通过反射的方式使用Netty构造的selectedKeys包装类selectedKeySet将原JDK的selectedKeys替换掉。
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
    
            if (!(maybeSelectorImplClass instanceof Class) ||
                    ......
                return new SelectorTuple(unwrappedSelector);
            }
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
                        ......
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch  (Exception e) {
                        return e;
                    }
                }
            });
           ......
            selectedKeys = selectedKeySet;
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
    

    2.run()方法

    首先贴一下run()方法的核心代码,后边我们会结合着代码一步一步分析其到底做了哪些工作。总括一下:
    1.ioEventLoop执行的任务分为两大类:IO任务和非IO任务。IO任务即selectionKey中ready的事件,譬如accept、connect、read、write等;非IO任务则为添加到taskQueue中的任务,譬如register0、bind、channelActive等任务
    2.两类任务的执行先后顺序为:IO任务->非IO任务。
    3.两类任务的执行时间比由变量ioRatio控制,譬如:ioRatio=50(该值为默认值),则表示允许非IO任务执行的时间与IO任务的执行时间相等。

        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

    2.1selectStrategy

    首先关注下selectStrategy做了什么事情。下边的代码标注了selectNowSupplier的实例化方法以及DefaultSelectStrategy的实现逻辑:如果taskQueue中有元素,执行 selectNow()方法,最终执行selector.selectNow(),该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp) 方法。

    private final IntSupplier selectNowSupplier = new IntSupplier() {
            @Override
            public int get() throws Exception {
                return selectNow();
            }
        };
    
    final class DefaultSelectStrategy implements SelectStrategy {
        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    }
    public interface SelectStrategy {
    
        /**
         * Indicates a blocking select should follow.
         */
        int SELECT = -1;
        /**
         * Indicates the IO loop should be retried, no blocking select to follow directly.
         */
        int CONTINUE = -2;
    
        /**
         * The {@link SelectStrategy} can be used to steer the outcome of a potential select
         * call.
         *
         * @param selectSupplier The supplier with the result of a select result.
         * @param hasTasks true if tasks are waiting to be processed.
         * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
         *         the next step should be to not select but rather jump back to the IO loop and try
         *         again. Any value >= 0 is treated as an indicator that work needs to be done.
         */
        int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
    }
    
    

    那么 selectNow()select(oldWakenUp)之间有什么区别呢? 首先来看一下selectNow()的源码。方法中:首先调用了 selector.selectNow()方法检查当前是否有就绪的 IO 事件, 如果有, 则返回就绪 IO 事件的个数; 如果没有, 则返回0. 注意, selectNow() 是立即返回的, 不会阻塞当前线程. 当selectNow() 调用后, finally 语句块中会检查 wakenUp 变量是否为 true, 当为 true 时, 调用 selector.wakeup()唤醒 select()的阻塞调用.

        int selectNow() throws IOException {
            try {
                return selector.selectNow();
            } finally {
                // restore wakeup state if needed
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
        }
    

    我们再来看下另外的分支select(oldWakenUp) 方法.其实select(oldWakenUp)方法的处理逻辑比较复杂, 这里就不用列出,简单指出一下在这个 select 方法中, 调用了 selector.select(timeoutMillis), 而这个调用是会阻塞住当前线程的,重要的是Netty这个方法解决了Nio中臭名昭著的bug:selector的select方法导致cpu100%。

    2.2I/O任务

    在NioEventLoop的run()方法中执行I/O任务的主要函数就是processSelectedKeys(),会根据是否采用Selector优化选择执行不同的方法。

    private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    

    首先看一下没有做优化的process处理流程.大致流程是遍历唤醒的SelectionKey,然后取出对应key注册的attachment。在这里attachment可能会有两种类型:1是AbstractNioChannel,说明对应的Channel有相应的状态变更,接下来会根据SelectionKey的readyOps值进行相应的accpet、read、write操作;2是NioTask<SelectableChannel>,说明是一个NioTask,也会进行相应的操作。

        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            if (selectedKeys.isEmpty()) {
                return;
            }
    
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (!i.hasNext()) {
                    break;
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = selector.selectedKeys();
    
                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }
    

    关于做过优化的processSelectedKeysOptimized的处理流程与上边的大致是一致的,只是要处理的SelectedKeys对象有所不同,processSelectedKeysOptimized处理的SelectedKeys对象是构造函数初始化的时候通过openSelector实例化的selectedKeys。

    2.3非I/O任务

    如果 ioRatio 不为100时,方法runAllTasks的执行时间只能为ioTime * (100 - ioRatio) / ioRatio,其中ioTime 是方法processSelectedKeys的执行时间。
    方法fetchFromScheduledTaskQueue把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行。
    依次从taskQueue任务task执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。

        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    相关文章

      网友评论

          本文标题:Netty源码分析-NioEventLoop

          本文链接:https://www.haomeiwen.com/subject/uqpxrxtx.html