美文网首页
Netty源码_NioEventLoopGroup详解

Netty源码_NioEventLoopGroup详解

作者: wo883721 | 来源:发表于2021-10-27 18:47 被阅读0次

    上一章介绍NioEventLoop 的实现原理,但是我们在 netty 一般都是直接使用 NioEventLoopGroup 类,直接创建一个事件轮询器组。

    • 事件轮询器组EventExecutorGroup的实现和事件轮询器EventExecutor 的实现,其实两个分支。
    • 我们前面几章已经详细讲解了事件轮询器 EventExecutor 的实现逻辑,下面我们讲解事件轮询器组EventExecutorGroup的实现,它的实现比较简单。

    一. AbstractEventExecutorGroup类

    /**
     * EventExecutorGroup实现的抽象基类。
     */
    public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
        @Override
        public Future<?> submit(Runnable task) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务 task
            return next().submit(task);
        }
    
        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务 task
            return next().submit(task, result);
        }
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务 task
            return next().submit(task);
        }
    
        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
            return next().schedule(command, delay, unit);
        }
    
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
            return next().schedule(callable, delay, unit);
        }
    
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
            return next().scheduleAtFixedRate(command, initialDelay, period, unit);
        }
    
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理计划任务
            return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    
        @Override
        public Future<?> shutdownGracefully() {
            // 优雅关闭 EventExecutorGroup
            // 它会关闭所管理的所有 EventExecutor
            return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
        }
    
        /**
         * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
         */
        @Override
        @Deprecated
        public abstract void shutdown();
    
        /**
         * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
         */
        @Override
        @Deprecated
        public List<Runnable> shutdownNow() {
            shutdown();
            return Collections.emptyList();
        }
    
        @Override
        public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
            return next().invokeAll(tasks);
        }
    
        @Override
        public <T> List<java.util.concurrent.Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
            return next().invokeAll(tasks, timeout, unit);
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
            return next().invokeAny(tasks);
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
            return next().invokeAny(tasks, timeout, unit);
        }
    
        @Override
        public void execute(Runnable command) {
            // 通过 next() 方法,选择管理的一个 EventExecutor 处理任务
            next().execute(command);
        }
    }
    

    这个是事件轮询器组EventExecutorGroup 实现的抽象基类。

    你会发现它所有处理任务的方法,都是通过 next() 交给它所管理的子事件轮询器EventExecutor 来处理。

    二. MultithreadEventExecutorGroup 类

    /**
     * EventExecutorGroup实现的抽象基类,
     * 它可以同时用多个线程处理它们的任务。
     */
    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    
        // 该事件轮询器组所管理的所有事件轮询器
        private final EventExecutor[] children;
        // 不可修改的事件轮询器集合
        private final Set<EventExecutor> readonlyChildren;
        // 记录处于终止状态的子事件轮询器的数量
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        // 子事件轮询器的选择器
        private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    
        /**
         * 创建一个新实例。
         * @param nThreads          该事件轮询器组的线程数
         * @param threadFactory     线程创建工厂
         * @param args              将传递给每个newChild(Executor, Object…)调用的参数
         */
        protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
        }
    
        /**
         * 创建一个新实例。
         * @param nThreads          该事件轮询器组的线程数
         * @param executor          线程创建工厂
         * @param args              将传递给每个newChild(Executor, Object…)调用的参数
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
    
        /**
         * 创建一个新实例。
         * @param nThreads          该事件轮询器组的线程数
         * @param executor
         * @param chooserFactory    子事件轮询器的选择器
         * @param args              将传递给每个newChild(Executor, Object…)调用的参数
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            checkPositive(nThreads, "nThreads");
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            // 根据线程数 nThreads,创建所管理的 EventExecutor 数组
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    // 通过 newChild 方法,创建子的 EventExecutor 实例
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    // 如果创建失败,需要关闭已经创建的子的 EventExecutor
                    if (!success) {
    
                        for (int j = 0; j < i; j ++) {
                            // 关闭创建的子事件执行器 EventExecutor
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                // 通过 while 循环,确保子事件执行器 EventExecutor 都已经终止 Terminated
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            // 创建子事件执行器 EventExecutor 的选择器 chooser
            chooser = chooserFactory.newChooser(children);
    
            // 创建一个子事件执行器终止的监听器
            // 当所有子事件执行器都终止后,就表示这个EventExecutorGroup也终止
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
    
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        // 当所有子事件执行器都终止,
                        // 调用 EventExecutorGroup 的setSuccess 方法,通知关闭
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                // 每个子事件执行器都添加监听器
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            // 返回一个包装的不可修改集合
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass());
        }
    
        @Override
        public EventExecutor next() {
            // 通过选择器返回下一个子事件轮询器
            return chooser.next();
        }
    
        @Override
        public Iterator<EventExecutor> iterator() {
            return readonlyChildren.iterator();
        }
    
        /**
         * 返回此事件轮询器组使用的EventExecutor的数量。
         * 这个数字是它所使用的线程的1:1映射。
         */
        public final int executorCount() {
            return children.length;
        }
    
        /**
         * 子类必须实现,创建属于自己的事件轮询器 EventExecutor
         */
        protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    
        @Override
        public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
            // 通过循环遍历,关闭所管理的所有子事件轮询器 EventExecutor
            for (EventExecutor l: children) {
                l.shutdownGracefully(quietPeriod, timeout, unit);
            }
            return terminationFuture();
        }
    
        @Override
        public Future<?> terminationFuture() {
            return terminationFuture;
        }
    
        @Override
        @Deprecated
        public void shutdown() {
            // 通过循环遍历,关闭所管理的所有子事件轮询器 EventExecutor
            for (EventExecutor l: children) {
                l.shutdown();
            }
        }
    
        @Override
        public boolean isShuttingDown() {
            // 只有当所有子事件轮询器 EventExecutor 都isShuttingDown,返回true
            for (EventExecutor l: children) {
                if (!l.isShuttingDown()) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean isShutdown() {
            // 只有当所有子事件轮询器 EventExecutor 都isShutdown,返回true
            for (EventExecutor l: children) {
                if (!l.isShutdown()) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean isTerminated() {
            // 只有当所有子事件轮询器 EventExecutor 都 isTerminated,返回true
            for (EventExecutor l: children) {
                if (!l.isTerminated()) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            loop: for (EventExecutor l: children) {
                for (;;) {
                    long timeLeft = deadline - System.nanoTime();
                    if (timeLeft <= 0) {
                        break loop;
                    }
                    if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                        break;
                    }
                }
            }
            return isTerminated();
        }
    }
    

    这个 MultithreadEventExecutorGroup 类其实为事件轮询器组EventExecutorGroup 奠定了基础啊。
    在它的构造方法中,我们发现:

    1. 它会为每个线程创建一个事件轮询器EventExecutor,即线程和事件轮询器一一对应。
    2. 它会创建一个子事件轮询器选择器 EventExecutorChooser 实例。
    3. 提供了 EventExecutor newChild(Executor executor, Object... args) 抽样方法,让子类实现,返回子类对应的事件轮询器类型实例。

    三. EventExecutorChooser 类

    /**
     * 创建新的 EventExecutorChooser 的工厂。
     */
    @UnstableApi
    public interface EventExecutorChooserFactory {
    
        /**
         * Returns a new {@link EventExecutorChooser}.
         */
        EventExecutorChooser newChooser(EventExecutor[] executors);
    
        /**
         * 选择要使用的下一个EventExecutor。
         */
        @UnstableApi
        interface EventExecutorChooser {
    
            /**
             * 返回要使用 EventExecutor。
             */
            EventExecutor next();
        }
    }
    
    /**
     * 默认的 EventExecutorChooserFactory 实现
     */
    @UnstableApi
    public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    
        public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    
        private DefaultEventExecutorChooserFactory() { }
    
        @Override
        public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                // 如果是 2 的幂数,可以使用 & 运算得到余数,效率更快
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                // 不是 2 的幂数,只能使用 % 运算得到余数
                return new GenericEventExecutorChooser(executors);
            }
        }
    
        private static boolean isPowerOfTwo(int val) {
            // 返回是不是 2 的幂数
            return (val & -val) == val;
        }
    
        private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            private final AtomicInteger idx = new AtomicInteger();
            private final EventExecutor[] executors;
    
            PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                // 因为 executors.length 是 2的幂数,可以使用 & 运算得到余数
                return executors[idx.getAndIncrement() & executors.length - 1];
            }
        }
    
        private static final class GenericEventExecutorChooser implements EventExecutorChooser {
            private final AtomicLong idx = new AtomicLong();
            private final EventExecutor[] executors;
    
            GenericEventExecutorChooser(EventExecutor[] executors) {
                this.executors = executors;
            }
    
            @Override
            public EventExecutor next() {
                // 只能使用 % 运算得到余数
                return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
            }
        }
    }
    

    netty 提供的默认实现中,如果子事件轮询器的数量是 2 的幂数,那么可能更快一点。

    四. MultithreadEventLoopGroup 类

    /**
     * EventLoopGroup实现的抽象基类,
     * 它可以同时用多个线程处理它们的任务。
     */
    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
         * EventExecutorChooserFactory, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                         Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
        }
    
        @Override
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
        }
    
        @Override
        public EventLoop next() {
            return (EventLoop) super.next();
        }
    
        @Override
        protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
    
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    
        @Override
        public ChannelFuture register(ChannelPromise promise) {
            return next().register(promise);
        }
    
        @Deprecated
        @Override
        public ChannelFuture register(Channel channel, ChannelPromise promise) {
            return next().register(channel, promise);
        }
    }
    

    这个类没有啥可讲解的。

    五. NioEventLoopGroup 类

    /**
     * MultithreadEventLoopGroup 实现,用于基于NIO选择器的通道。
     */
    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    
        /**
         * 使用默认线程数、默认ThreadFactory 和SelectorProvider (由SelectorProvider.provider()返回)
         * 创建一个新实例。
         */
        public NioEventLoopGroup() {
            this(0);
        }
    
        /**
         * 使用指定数量的线程、默认ThreadFactory 和 SelectorProvider(由SelectorProvider.provider()返回)
         * 创建一个新实例。
         */
        public NioEventLoopGroup(int nThreads) {
            this(nThreads, (Executor) null);
        }
    
        /**
         * 使用默认线程数、给定的ThreadFactory 和SelectorProvider(由SelectorProvider.provider()返回)
         * 创建一个新实例。
         */
        public NioEventLoopGroup(ThreadFactory threadFactory) {
            this(0, threadFactory, SelectorProvider.provider());
        }
    
        /**
         * 使用指定数量的线程、给定的ThreadFactory和SelectorProvider(由SelectorProvider.provider()返回)
         * 创建一个新实例。
         */
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor) {
            this(nThreads, executor, SelectorProvider.provider());
        }
    
        /**
         * 使用指定数量的线程、给定的ThreadFactory和给定的SelectorProvider
         * 创建一个新实例。
         */
        public NioEventLoopGroup(
                int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
            this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
            final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(
                int nThreads, Executor executor, final SelectorProvider selectorProvider) {
            this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler,
                                 final EventLoopTaskQueueFactory taskQueueFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    rejectedExecutionHandler, taskQueueFactory);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 SelectorProvider selectorProvider,
                                 SelectStrategyFactory selectStrategyFactory,
                                 RejectedExecutionHandler rejectedExecutionHandler,
                                 EventLoopTaskQueueFactory taskQueueFactory,
                                 EventLoopTaskQueueFactory tailTaskQueueFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
        }
    
        /**
         * 设置花在子事件循环中的I/O所需时间的百分比。
         * 默认值是50,这意味着事件循环将尝试在I/O上花费与非I/O任务相同的时间。
         */
        public void setIoRatio(int ioRatio) {
            // 所有管理的子事件轮询器 NioEventLoop,都设置这个 ioRatio
            for (EventExecutor e: this) {
                ((NioEventLoop) e).setIoRatio(ioRatio);
            }
        }
    
        /**
         * 用新创建的选择器替换子事件循环的当前选择器,
         * 以解决臭名昭著的epoll 100% CPU错误。
         */
        public void rebuildSelectors() {
            for (EventExecutor e: this) {
                ((NioEventLoop) e).rebuildSelector();
            }
        }
    
        /**
         * 创建此事件轮询器组 NioEventLoopGroup 所管理的
         * 子事件轮询器 NioEventLoop 实例
         */
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            SelectorProvider selectorProvider = (SelectorProvider) args[0];
            SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
            RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
            EventLoopTaskQueueFactory taskQueueFactory = null;
            EventLoopTaskQueueFactory tailTaskQueueFactory = null;
    
            int argsLength = args.length;
            if (argsLength > 3) {
                taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
            }
            if (argsLength > 4) {
                tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
            }
            // 直接 new 出 NioEventLoop 实例
            return new NioEventLoop(this, executor, selectorProvider,
                    selectStrategyFactory.newSelectStrategy(),
                    rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
        }
    }
    

    这个类其实也没有什么好讲的,就是复写 newChild 方法,创建所管理的事件轮询器 NioEventLoop

    六. 总结

    你会发现事件轮询器组EventExecutorGroup 的实现非常简单,就是管理子的事件轮询器 EventLoop

    相关文章

      网友评论

          本文标题:Netty源码_NioEventLoopGroup详解

          本文链接:https://www.haomeiwen.com/subject/ckypaltx.html