Netty源码阅读——线程模型

作者: 曾泽浩 | 来源:发表于2018-12-25 14:39 被阅读2次

    在写Netty入门应用的时候,会用到NioEventLoopGroup,隐约知道是一个线程池的东西,但是对于具体的实现并不是很清楚。在学习的过程中,有几个问题一直困扰着我?

    • 它与线程池有什么区别呢?
    • 线程是什么时候启动的?
    • 提交的任务是怎么执行的?

    带着这些问题一起思考,看看Netty是怎么解决这些问题的

    首先来看一张类图吧


    SingleThreadEventExecutor.png

    在这张类图中,有几个比较重要的接口和类,EventExecutorLoop、EventLoop以及SingleThreadEventExecutor。

    在类的继承图当中,我们也可以看到,相对于传统的线程池ExecutorService,EventExecutorLoop还继承了ScheduledExecutorService,也就是说,除了传统线程池提供的功能之外,还支持执行可调度的任务,而且还是有序的。

    EventExecutorLoop 与 EventExecutor的关系其实就是父与子的关系,EventExecutorLoop通过next()方法可以拿到孩子,EventExecutor通过parent()方法可以拿到父亲。

    SingleThreadEventExecutor就是实现了上面EventExecutorLoop接口功能的实现。

    解决了上面第一个问题,与线程池之间有什么区别。

    EventExecutorLoop的主要作用:

    • next()方法可以从这个组中拿到一个Executor

      /**
       * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
       */
      EventExecutor next();
      
    • 可以处理这个事件执行器组的生命周期

      /**
       * Returns {@code true} if and only if all {@link EventExecutor}s managed by this {@link EventExecutorGroup}
       * are being {@linkplain #shutdownGracefully() shut down gracefully} or was {@linkplain #isShutdown() shut down}.
       */
      boolean isShuttingDown();
      
      /**
       * Shortcut method for {@link #shutdownGracefully(long, long, TimeUnit)} with sensible default values.
       *
       * @return the {@link #terminationFuture()}
       */
      Future<?> shutdownGracefully();
      
      /**
       * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
       * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
       * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
       * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
       * it is guaranteed to be accepted and the quiet period will start over.
       *
       * @param quietPeriod the quiet period as described in the documentation
       * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
       *                    regardless if a task was submitted during the quiet period
       * @param unit        the unit of {@code quietPeriod} and {@code timeout}
       *
       * @return the {@link #terminationFuture()}
       */
      Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
      
      /**
       * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
       * {@link EventExecutorGroup} have been terminated.
       */
      Future<?> terminationFuture();
      
      /**
       * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
       */
      @Override
      @Deprecated
      void shutdown();
      
      /**
       * @deprecated {@link #shutdownGracefully(long, long, TimeUnit)} or {@link #shutdownGracefully()} instead.
       */
      @Override
      @Deprecated
      List<Runnable> shutdownNow();
      
    • EventExecutorGroup 整合了可调度任务执行器的功能

      /**
           * submit最终也会去执行execute()
           * @param task
           * @return
           */
          @Override
          Future<?> submit(Runnable task);
      
          @Override
          <T> Future<T> submit(Runnable task, T result);
      
          @Override
          <T> Future<T> submit(Callable<T> task);
      
          @Override
          ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
      
          @Override
          <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
      
          @Override
          ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
      
          @Override
          ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
      

    现在看一下第二个问题,线程是什么时候启动的

    看一下SingleThreadEventExecutor的构造函数

    protected SingleThreadEventExecutor(
            EventExecutorGroup parent, ThreadFactory threadFactory,
            boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
        this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
    }
    

    上面的构造函数中,new ThreadPerTaskExecutor(threadFactory)

    ThreadPerTaskExecutor

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        // 执行Runnable的时候顺便启动线程
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    threadFactory.newThread(command).start();

    从上面的execute()方法中,线程工厂会去创建一个线程并且去启动线程。

    当我们提交一个任务时,会去调用SingleThreadEventExecutor的execute()

    execute()

        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            // thread是不是当前线程
            boolean inEventLoop = inEventLoop();
            // 添加任务
            addTask(task);
            if (!inEventLoop) {
                // 启动线程
                startThread();
                // 当前线程状态是不是已经关闭了
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            // 唤醒
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    

    startThread()

    private void startThread() {
        // 判断线程的状态
        // 如果当前线程的状态是未开启的
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    // 真正启动线程
                    doStartThread();
                } catch (Throwable cause) {
                    STATE_UPDATER.set(this, ST_NOT_STARTED);
                    PlatformDependent.throwException(cause);
                }
            }
        }
    }
    

    doStartThread()

    private void doStartThread() {
        assert thread == null;
        // 这里execcutor就是SingleThreadEventExecutor的构造函数创建的
        // ThreadPerTaskExecutor,而ThreadPerTaskExecutor执行execute方法的时候就会去创建一个线程并且开启
        // executor在这里会真正创建一个线程
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                // 线程是否被中断
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                // 更新上次执行的时间
                updateLastExecutionTime();
                try {
                    // 执行任务
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    ...
    }
    

    上面的几个步骤当中,总结一下,当我们提交一个任务的时候,会调用SingleThreadEventExecutor的execute()方法,在execute()方法中,就会去判断是否需要去创建一个新的线程,如果需要的话,就会接着执行

    doStartThread()方法,接着就会执行executor.execute(),executor是ThreadPerTaskExecutor的一个实例化对象,执行execute()方法时,就会去创建一个线程并且启动它。

    这就回答了第二个问题,线程是什么时候被创建的

    接着,第三个问题,提交的任务是怎么被执行的

    在上面doStartThread()方法中,有一句

    SingleThreadEventExecutor.this.run();
    
    /**
     * run()方法,线程启动的时间会调用这个方法,由子类去执行
     */
    protected abstract void run();
    

    run()方法是一个protected修饰的抽象方法,这说明了需要子类去实现run()方法。

    DefaultEventExecutor继承了SingleThreadEventExecutor,并且实现了run()方法

    /**
     * 开始一个线程的时候会执行这个方法
     * 死循环,一直拿任务,处理任务
     */
    @Override
    protected void run() {
        for (;;) {
            Runnable task = takeTask();
            if (task != null) {
                task.run();
                updateLastExecutionTime();
            }
    
            if (confirmShutdown()) {
                break;
            }
        }
    }
    

    从上面的run()方法中,是一个死循环,一直takeTask(),拿到任务之后,就会调用任务的run()方法。在SingleThreadEventExecutor的execute()方法中会去addTask()添加任务,这里的run()方法中,会一直去拿任务。

    上面回答了最后一个问题,任务是怎么被执行的。

    上面我们都是围绕着SingleThreadEventExecutor去展开讨论呢,它是一个单线程的执行器,通常我们在使用的时候都是多线程的。

    下面继续看一个类MultithreadEventExecutorGroup

    构造函数

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }
    
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
    
        // EventExecutor数组
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                // 初始化, 貌似所有的children都是同一个executor呀
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 初始化失败
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
    
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }
    
        // EventExecutor选择器,选择下一个EventExecutor
        chooser = chooserFactory.newChooser(children);
    
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };
    
        // 添加中断之后的监听器
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }
    
        // 创建只读集合
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }
    

    上面的构造函数中,最重要的是初始化了一个EventExecutor数组

    children = new EventExecutor[nThreads];

    并且,children[i] = newChild(executor, args);

    其中,newChild()是一个抽象方法,需要子类去实现。

    DefaultEventExecutorGroup继承了MultithreadEventExecutorGroup,并且实现了newChild()方法

    @Override
    protected EventExecutor newChild(Executor executor, Object... args) throws Exception {
        // 创建默认的EventExecutor
        return new DefaultEventExecutor(this, executor, (Integer) args[0], (RejectedExecutionHandler) args[1]);
    }
    

    DefaultEventExecutor恰好是继承了SingleThreadEventExecutor

    EventExecutor

    在最上面提到的EventExecutorLoop接口,在一个next()方法,就是拿到EventExecutor

    简言之,DefaultEventExecutorGroup是一个线程池,可以通过next()方法得到

    一个DefaultEventExecutor。每次我们往DefaultEventExecutorGroup提交任务的时候,首先会通过next()方法去拿到一个DefaultEventExecutor(其实就是一个线程),然后去执行SingleThreadEventExecutor的execute()方法,这一步会去判断需不需要创建一个新的线程,如果不需要就执行添加任务。然后DefaultEventExecutor的run()方法是一个死循环,会一直拿任务,执行任务。

    其实NioEventLoopGroup也是继承了MultithreadEventLoopGroup,所以NioEventLoopGroup本质上也是一个线程池,但是它不是提交Runnable任务,而是注册Channel。下面其实我们可以把DefaultEventExecutorGroup当成一个线程池,然后提交各种任务。

    public class DefaultEventExecutorTest {
        public static void main(String[] args) {
            DefaultEventExecutorGroup defaultEventExecutorGroup = new DefaultEventExecutorGroup(4);
            defaultEventExecutorGroup.next().execute(new Task());
            defaultEventExecutorGroup.next().schedule(new Task(),5, TimeUnit.SECONDS);
    
        }
    }
    
    public class Task implements Runnable {
        public void run() {
            System.out.println("Hello world");
            System.out.println(Thread.currentThread().getName());
        }
    }
    

    哈哈哈,到此结束了,这是笔者自身的一些理解,如有错误之处,请指正。

    相关文章

      网友评论

        本文标题:Netty源码阅读——线程模型

        本文链接:https://www.haomeiwen.com/subject/ccjflqtx.html