美文网首页Netty
NioEventLoopGroup类

NioEventLoopGroup类

作者: 骁兵 | 来源:发表于2019-03-05 16:46 被阅读0次

    NioEventLoopGroup继承关系图
      从上图可以看出,NioEventLoopGroup实现了ExecutorExecutorServiceScheduledExecutorServiceIterableEventExecutorGroupEventLoopGroup接口,另外三个抽象类AbstractEventExecutorGroupMultithreadEventExecutorGroupMultithreadEventLoopGroup提供了接口的部分实现。
    • AbstractEventExecutorGroup实现了ExecutorExecutorServiceScheduledExecutorService中的部分方法,都是委托给next()方法返回的EventExecutor
    • MultithreadEventExecutorGroup主要是创建了EventExecutor[] children数组,并将多个方法委托给children
    • MultithreadEventLoopGroupEventExecutor类型的chlild转化成EventLoopEventExecutor的子接口),并将register()方法委托给EventLoop
    • NioEventLoopGroup实现newChild()方法,返回NioEventLoop(EventLoop的实现类),并将几个方法委托给NioEventLoop

    Executor

      Executor接口比较简单,只有一个execute()方法,该方法在未来某个时间点执行参数Runnable实例,可以在当前线程、新建线程或线程池里面执行,取决于具体实现。

    public interface Executor {
        void execute(Runnable command);
    }
    

    ExecutorService

      ExecutorService继承了Executor,是建立在Executor基础上的一个服务,它是Executor基础上增加了任务管理服务。
      除了执行任务的基础功能外,ExecutorService还提供submit类方法,用于提交任务,并返回futrue实例(可以用于取消执行任务,或者等待执行完成)用于和任务进行通信。
      同时,还支持对Executor进行shutdown()操作,这一步可以用于资源的释放。

    public interface ExecutorService extends Executor {
        //拒绝新任务,但在terminate之前可以执行先前提交的任务
        void shutdown();
    
        //拒绝新任务,并删除之前提交的正在等待执行的任务,同时会尝试终止正在执行的任务
        List<Runnable> shutdownNow();
    
        boolean isShutdown();
    
        /**如果shutdown后所有任务都已完成,则返回true。 
        请注意,除非先调用shutdown或shutdownNow,否则isTerminated永远不会为真。**/
        boolean isTerminated();
    
        //阻塞,直到shutdown后剩下的所有任务被完成,或者等待超时,或者被中断(interrupted)
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        //提交任务,返回的Future实例可以用于取消执行任务,或者等待执行完成
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

      Executors类提供ExecutorService类的多个工厂方法。

    ScheduledExecutorService

      ScheduledExecutorService可以在给定延时后执行任务,或者定期执行任务。
      scheduleAtFixedRatescheduleWithFixedDelay方法创建并执行定期运行的任务,直到被取消。
      scheduleAtFixedRate表示每隔多少时间,执行一次任务。
      scheduleWithFixedDelay等到前一个任务结束的时刻,才开始结算间隔时间,如0秒开始执行第一次任务,任务耗时5秒,任务间隔时间3秒,那么第二次任务执行的时间是在第8秒开始。
      通过 Executor.execute(Runnable)ExecutorService的submit()方法提交的任务延迟时间为0。

    public interface ScheduledExecutorService extends ExecutorService {
    
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
    
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
    
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    
    }
    

    EventExecutorGroup

      EventExecutorGroup负责通过next()方法提供EventExecutor。 除此之外,它还负责处理它们的生命周期并允许以全局的方式关闭它们。

    public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
        //当被此EventExecutorGroup管理的所有EventExecutor被shutdown或者被shutdownGracefully时,返回true
        boolean isShuttingDown();
    
        //向executor发出关闭信号,此方法一旦被调用,isShuttingDown()就返回true,然后executor准备关闭自己
        Future<?> shutdownGracefully();
    
        Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    
        //返回一个future,当所有被当前这个EventExecutorGroup管理的EventExecutor被终止时,会通知这个future
        Future<?> terminationFuture();
    
        //返回一个EventExecutor
        EventExecutor next();
       
        //被shutdownGracefully方法替代
        @Override
        @Deprecated
        void shutdown();
    
        //被shutdownGracefully方法替代
        @Override
        @Deprecated
        List<Runnable> shutdownNow();
    
        @Override
        Iterator<EventExecutor> iterator();
    
        @Override
        Future<?> submit(Runnable task);
    
        @Override
        <T> Future<T> submit(Runnable task, T result);
    
        @Override
        <T> Future<T> submit(Callable<T> task);
    
        @Override
        ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    
        @Override
        <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    
        @Override
        ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
    
        @Override
        ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
    }
    

    EventLoopGroup

      EventLoopGroup是特殊的EventExecutorGroup,它可以注册channel,被注册的channel会在事件循环(event loop)中被select和处理。

    public interface EventLoopGroup extends EventExecutorGroup {
        //返回下一个EventLoop
        @Override
        EventLoop next();
    
        //注册Channel,返回的ChannelFuture会在注册完成时得到通知
        ChannelFuture register(Channel channel);
    
        ChannelFuture register(ChannelPromise promise);
    
        @Deprecated
        ChannelFuture register(Channel channel, ChannelPromise promise);
    }
    

    AbstractEventExecutorGroup

      AbstractEventExecutorGroup只是将部分方法的实现的委托给next()方法返回的EventExecutor,但是没具体实现next()方法。

    public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
        @Override
        public Future<?> submit(Runnable task) {
            return next().submit(task);
        }
    
        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return next().submit(task, result);
        }
    
        @Override
        public <T> Future<T> submit(Callable<T> task) {
            return next().submit(task);
        }
    
        @Override
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return next().schedule(command, delay, unit);
        }
    
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return next().schedule(callable, delay, unit);
        }
    
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            return next().scheduleAtFixedRate(command, initialDelay, period, unit);
        }
    
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
            return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }
    
        @Override
        public Future<?> shutdownGracefully() {
            return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
        }
    
        /**
         * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
         */
        @Override
        @Deprecated
        public abstract void shutdown();
    
        /**
         * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
         */
        @Override
        @Deprecated
        public List<Runnable> shutdownNow() {
            shutdown();
            return Collections.emptyList();
        }
    
        @Override
        public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            return next().invokeAll(tasks);
        }
    
        @Override
        public <T> List<java.util.concurrent.Future<T>> invokeAll(
                Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
            return next().invokeAll(tasks, timeout, unit);
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            return next().invokeAny(tasks);
        }
    
        @Override
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return next().invokeAny(tasks, timeout, unit);
        }
    
        @Override
        public void execute(Runnable command) {
            next().execute(command);
        }
    }
    

    MultithreadEventExecutorGroup

      MultithreadEventExecutorGroup主要是创建了EventExecutor[] children
    数组,并将多个方法委托给children

    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    
        private final EventExecutor[] children;
        private final Set<EventExecutor> readonlyChildren;
        private final AtomicInteger terminatedChildren = new AtomicInteger();
        private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
        private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    
        protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
        }
    
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
    
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass());
        }
    
        @Override
        public EventExecutor next() {
            return chooser.next();
        }
    
        @Override
        public Iterator<EventExecutor> iterator() {
            return readonlyChildren.iterator();
        }
    
        public final int executorCount() {
            return children.length;
        }
    
        protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    
        @Override
        public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
            for (EventExecutor l: children) {
                l.shutdownGracefully(quietPeriod, timeout, unit);
            }
            return terminationFuture();
        }
    
        @Override
        public Future<?> terminationFuture() {
            return terminationFuture;
        }
    
        @Override
        @Deprecated
        public void shutdown() {
            for (EventExecutor l: children) {
                l.shutdown();
            }
        }
    
        @Override
        public boolean isShuttingDown() {
            for (EventExecutor l: children) {
                if (!l.isShuttingDown()) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean isShutdown() {
            for (EventExecutor l: children) {
                if (!l.isShutdown()) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean isTerminated() {
            for (EventExecutor l: children) {
                if (!l.isTerminated()) {
                    return false;
                }
            }
            return true;
        }
    
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            loop: for (EventExecutor l: children) {
                for (;;) {
                    long timeLeft = deadline - System.nanoTime();
                    if (timeLeft <= 0) {
                        break loop;
                    }
                    if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                        break;
                    }
                }
            }
            return isTerminated();
        }
    }
    

    MultithreadEventLoopGroup

      MultithreadEventLoopGroupEventExecutor类型的chlild转化成EventLoopEventExecutor的子接口),并将register()方法委托给EventLoop

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
        protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
        }
    
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                         Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
        }
    
        @Override
        protected ThreadFactory newDefaultThreadFactory() {
            return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
        }
    
        @Override
        public EventLoop next() {
            return (EventLoop) super.next();
        }
    
        @Override
        protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
    
        @Override
        public ChannelFuture register(Channel channel) {
            return next().register(channel);
        }
    
        @Override
        public ChannelFuture register(ChannelPromise promise) {
            return next().register(promise);
        }
    
        @Deprecated
        @Override
        public ChannelFuture register(Channel channel, ChannelPromise promise) {
            return next().register(channel, promise);
        }
    }
    

    NioEventLoopGroup

      NioEventLoopGroup实现newChild()方法,返回NioEventLoop(EventLoop的实现类),并将几个方法委托给NioEventLoop

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    
        public NioEventLoopGroup() {
            this(0);
        }
    
        public NioEventLoopGroup(int nThreads) {
            this(nThreads, (Executor) null);
        }
    
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor) {
            this(nThreads, executor, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(
                int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
            this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
            final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(
                int nThreads, Executor executor, final SelectorProvider selectorProvider) {
            this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                    RejectedExecutionHandlers.reject());
        }
    
        public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                 final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory,
                                 final RejectedExecutionHandler rejectedExecutionHandler) {
            super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
        }
    
        public void setIoRatio(int ioRatio) {
            for (EventExecutor e: this) {
                ((NioEventLoop) e).setIoRatio(ioRatio);
            }
        }
    
        public void rebuildSelectors() {
            for (EventExecutor e: this) {
                ((NioEventLoop) e).rebuildSelector();
            }
        }
    
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    }
    

    相关文章

      网友评论

        本文标题:NioEventLoopGroup类

        本文链接:https://www.haomeiwen.com/subject/pqsouqtx.html