美文网首页
二、netty源码分析之EventLoopGroup

二、netty源码分析之EventLoopGroup

作者: 丑星星 | 来源:发表于2019-09-28 09:29 被阅读0次

    一、EventLoopGroup功能概述

    EventLoopGroup是netty中一个比较核心的组件,想要知道EventLoopGroup的功能,我们先看一下EventLoopGroup的类图关系:

    EventLoopGroup
    Exector是java的JUC包中定义的一个接口,我们可以看一下具体定义:
    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    

    从代码的注释中我们可以看到,Exector定义的execute方法的作用。即执行提交的任务。执行可以是在一个的线程中、或者在线程池中、也可以在方法调用线程中。总的来说,Exector开放了提交任务执行的能力。

    接下来是ExecutorService接口。它继承自Exector,新定义了以下方法:

    public interface ExecutorService extends Executor {
        /**
         * 停止接受新提交的任务,已经提交的等待中的任务会执行完成
         */
        void shutdown();
         /**
         * 停止提交新的任务,已经提交的等待中的任务
         * 也会停止等待,返回等待中的任务表
         */
        List<Runnable> shutdownNow();
        /**
         * 返回执行器是否被停止
         */
        boolean isShutdown();
        /**
         * 返回执行器在shutdown之后,是否所有的任务都被执行完。
         *shutdown()或者shutdownNow()被调用后才会终止。
         */
        boolean isTerminated();
        /**
         * 阻塞直到所有的任务都执行完,或者超时,或者线程受到interrupted信号
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        /**
         * 提交有返回值的任务
         */
        <T> Future<T> submit(Callable<T> task);
        /**
         * 提交一个任务,任务成功Future返回给定的result
         */
        <T> Future<T> submit(Runnable task, T result);
        /**
         * 提交一个任务,任务成功Future返回null
         */
        Future<?> submit(Runnable task);
        /**
         * 执行给定的任务列表,当执行完成后返回持有执行状态和执行结果的Future列表
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
       /**
         * 执行给定的任务列表,当执行完或者超时后返回
         *持有执行状态和执行结果的Future列表
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
       /**
         * 执行给定的任务列表,如果所有任务都执行成功,返回其中的一个结果
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
       /**
         * 执行给定的任务列表,如果所有任务都在给定的时
         * 间内执行成功,返回其中的一个结果
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    

    可以看到ExecutorService这个接口开放了提交有返回结果的任务的能力,同时开放了停止执行器的能力。

    接下来是ScheduledExecutorService

    public interface ScheduledExecutorService extends ExecutorService {
       /**
         * 创建并且执行一个一次性的任务,这个任务在给定的延迟事件之后才可以被执行
         */
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
    
          /**
         * 创建并且执行一个带有返回值的一次性的任务,
         * 这个任务在给定的延迟事件之后才可以被执行
         */
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
       /**
         * 创建并且执行一个周期性的任务,任务的开始时间是固定间隔的
         */
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
       /**
         * 创建并且执行一个周期性的任务,一个任务执行完成到
         * 下个任务执行开始之间的时间间隔是固定的
         */
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    }
    

    我们可以看到,ScheduledExecutorService具有延迟执行和周期执行任务的能力。

    接下来是EventExecutorGroup接口,EventExecutorGroup的方法比较多,我们这里只列出部分关键方法:

    public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
    
        boolean isShuttingDown();
    
        Future<?> shutdownGracefully();
    
        Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    
        /**
         * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
         * {@link EventExecutorGroup} have been terminated.
         */
        Future<?> terminationFuture();
    
        /**
         * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
         */
        EventExecutor next();
    
        @Override
        Iterator<EventExecutor> iterator();
    }
    

    我们可以看到EventExecutorGroup主要提供了两个能力:一是优雅停机的能力,优雅停机这块我们本篇先不去分析,放到接下来的笔记中去分析;第二个就是执行器调度的能力,通过next()方法来返回下一个要执行任务的EventExecutor

    最后就是EventLoopGroup了。我们看一下这个接口的代码:

    public interface EventLoopGroup extends EventExecutorGroup {
        /**
         * Return the next {@link EventLoop} to use
         */
        @Override
        EventLoop next();
    
        /**
         * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
         * will get notified once the registration was complete.
         */
        ChannelFuture register(Channel channel);
    
        /**
         * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
         * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
         */
        ChannelFuture register(ChannelPromise promise);
    
        /**
         * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
         * will get notified once the registration was complete and also will get returned.
         *
         * @deprecated Use {@link #register(ChannelPromise)} instead.
         */
        @Deprecated
        ChannelFuture register(Channel channel, ChannelPromise promise);
    }
    

    从上面的代码可以看到,EventLoopGroup又新增了注册Channel的方法。到这里为止,我们可以给EventLoopGroup下个定义:有优雅停机功能、可以注册Channel的事件执行器。到现在我们只看了EventLoopGroup这个接口,也许会对这个接口的理解比较模糊,接下来我们就结合EventLoopGroup这个接口的一个实现类来看看EventLoopGroup这个接口在netty中究竟是扮演者一个什么角色。我们下面拿我们开发中比较常用的NioEventLoopGroup来具体分析一下。

    从NioEventLoopGroup来看EventLoopGroup在netty中扮演的角色

    我们看一下NioEventLoopGroup的部分内容:

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
            final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(
                int nThreads, Executor executor, final SelectorProvider selectorProvider) {
            this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler,
                                 final EventLoopTaskQueueFactory taskQueueFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    rejectedExecutionHandler, taskQueueFactory);
        }
    
        /**
         * Sets the percentage of the desired amount of time spent for I/O in the child event loops.  The default value is
         * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
         */
        public void setIoRatio(int ioRatio) {
            for (EventExecutor e: this) {
                ((NioEventLoop) e).setIoRatio(ioRatio);
            }
        }
    
        /**
         * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
         * around the  infamous epoll 100% CPU bug.
         */
        public void rebuildSelectors() {
            for (EventExecutor e: this) {
                ((NioEventLoop) e).rebuildSelector();
            }
        }
    
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
        }
    }
    

    NioEventLoopGroup的内容其实并没有多少,逻辑大部分都在其父类之中。除了构造方法之外,只有三个方法:
    1、setIoRatio(int ioRatio)设置IO运行率,方法内容也很简单,就是遍历自身持有的NioEventLoop对象,并且设置NioEventLoop的ioRatio参数。
    2、rebuildSelectors()遍历自身持有的NioEventLoop对象,调用NioEventLoop对象的rebuildSelector()方法。
    3、EventLoop newChild(Executor executor, Object... args)创建一个NioEventLoop对象。这是一个父类的模板方法。

    父类MultithreadEventLoopGroup是一个抽象类,这个抽象类实现了几个重载的register方法。方法内容都是调用父类的next()方法获取自身的一个EventLoop对象,然后把需要注册的Channel、ChannelPromise参数注册到EventLoop。

        @Override
        public EventLoop next() {
            return (EventLoop) super.next();
        }
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
        @Override
        public ChannelFuture register(ChannelPromise promise) {
            return next().register(promise);
        }
    

    next()方法就是实现EventLoopGroupEventLoop调度的方法(EventLoop是真正执行任务的执行器,后面我们会说到)。next()方法是怎么实现对EventLoop调度的呢?我们可以看到这个方法调用了EventExecutorChooser的实现类的next()方法。netty默认提供了两个实现类,一个是GenericEventExecutorChooser,另一个是PowerOfTwoEventExecutorChooser,这两个选择器都是轮询EventLoopGroup持有的EventLoop。这两个选择器都是轮询EventLoop,有什么区别呢?我们从这两个实现类可以看出netty对性能追求的极致之处:

    // GenericEventExecutorChooser的选择方法
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    
    // PowerOfTwoEventExecutorChooser的选择方法
    return executors[idx.getAndIncrement() & executors.length - 1];
    

    GenericEventExecutorChooser是通用的轮询方法;而PowerOfTwoEventExecutorChooser是专门用来处理当EventLoop数量是2的次方数时的情况,用位运算取idx的低位(低log2 (executors.length) 位)。netty不会放过哪怕这一点点对性能的优化!!!

    我们继续按图索骥看一下MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup。我们可以看到,MultithreadEventLoopGroup实现了停机的功能,不过都是调用的持有的EventExecutor的对应方法,我们这里就不详细分析了。MultithreadEventExecutorGroup最主要的逻辑在它的构造方法中,构造方法实现了对EventExecutor的初始化等工作,我们详细看一下这块内容:

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    首先,判断参数传入的Executor执行器对象是否是空,如果是空,则初始化一个默认的实现类ThreadPerTaskExecutor,我们从这个类的名字可以看出它的能力,每个任务一个线程,我们看它的实现也确实如此:

        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    

    每次提交任务的时候,都会调用线程工厂创建一个线程来执行任务!这个Executor对象是最终用来执行任务的,不过却不是EventLoopGroup调度的内容。我们继续往下看MultithreadEventExecutorGroup的构造方法。初始化EventExecutor[] children这个成员变量,新建一个大小为nThreads的数组,然后循环为数组元素赋值。这个EventExecutor[] children才是EventLoopGroup真正调度的内容。在填充数组内容的时候,调用了子类的模板方法newChild(),在之前我们在NioEventLoopGroup中看到的newChild()方法就是这个模板方法的一个实现。

    children数组填充完成之后,是初始化成员变量chooser的操作,chooser就是用来调度所有执行器,我们上面已经分析过了,这里不再赘述。
    接下来是对EventExecutor[] children中的每个执行器添加终止监听器,确保EventExecutor[] children中的所有执行器都终止后,会调用设置成员变量terminationFuture的状态。

    最后是对readonlyChildren这个成员变量赋值,看这个变量的名字我们也能猜出来这个成员变量是children变量的只读版本,事实也的确如此。

    复盘

    我们分析了NioEventLoop的核心源码,整个内容也许有点绕,我们这里再把NioEventLoop的类图贴出来帮助大家理解:

    NioEventLoopGroup
    我们可以看到,EventExecutorGroup以及其父接口,开放了任务提交、任务执行、优雅停机等能力。而实现类AbstractEventExecutorGroup这个抽象类实现了任务提交的基本方法,MultithreadEventExecutorGroup则实现了多线程任务调度的基本方法。子类MultithreadEventLoopGroup不仅继承了MultithreadEventExecutorGroup这个抽象了,具备了多线程执行调度的基本能力,而且还实现了EventLoopGroup接口,具备注册Channel的能力。其对外开放的变化点仅仅有创建执行器这个功能。我们继承MultithreadEventExecutorGroup仅仅可以通过实现newChild这个抽象方法来定制自己的EventLoop事件处理器(不考虑方法重写)。最后,NioEventLoopGroup继承了MultithreadEventExecutorGroup,通过实现newChild方法,来指定事件处理器是NioEventLoop,下篇我们会分析NioEventLoop的功能。

    到这里我们基于NioEventLoopGroup分析了EventLoopGroup的基本能力。EventLoopGroup可以注册Channel,然后具有任务执行、选择执行器去执行任务,也就是执行器调度的能力。也许现在大家对EventLoopGroup的功能还是比较模糊,不过没关系,加下来我们会继续分析EventLoopBootStrap等关键组件。等分析完这些内容,大家对netty的理解就会更清晰。

    相关文章

      网友评论

          本文标题:二、netty源码分析之EventLoopGroup

          本文链接:https://www.haomeiwen.com/subject/thxvectx.html