美文网首页
2022-03-30_netty事件循环NioEventLoop

2022-03-30_netty事件循环NioEventLoop

作者: kikop | 来源:发表于2022-05-30 15:58 被阅读0次

    20220330_netty事件循环NioEventLoopGroup多线程源码学习笔记.md

    1概述

    本文基于netty版本,V4.1.68.Final-SNAPSHOT。

    事件循环EventLoop完全是一个Thread,本质是一个线程,继承了事件循环组接口EventLoopGroup和事件执行器接口OrderEventExecutor,但没有实现。

    nioEventLoopGroup线程池创建的最大区别就是,netty自定义了ThreadPerTaskExecutor执行器进行线程的创建。而没有使用Jdk中的Executor(ThreadPoolExecutor),至于任务调度方式是一样的。

    为了合理利用资源,根据配置和可用的内核,netty可用使用多个EventLoop即多个线程。

    1.1相关类图

    1.1.1EventLoop接口关系图

    public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    

    [图片上传失败...(image-ea9bf3-1653897509473)]

    1.1.2SingleThreadEventLoop抽象类关系图

    [图片上传失败...(image-afee6f-1653897509474)]

    1.1.2.1DefaultEventLoop类关系图

        public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) {
            
            // addTaskWakesUp:true,由JDK队列进行阻塞唤醒
            super(parent, threadFactory, true);
        }
    

    [图片上传失败...(image-8f488-1653897509474)]

    1.1.2.2NioEventLoop类(单线程)

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
            // addTaskWakesUp:false,需要人为增加一个NOP队列任务唤醒IO任务
            super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
                    rejectedExecutionHandler);
        
    

    [图片上传失败...(image-fec97e-1653897509474)]

    1. SingleThreadEventExecutor抽象类:延时任务,获取待执行和过期任务,并执行,内置Queue<Runnable> taskQueue。

    2. SingleThreadEventLoop抽象类:实现EventLoopGroup接口中的next,注册等方法,内置额外的时间循环任务队列,tailTasks queue<Runnable>,作为参数给SingleThreadEventExecutor.runAllTasksFrom调用,问题来了,taskQueue不够吗?

      protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {。
      
    1. DefaultEventLoop:无限循环,实现从队列中取任务,执行任务等一些列的。
    // E:\workdirectory\Dev\study\netty-4.1\transport\src\main\java\io\netty\channel\DefaultEventLoop.java
    
    // 实现了SingleThreadEventExecutor抽象类的run方法。
    @Override
        protected void run() {
            for (;;) {
                // 1.调用SingleThreadEventExecutor.takeTask获取任务。
                Runnable task = takeTask();
                if (task != null) {
                    task.run();
                    // 3.调用SingleThreadEventExecutor.updateLastExecutionTime
                    updateLastExecutionTime();
                }
    
                // 3.调用SingleThreadEventExecutor.confirmShutdown
                if (confirmShutdown()) {
                    break;
                }
            }
        }
    

    1.1.3MultithreadEventLoopGroup抽象类

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    

    [图片上传失败...(image-96b39c-1653897509474)]

    1.1.3.1NioEventLoopGroup类(多线程)

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    

    [图片上传失败...(image-581924-1653897509474)]

    1.2多线程启动源码分析

    1.2.1构建NioEventLoopGroup

    // E:\workdirectory\Dev\study\netty-4.1\transport\src\main\java\io\netty\channel\nio\NioEventLoopGroup.java
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    // E:\workdirectory\Dev\study\netty-4.1\transport\src\main\java\io\netty\channel\MultithreadEventLoopGroup.java
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
    // E:\workdirectory\Dev\study\netty-4.1\common\src\main\java\io\netty\util\concurrent\MultithreadEventExecutorGroup.java
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
    

    1.2.2线程池的创建(非JDK方式)

    [图片上传失败...(image-ce8b20-1653897509474)]

    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
        // 本质:NioEventLoop
        private final EventExecutor[] children;
    
    /**
         * Create a new instance.
         *
         * @param nThreads          the number of threads that will be used by this instance.
         * @param executor          the Executor to use, or {@code null} if the default should be used.
         * @param chooserFactory    the {@link EventExecutorChooserFactory} to use.
         * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
         */
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            checkPositive(nThreads, "nThreads");
    
            // 1.构建 executor,ThreadPerTaskExecutor等同于 JDK ThreadPoolExecutor
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
            // 2.创建指定线程数量的NioEventLoop
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    // 2.1.创建NioEventLoop,默执行器:ThreadPerTaskExecutor
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    1.2.2.1创建线程(NioEventLoopGroup中实现方法)

    [图片上传失败...(image-af9820-1653897509474)]

    // E:\workdirectory\Dev\study\netty-4.1\common\src\main\java\io\netty\util\concurrent\MultithreadEventExecutorGroup.java
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
    
    // E:\workdirectory\Dev\study\netty-4.1\transport\src\main\java\io\netty\channel\nio\NioEventLoopGroup.java
    @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            SelectorProvider selectorProvider = (SelectorProvider) args[0];
            SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1];
            RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2];
            EventLoopTaskQueueFactory taskQueueFactory = null;
            EventLoopTaskQueueFactory tailTaskQueueFactory = null;
    
            int argsLength = args.length;
            if (argsLength > 3) {
                taskQueueFactory = (EventLoopTaskQueueFactory) args[3];
            }
            if (argsLength > 4) {
                tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4];
            }
            // 看到没有,返回的类:NioEventLoop
    // public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
            // -->SingleThreadEventLoop-->SingleThreadEventExecutor
            // -->AbstractScheduledEventExecutor-->AbstractEventExecutor
            return new NioEventLoop(this, executor, selectorProvider,
                    selectStrategyFactory.newSelectStrategy(),
                    rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory);
        }
    

    1.2.2.2ThreadFactory

    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\Executors.java
    /**
         * The default thread factory
         */
    static class DefaultThreadFactory implements ThreadFactory {    
    
    // E:\workdirectory\Dev\study\netty-4.1\transport\src\main\java\io\netty\channel\MultithreadEventLoopGroup.java
    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }
    
    public class DefaultThreadFactory implements ThreadFactory {
    

    1.2.2.3netty默认执行器ThreadPerTaskExecutor

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    1.2.2.4创建线程(任务首次提交NioEventLoop.execute的时候)

    提交到某一个NioEventLoop中

    1.2.2.4.1execute(无返回值)
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    final NioEventLoop loop = (NioEventLoop) group.next()
    loop.execute(task);
    
    // E:\workdirectory\Dev\study\netty-4.1\common\src\main\java\io\netty\util\concurrent\SingleThreadEventExecutor.java
        @Override
        public void execute(Runnable task) {
            ObjectUtil.checkNotNull(task, "task");
            execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
        }
    private void execute(Runnable task, boolean immediate) {
            boolean inEventLoop = inEventLoop();
            addTask(task);
            if (!inEventLoop) {
                startThread();
    
        private void doStartThread() {
            assert thread == null;
            // executor:
            executor.execute(new Runnable() {
    
     @Override
        public void execute(Runnable command) {
            // 这里是创建线程的地方
            threadFactory.newThread(command).start();
        }
    
    1.2.2.4.2submit(支持异步获取)
    NioEventLoopGroup group = new NioEventLoopGroup(1);
    final NioEventLoop loop = (NioEventLoop) group.next()
    loop.submit(task);
    
    // E:\workdirectory\Dev\study\netty-4.1\common\src\main\java\io\netty\util\concurrent\AbstractEventExecutor.java
    @Override
        public Future<?> submit(Runnable task) {
            return (Future<?>) super.submit(task);
        }
    
    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\AbstractExecutorService.java
    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            // 包装成 Jdk.RunnableFuture,class is:Jdk.FutureTask
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
    // E:\workdirectory\Dev\study\netty-4.1\common\src\main\java\io\netty\util\concurrent\SingleThreadEventExecutor.java
    @Override
        public void execute(Runnable task) { // task is:FutrueTask,可异步接收的
            ObjectUtil.checkNotNull(task, "task");
            execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
        }
    

    2代码示例

    2.1testReregister

    // E:\workdirectory\Dev\study\netty-4.1\transport\src\test\java\io\netty\channel\AbstractEventLoopTest.java
    @Test
        public void testReregister() {
            EventLoopGroup group = newEventLoopGroup();
            EventLoopGroup group2 = newEventLoopGroup();
            final EventExecutorGroup eventExecutorGroup = new DefaultEventExecutorGroup(2);
    
            ServerBootstrap bootstrap = new ServerBootstrap();
            ChannelFuture future = bootstrap.channel(newChannel()).group(group)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                        }
                    }).handler(new ChannelInitializer<ServerSocketChannel>() {
                        @Override
                        public void initChannel(ServerSocketChannel ch) {
                            ch.pipeline().addLast(new TestChannelHandler());
                            ch.pipeline().addLast(eventExecutorGroup, new TestChannelHandler2());
                        }
                    })
                    .bind(0).awaitUninterruptibly();
    
            EventExecutor executor = future.channel().pipeline().context(TestChannelHandler2.class).executor();
            EventExecutor executor1 = future.channel().pipeline().context(TestChannelHandler.class).executor();
            future.channel().deregister().awaitUninterruptibly();
            Channel channel = group2.register(future.channel()).awaitUninterruptibly().channel();
            EventExecutor executorNew = channel.pipeline().context(TestChannelHandler.class).executor();
            assertNotSame(executor1, executorNew);
            assertSame(executor, future.channel().pipeline().context(TestChannelHandler2.class).executor());
        }
    

    2.2testRegistrationAfterShutdown

    @Test
        @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
        @SuppressWarnings("deprecation")
        public void testRegistrationAfterShutdown() throws Exception {
            // 1.关闭线程池
            loopA.shutdown();
    
            // 2.禁用log
            // Disable logging temporarily.
            Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
            List<Appender<ILoggingEvent>> appenders = new ArrayList<Appender<ILoggingEvent>>();
            for (Iterator<Appender<ILoggingEvent>> i = root.iteratorForAppenders(); i.hasNext();) {
                Appender<ILoggingEvent> a = i.next();
                appenders.add(a);
                root.detachAppender(a);
            }
    
            try {
                // throw new RejectedExecutionException("event executor terminated");
                ChannelFuture f = loopA.register(new LocalChannel());
                f.awaitUninterruptibly();
                assertFalse(f.isSuccess());
                assertThat(f.cause(), is(instanceOf(RejectedExecutionException.class)));
                assertFalse(f.channel().isOpen());
            } finally {
                for (Appender<ILoggingEvent> a: appenders) {
                    root.addAppender(a);
                }
            }
        }
    

    参考

    相关文章

      网友评论

          本文标题:2022-03-30_netty事件循环NioEventLoop

          本文链接:https://www.haomeiwen.com/subject/crboprtx.html