美文网首页
三、netty源码分析之EventLoop

三、netty源码分析之EventLoop

作者: 丑星星 | 来源:发表于2019-09-28 09:30 被阅读0次

    一、EventLoop功能概述

    上篇我们分析了EventLoopGroup的核心能力,EventLoopGroup具有执行任务、注册Channel、执行器调度等能力。今天我们来看一下EventLoop。我们先来看看EventLoop的类图关系:

    EventLoop
    我们可以看到,EventLoop接口继承了EventLoopGroup接口。为什么EventLoop要继承EventLoopGroup呢?从上一篇的分析,我们知道,EventLoopGroup最主要的功能时对EventLoop进行管理调度,EventLoopGroup的其他大部分功能,都是交给自己管理的EventLoop来处理的。而EventLoop继承EventLoopGroup,就是为了继承EventLoopGroup任务执行、优雅停机、Channel注册等功能窗口。
    除了继承EventLoopGroup之外,EventLoop还继承了EventExecutor接口。我们可以看一下EventExecutor的具体内容:
    /**
     * The {@link EventExecutor} is a special {@link EventExecutorGroup} which comes
     * with some handy methods to see if a {@link Thread} is executed in a event loop.
     * Besides this, it also extends the {@link EventExecutorGroup} to allow for a generic
     * way to access methods.
     *
     */
    public interface EventExecutor extends EventExecutorGroup {
    
        /**
         * Returns a reference to itself.
         */
        @Override
        EventExecutor next();
    
        /**
         * Return the {@link EventExecutorGroup} which is the parent of this {@link EventExecutor},
         */
        EventExecutorGroup parent();
    
        /**
         * Calls {@link #inEventLoop(Thread)} with {@link Thread#currentThread()} as argument
         */
        boolean inEventLoop();
    
        /**
         * Return {@code true} if the given {@link Thread} is executed in the event loop,
         * {@code false} otherwise.
         */
        boolean inEventLoop(Thread thread);
    
        /**
         * Return a new {@link Promise}.
         */
        <V> Promise<V> newPromise();
    
        /**
         * Create a new {@link ProgressivePromise}.
         */
        <V> ProgressivePromise<V> newProgressivePromise();
    
        /**
         * Create a new {@link Future} which is marked as succeeded already. So {@link Future#isSuccess()}
         * will return {@code true}. All {@link FutureListener} added to it will be notified directly. Also
         * every call of blocking methods will just return without blocking.
         */
        <V> Future<V> newSucceededFuture(V result);
    
        /**
         * Create a new {@link Future} which is marked as failed already. So {@link Future#isSuccess()}
         * will return {@code false}. All {@link FutureListener} added to it will be notified directly. Also
         * every call of blocking methods will just return without blocking.
         */
        <V> Future<V> newFailedFuture(Throwable cause);
    }
    
    

    从接口的头部注释我们可以看到,EventExecutor是一个特殊的EventExecutorGroup,它提供了一些易用的方法来判断一个线程是否正在事件循环中执行。至于EventExecutorGroup我们上一篇分析过这个接口的能力,这里就不再赘述了。我们看一看EventExecutor的几个重要的的方法:
    首先是EventExecutorGroup parent();方法,EventExecutor只有事件执行的能力,没有调度的能力,所以这个方法只会返回对象自身。
    然后是两个重载的inEventLoop方法,用来判断线程是否正在事件循环中执行。
    随后是两个创建Promise的方法,关于Promise的作用,大家不清楚的可以查一下相关资料,内部具体实现我们在后面的文章中再做分析。
    最后,是一对创建Future的方法,我们从注释中可以看到这两个方法的作用,就是创建一个已经被标记成成功/失败的Future对象。所有已经注册的FutureListener都会被直接通知。所有的阻塞方法都会非阻塞的返回。

    我们的EventLoop继承了OrderedEventExecutor,而OrderedEventExecutor直接继承了EventExecutor,本身并无定义其他方法。但是我们可以从OrderedEventExecutor的头部注释中看到,OrderedEventExecutor其实是一个标记接口,这个接口保证所有执行的任务必须按照顺序执行,并且要串行执行!所以我们可以相信,实现了OrderedEventExecutor的类,执行任务的时候回保证任务执行的顺序性,并且同一时刻只能执行一个任务。

    到这里,我们可以知道,EventLoop的核心能力:EventLoop是一个可以优雅停机的任务执行器,它能保证提交的任务都被顺序串行执行。接下来我们根据EventLoop的一个具体实现类NioEventLoop来更直观的理解一下EventLoop的能力。

    从NioEventLoop来看EventLoop在netty中扮演的角色

    首先我们先看一看NioEventLoop的构造方法:

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    

    我们一路跟踪,会发现,这个构造方法调用了父类SingleThreadEventExecutor的构造方法:

        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, Queue<Runnable> taskQueue,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
            this.executor = ThreadExecutorMap.apply(executor, this);
            this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
    

    我们可以看到这里面有一行this.executor = ThreadExecutorMap.apply(executor, this);。这个构造方法传入的executor参数就是我们上节提到过的NioEventLoopGrop在创建NioEventLoop时传入的ThreadPerTaskExecutor对象。这里在给成员变量赋值的时候调用了ThreadExecutorMap.apply(executor, this),我们可以看一下这里面的具体内容:

        //ThreadExecutorMap类的相关内容
    
        private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
    
        public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
            ObjectUtil.checkNotNull(executor, "executor");
            ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
            return new Executor() {
                @Override
                public void execute(final Runnable command) {
                    executor.execute(apply(command, eventExecutor));
                }
            };
        }
    
        public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
            ObjectUtil.checkNotNull(command, "command");
            ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
            return new Runnable() {
                @Override
                public void run() {
                    setCurrentEventExecutor(eventExecutor);
                    try {
                        command.run();
                    } finally {
                        setCurrentEventExecutor(null);
                    }
                }
            };
        }
    
        private static void setCurrentEventExecutor(EventExecutor executor) {
            mappings.set(executor);
        }
    

    我们可以看到,Executor apply(final Executor executor, final EventExecutor eventExecutor)重新创建了一个Executor对象,这个对象执行任务还是调用参数传入的Executor 来执行,只不过是在传入的任务中做了一个静态代理,在任务执行的前后分别将执行此任务的EventExecutor绑定、解绑到自身持有的一个FastThreadLocal中。这里的FastThreadLocal是netty自己实现的一个处理线程单例的工具,这个FastThreadLocal究竟比我们jdk中的ThreadLocal快在哪里呢?我们把这个类的set方法拿出来看一下(在此之前你必须要知道jdkThreadLoop的实现原理):

      //FastThreadLocal的set方法
        public final void set(V value) {
            if (value != InternalThreadLocalMap.UNSET) {
                InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
                setKnownNotUnset(threadLocalMap, value);
            } else {
                remove();
            }
        }
    

    InternalThreadLocalMap的get()方法:

        public static InternalThreadLocalMap get() {
            Thread thread = Thread.currentThread();
            if (thread instanceof FastThreadLocalThread) {
                return fastGet((FastThreadLocalThread) thread);
            } else {
                return slowGet();
            }
        }
    
        private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
            InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
            if (threadLocalMap == null) {
                thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
            }
            return threadLocalMap;
        }
    
        private static InternalThreadLocalMap slowGet() {
            ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
            InternalThreadLocalMap ret = slowThreadLocalMap.get();
            if (ret == null) {
                ret = new InternalThreadLocalMap();
                slowThreadLocalMap.set(ret);
            }
            return ret;
        }
    

    我们可以看到这个FastThreadLocal在获取Map的时候会判断当前的线程是否是FastThreadLocalThread的对象,是的话就调用fastGet(FastThreadLocalThread thread)方法获取InternalThreadLocalMap(不存在就创建);如果不是FastThreadLocalThread的对象,就调用slowGet()获取,获取逻辑是从一个静态的ThreadLocal对象中获取当前线程绑定的InternalThreadLocalMap对象,没有的话就创建一个。在获取到InternalThreadLocalMap的对象后,怎么向里面赋值呢?我们可以看一下FastThreadLocal中的set方法赋值的真正逻辑:

      // FastThreadLocal的set方法
        private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
            //index是FastThreadLocal维护的一个索引对象
            if (threadLocalMap.setIndexedVariable(index, value)) {
                addToVariablesToRemove(threadLocalMap, this);
            }
        }
    
      // InternalThreadLocalMap的方法
        public boolean setIndexedVariable(int index, Object value) {
            Object[] lookup = indexedVariables;
            if (index < lookup.length) {
                Object oldValue = lookup[index];
                lookup[index] = value;
                return oldValue == UNSET;
            } else {
                expandIndexedVariableTableAndSet(index, value);
                return true;
            }
        }
    

    我们可以看到,其实InternalThreadLocalMap内部是一个数组,每个FastThreadLocal都记录了自身维护的线程单例的对象再数组中的位置,即index这个成员变量。这个index的值是在FastThreadLocal初始化的时候从InternalThreadLocalMap内部的一个静态递增变量处获取的。 InternalThreadLocalMap这种方式和jdk内部的ThreadLocalMap使用散列表的方式存储对象相比,优点是:获取和设置线程单例对象的时候,少了hash值计算这一步,并且没有hash冲撞的情况发生。这一点相比ThreadLocalMap*的确性能会有所提升。这也是netty对性能优化的一方面体现,后面我们还会看到好多在细节上的优化。

    我们花了很大的篇幅分析了NioEventLoop的构造方法,目的就是为了让大家看到netty对性能的优化都是落到很多细节上的。下面我们继续分析NioEventLoop构造方法的剩余内容,接下来我们会看到netty的另一个优化,在此之前大家要熟悉Java的NIO,不然接下来内容肯定是看不懂的!

    我们可以看到NioEventLoop有下面几个成员变量:

        private Selector selector;
        private Selector unwrappedSelector;
        private SelectedSelectionKeySet selectedKeys;
    
        private final SelectorProvider provider;
    

    我们在NioEventLoop构造方法中可以看到对这几个成员变量的初始化过程。
    首先,我们可以看到,构造方法中通过openSelector()方法生成了一个SelectorTuple的对象,然后将SelectorTuple中的selectorunwrappedSelector赋值给NioEventLoop的队形属性。我们可以看一下openSelector()的内容:

        private SelectorTuple openSelector() {
            final Selector unwrappedSelector;
            try {
                //创建一个Selector对象
                unwrappedSelector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEY_SET_OPTIMIZATION) {
                //禁止优化选型,如果选择禁止优化,就直接创建一个SelectorTuple对象返回
                return new SelectorTuple(unwrappedSelector);
            }
    
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (Throwable cause) {
                        return cause;
                    }
                }
            });
    
            if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
                if (maybeSelectorImplClass instanceof Throwable) {
                    Throwable t = (Throwable) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
                }
                return new SelectorTuple(unwrappedSelector);
            }
            //下面是优化的开始
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        //通过反射获取Selector的相关Field
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                            // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                            // This allows us to also do this in Java9+ without any extra flags.
                            long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                            long publicSelectedKeysFieldOffset =
                                    PlatformDependent.objectFieldOffset(publicSelectedKeysField);
    
                            if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                                //通过反射设置Selector对应的属性的值
                                PlatformDependent.putObject(
                                        unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                                PlatformDependent.putObject(
                                        unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                                return null;
                            }
                            // We could not retrieve the offset, lets try reflection as last-resort.
                        }
    
                        Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
                        cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                        if (cause != null) {
                            return cause;
                        }
    
                        selectedKeysField.set(unwrappedSelector, selectedKeySet);
                        publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
                return new SelectorTuple(unwrappedSelector);
            }
            selectedKeys = selectedKeySet;
            logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
            return new SelectorTuple(unwrappedSelector,
                                     new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
        }
    

    我们可以看到,整个openSelector()方法做的事情就是:判断参数是否允许相关优化,如果允许优化,就将创建的Selector的对象的两个属性:selectedKeyspublicSelectedKeys重写为:SelectedSelectionKeySet对象。关于selectedKeyspublicSelectedKeys,大家可以看一看Selector的API,这里不再赘述。这里为什么要对这两个属性重新赋值呢?为什么重新赋值了就是优化了呢?我们先来看一下这两个属性在Selector中是什么:

        //SelectorImpl的部分代码
        protected Set<SelectionKey> selectedKeys = new HashSet();
        this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
    

    我们可以看到,原来的selectedKeyspublicSelectedKeys归根结底都是HashSet。而替换成的SelectedSelectionKeySet又是什么呢?我们来看一下:

    final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    
        SelectionKey[] keys;
        int size;
    
        SelectedSelectionKeySet() {
            keys = new SelectionKey[1024];
        }
    
        @Override
        public boolean add(SelectionKey o) {
            if (o == null) {
                return false;
            }
    
            keys[size++] = o;
            if (size == keys.length) {
                increaseCapacity();
            }
            return true;
        }
    
        @Override
        public boolean remove(Object o) {
            return false;
        }
    
        @Override
        public boolean contains(Object o) {
            return false;
        }
    
        @Override
        public int size() {
            return size;
        }
    
        @Override
        public Iterator<SelectionKey> iterator() {
           //省略
        }
    
        void reset() {
            reset(0);
        }
    
        void reset(int start) {
            Arrays.fill(keys, start, size, null);
            size = 0;
        }
    
        private void increaseCapacity() {
          //省略
        }
    }
    

    我们可以看到,SelectedSelectionKeySet继承了AbstractSet,但是它内部实现压根不能算是一个Set,因为它的add方法没有保证元素在集合中唯一的相关实现!为什么要这么做呢?我们不妨先看一下jdk中对selectedKeys这个集合添加元素的相关逻辑,由于没有源码,只能看到变量名是var这种定义,不过不影响我们对逻辑的理解:

                                if (WindowsSelectorImpl.this.selectedKeys.contains(var10)) {
                                    if (var9.clearedCount != var1) {
                                        if (var10.channel.translateAndSetReadyOps(var4, var10) && var9.updateCount != var1) {
                                            var9.updateCount = var1;
                                            ++var6;
                                        }
                                    } else if (var10.channel.translateAndUpdateReadyOps(var4, var10) && var9.updateCount != var1) {
                                        var9.updateCount = var1;
                                        ++var6;
                                    }
    
                                    var9.clearedCount = var1;
                                } else {
                                    if (var9.clearedCount != var1) {
                                        var10.channel.translateAndSetReadyOps(var4, var10);
                                        if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                            WindowsSelectorImpl.this.selectedKeys.add(var10);
                                            var9.updateCount = var1;
                                            ++var6;
                                        }
                                    } else {
                                        var10.channel.translateAndUpdateReadyOps(var4, var10);
                                        if ((var10.nioReadyOps() & var10.nioInterestOps()) != 0) {
                                            WindowsSelectorImpl.this.selectedKeys.add(var10);
                                            var9.updateCount = var1;
                                            ++var6;
                                        }
                                    }
    
                                    var9.clearedCount = var1;
                                }
    

    我们可以看到,在把元素添加到selectedKeys之前,会判断selectedKeys是否已经包含了这个元素,包含的话就操作已经在就不再进行添加操作,不包含的时候才进行添加操作。而SelectedSelectionKeySet的判断是否存在指定元素的方法始终返回false,也就意味着,selectedKeys会被添加重复的SelectionKey对象。添加重复的SelectionKey对象会有什么影响呢?在netty中对准备就绪的SelectionKey做处理之前,都会判断SelectionKey对象就绪的状态,处理完该事件之后,会把SelectionKey对象的就绪状态移除。所以如果重复添加SelectionKey对象,在这里是不会有任何影响的!那这种用数组直接替代HashMap的操作有什么好处呢?首先,我们看,NioEventLoop继承了SingleThreadEventLoop,我们可以猜出,NioEventLoop是单线程操作selectedKeys的。单线程操作数组有什么好处呢?单线程操作可以充分利用CPU的高速缓存,避免伪共享的发生!并且netty的处理selectedKeys时,只会在处理完所有的就绪的SelectionKey清空数组,之后再次调用select方法。所以不存在添加时找空槽的情况,只要顺序的往数组里面加元素就可以了!这种操作比HashMap添加、删除操作性能要高太多(做了一个小的测试,从容量为10000的数组和HashMap中删除元素,HashMap耗时大概是数组的十倍左右)。

    我们花了大量的篇幅分析了EventLoop的构造方法。这里主要是想让大家看到netty对性能的优化真的无处不在!而且是千方百计的去优化!这也是netty被广泛应用的原因。包括好多高性能高吞吐的中间件也使用了netty做通信,比如RocketMQ、Spark。而我们在分析类似netty这种高性能框架的源码时,一定要注意到这些优化细节,这样我们才能清楚这些框架哪里好,才能知道怎么样才能正确的使用这些框架来充分发挥它们的优势!

    我们继续看NioEventLoop的主要逻辑,接下来我们看一下run()方法:

        @Override
        protected void run() {
            for (;;) {
                try {
                    try {
                        switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
    
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                        }
                    } catch (IOException e) {
                        // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                        // the selector and retry. https://github.com/netty/netty/issues/8566
                        rebuildSelector0();
                        handleLoopException(e);
                        continue;
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

    run()方法是父类SingleThreadEventExecutor的模板方法的实现。我们可以看到,run()方法就是一个不断的循环,在循环内做了什么操作呢?首先,先调用selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())来获取select策略。我们先来看一下SelectStrategy这个接口:

    public interface SelectStrategy {
    
        /**
         * Indicates a blocking select should follow.
         * 接下来要执行阻塞的select操作
         */
        int SELECT = -1;
        /**
         * IO循环应该被重试,非阻塞select接下来会被直接执行
         * Indicates the IO loop should be retried, no blocking select to follow directly.
         */
        int CONTINUE = -2;
        /**
         * 接下来不要阻塞获取新的事件IO循环
         * Indicates the IO loop to poll for new events without blocking.
         */
        int BUSY_WAIT = -3;
    
        /**
         * The {@link SelectStrategy} can be used to steer the outcome of a potential select
         * call.
         *
         * @param selectSupplier The supplier with the result of a select result.
         * @param hasTasks true if tasks are waiting to be processed.
         * @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
         *         the next step should be to not select but rather jump back to the IO loop and try
         *         again. Any value >= 0 is treated as an indicator that work needs to be done.
         */
        int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
    }
    

    SelectStrategy提供了三种默认的select策略,即SELECT、CONTINUE、BUSY_WAIT。netty中实现了一个默认的DefaultSelectStrategy,它的计算select策略的方式是:

        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    

    如果当前EventLoop任务队列中没有任务,就执行SELECT策略,即阻塞的select。如果有的话,就返回当前NioEventLoop中持有的Selector对象的selectNow()方法的返回值,就绪的IO事件的数量。也就是不选择任何select模式。这个过程中其实已经执行了一次非阻塞的selectNow操作

        private final IntSupplier selectNowSupplier = new IntSupplier() {
            @Override
            public int get() throws Exception {
                return selectNow();
            }
        };
    
        int selectNow() throws IOException {
            try {
                return selector.selectNow();
            } finally {
                // restore wakeup state if needed
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
        }
    

    在获取到需要执行的IO select策略后,就选择执行具体的内容,我们可以看到,CONTINUE对应的执行方法就是不执行接下来的逻辑,重新执行select策略的选择。而NIO不支持忙等操作,所以BUSY_WAIT的逻辑和SELECT的逻辑是一致性的,都调用了select(wakenUp.getAndSet(false));方法。这里,我们先要清楚wakenUp这个成员变量的含义,我们先看一下这块内容:

        /**
         * Boolean that controls determines if a blocked Selector.select should
         * break out of its selection process. In our case we use a timeout for
         * the select method and the select method will block for that time unless
         * waken up.
         */
        private final AtomicBoolean wakenUp = new AtomicBoolean();
    

    wekenUp的含义是:控制阻塞的Selector.select在执行的过程中,是否允许被打断。在使用Selector.select的过程中,select方法会被设置超时时间,设置wekenUp为ture时,Selector.select超时后不会继续重新再次被调用。
    清楚了wekenUp这个参数的含义后,我们看一下NioEventLoop的具体select操作是什么逻辑:

        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                //计算出select阻塞时间的最后截止时间,这个时间计算的方式是当前时间加上提交到当前EventLoop中的最近需要执行的定时任务的延迟时间
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
                for (;;) {
                    // 计算出select的阻塞时间,加500000L是为了始终进位。如果整个select操作执行的时间超过了selectDeadLineNanos,整个方法就结束
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                    // 如果任务被添加进来,并且任务中想要调用Selector#wakeup方法让Selector提前从阻塞的select方法中返回的话,如果不执行下面操作,就实现不了这个效果,只能等Selector的select方法阻塞timeoutMillis时间后返回。
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The code exists in an extra method to ensure the method is not too big to inline as this
                        // branch is not very likely to get hit very frequently.
                        selector = selectRebuildSelector(selectCnt);
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
    

    我们来分析一下这段代码的逻辑(在此之前,我们先要清楚,理论上只有当当前EventLoop的任务队列中没有任务的时候才会调用select这个方法SelectStrategy中的逻辑,只有hasTask()是false的时候才返回SELECT,在在调用EventLoop的select方法之前,wakenUp会被设置成false)。首先,计算出select操作最长的阻塞时间timeoutMillis。然后判断hasTasks()的返回值,即EventLoop中是否有添加的任务,如果有的话就说明我们在之前的SelectStrategy选择select策略之后,又有新的任务添加进来了,这个时候为了防止新添加的任务要等到select操作阻塞完成才有机会执行,就做了一个判断:当前的wekenUp如果是false,就设置成ture,然后执行一个非阻塞的selector.selectNow后跳出NioEventLoop.select;否则就继续执行接下来的逻辑。也就是执行Selector.select阻塞操作。selector.selectNow方法结束后会判断,是否有就绪的IO事件,当一下情况满足任意一条就跳出循环结束EventLoop.select方法:有就绪的IO事件、wakenUp在NioEventLoop.select调用之前是true、当前EventLoop有提交的立即执行的任务、当前EventLoop中有提交的定时执行的任务。如果不满足任意情况,就判断是否当前线程有中断状态,有的话也跳出循环。最后判断循环的总时间是否大于设置的Selector.select的超时时间,判断Selector.select是不是因为超时而结束。如果是因为超时而结束就将selectCnt设置为1,继续循环;不是的话就判断循环的次数是否大于SELECTOR_AUTO_REBUILD_THRESHOLD,是的话就跳出循环,这块是为了解决jdk6的关于NIO的bug,我们可以先不用管这个逻辑。到此整个NioEventLoop.select的过程就结束了。这个过程看起来非常乱,我们要弄清楚整个流程,首先要先明白wakenUp这个属性的生命周期。我们可以看到,wakenUp这个属性提供给外部的修改窗口只有一个:

        @Override
        protected void wakeup(boolean inEventLoop) {
            if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
                selector.wakeup();
            }
        }
    

    这个方法是protected修饰的,也就是说,这个方法是不提供其他包调用的,所以这个方法是一个netty内部的调用方法,我们可以搜索到这个方法在哪里使用:

    wakeup方法被调用处
    我们可以看到,这个方法主要是在停机的时候调用的。为的就是在停机的时候将Selector.select从阻塞中唤醒。
    细心地朋友也许会发现,NioEventLoop.select方法在调用之前,会把wakenUp设置为false,这是为什么呢?为的就是在外部调用NioEventLoop.wakeup方法的时候wakenUp.compareAndSet(false, true)这个会设置成功,然后可以调用selector.wakeup()将Selector唤醒。

    到这里,我们再回过头去看NioEventLoop.select方法,这个方法的作用其实就是:调用Selector.select方法来阻塞地获取就绪的IO事件,并且在任何时候都可以响应weakUp操作,如果NioEventLoop中添加定时任务,NioEventLoop.select会执行的时间最多就是到最近定时任务执行的时间,没有定时任务就最多执行1s。这样去理解是不是简单多了!!!

    细心的朋友可能会问:为什么要限制NioEventLoop.select的执行时间最长到下一个定时任务执行的时间呢?我们先带着疑问继续往下看NioEventLoop.run方法。

    在结束了select操作之后,继续判断一下wakenUp的标志,如果设置为ture,就调用selector.wakeup();使下一次的selector.select非阻塞。
    随后会获取当前的ioRatio,我们之前提过这个参数,这个参数是设置我们的IO操作在整个事件执行中的时间占比的,我们看一下下面的具体逻辑。首先,会判断ioRatio是不是设置100,如果是设置百分之百,就先执行processSelectedKeys(),再执行runAllTasks(),不设置事件占比限制。如果ioRatio不是100,就先执行processSelectedKeys(),并且记录下processSelectedKeys()的执行时间,然后计算出剩余时间,使用这个剩余时间来限制runAllTasks方法。这两个方法就是干什么的呢?这里我先给出答案:processSelectedKeys()的作用是处理所有的就绪的selectedKeys,也就是就绪的IO事件;而runAllTasks这两个重载方法就是执行所有的提交任务。到这里,我们可以明白为什么要限制NioEventLoop.select的执行时间最长到下一个定时任务开始执行的时间了。因为IO处理和任务执行是在一个线程里执行的,如果不限制NioEventLoop.select的执行时间,到达下一个定时任务需要执行的时间的时候,有可能整个线程还阻塞在select方法上!
    接下来我们继续分析processSelectedKeys()runAllTasks分别是怎么处理IO事件和提交的任务的。我们先来看一下processSelectedKeys()

        private void processSelectedKeys() {
            //判断是否使用了优化过的Selector,是的话就循环数组,不是的话就循环iterator。
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    

    首先方法会判断我们是否使用数组来替代Map优化Selector,这个是我们上边分析NioEventLoop的构造方法讲的。我们这里只看优化的方法,其实两个逻辑都是一样的,只是循环的方法不一样。整个执行过程就是遍历就绪的SelectionKey。然后交给processSelectedKey的两个重载方法去处理。这里会根据SelectionKey的attachment对象的类型来判断调用哪个重载方法。我们先不用管这个attachment对象是什么时候被添加的,这个会在我们只会的分析中讲到,我们先来看一下这两个方法的逻辑:

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
        private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
            int state = 0;
            try {
                task.channelReady(k.channel(), k);
                state = 1;
            } catch (Exception e) {
                k.cancel();
                invokeChannelUnregistered(task, k, e);
                state = 2;
            } finally {
                switch (state) {
                case 0:
                    k.cancel();
                    invokeChannelUnregistered(task, k, null);
                    break;
                case 1:
                    if (!k.isValid()) { // Cancelled by channelReady()
                        invokeChannelUnregistered(task, k, null);
                    }
                    break;
                }
            }
        }
    

    processSelectedKey(SelectionKey k, AbstractNioChannel ch):我们方法开始的验证逻辑先不看,主要看下面的事件就绪的逻辑。首先,会获取就绪的事件,判断就绪的事件中是否包含连接事件,如果包含,就将当前
    SelectionKey的连接就绪事件从SelectionKey的感兴趣的事件中剔除掉,然后将就绪事件交给就绪的AbstractNioChannel的unsafe去处理,调用的是unsafe.finishConnect()方法。具体处理逻辑我们本篇不做分析。然后就是去判断就绪的事件中是否包含了写就绪、读就绪、ACCEPT事件,包含的话都是委托给unsafe的对应方法。
    processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task):这个方法很简单,就是执行NioTaskchannelReady方法,如果执行失败了,就执行channelUnregistered方法。我们这里可以猜测NioTask是一个IO就绪事件的回掉方法。
    IO就绪事件的处理逻辑很简单,我们接下里看一下提交任务的处理逻辑,我们只看可以设置超时时间的重载方法protected boolean runAllTasks(long timeoutNanos)

        protected boolean runAllTasks(long timeoutNanos) {
            //把定时任务队列里到达执行时间的任务添加到非定时任务队列
            fetchFromScheduledTaskQueue();
            //从非定时任务队列获取任务
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    首先,将到达执行时间的定时任务添加到非定时任务的执行列表中,然后从费定时任务列表中获取任务,没有的话就执行afterRunningAllTasks();,这是一个开放方法,我们这里先不看具体内容。如果有任务,就加入循环中,循环的内容就是:调用safeExecute来执行任务,其实就是在try-cache中执行任务,防止有异常终止。然后已经执行的方法计数加以,判断调用runAllTasks执行的任务个数和0x3F的与是不是0,也就是是不是64的倍数。如果是就检查任务执行的时间有没有超过设置的超时时间,超过了就结束循环,然后调用afterRunningAllTasks();。没有超时的话就继续获取任务。这个逻辑也比较简单。

    分析到这里我们就把NioEventLoop.run方法分析完了。run方法的作用用一句话概括就是处理就绪的IO事件和提交的任务。那么问题来了,这个run方法在什么时候被调用呢?我们一路跟着调用链寻找会发现,在NioEventLoop父类SingleThreadEventExecutorexecute(Runnable task)方法被调用的时候就调用了run()方法。当然run()方法是一个重载方法,我们上面分析的是NioEventLoop的实现。
    这里我们的NioEventLoop的关键代码分析就基本上结束了。

    三、复盘

    本篇我们分析了NioEventLoopNioEventLoop除了可以执行提交的任务之外,还可以监听注册的Channel的IO事件,并且可以根据ioRatio来控制两者执行的时间占比。这都是通过它的run()方法来执行的。
    那么,NioEventLoop在netty中的定位也显而易见了:真正的任务执行者。在EventLoop的基础上,netty实现了一个抽象类SingleThreadEventLoopSingleThreadEventLoop还继承了SingleThreadEventExecutor,这就使SingleThreadEventLoop具有一个开放性的模板方法:run()方法,我们可以通过run()来实现自己的任务处理逻辑。而NioEventLoop就是通过实现run()方法来定制自己可以同时处理提交的任务和就绪的IO事件的能力。
    下篇,我们会分析,netty是怎么将各个组件串联起来的。

    相关文章

      网友评论

          本文标题:三、netty源码分析之EventLoop

          本文链接:https://www.haomeiwen.com/subject/jrloectx.html