美文网首页netty首页投稿(暂停使用,暂停投稿)程序员
Netty 源码解析 ——— Netty 优雅关闭流程

Netty 源码解析 ——— Netty 优雅关闭流程

作者: tomas家的小拨浪鼓 | 来源:发表于2017-11-27 03:42 被阅读1260次

    本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。

    Netty的优雅关闭操作

    Netty是通过『eventLoopGroup.shutdownGracefully()』操作来实现它的优雅关闭的。

    我们先来看下shutdownGracefully方法的doc说明:

        /**
         * Signals this executor that the caller wants the executor to be shut down.  Once this method is called,
         * {@link #isShuttingDown()} starts to return {@code true}, and the executor prepares to shut itself down.
         * Unlike {@link #shutdown()}, graceful shutdown ensures that no tasks are submitted for <i>'the quiet period'</i>
         * (usually a couple seconds) before it shuts itself down.  If a task is submitted during the quiet period,
         * it is guaranteed to be accepted and the quiet period will start over.
         *
         * @param quietPeriod the quiet period as described in the documentation
         * @param timeout     the maximum amount of time to wait until the executor is {@linkplain #shutdown()}
         *                    regardless if a task was submitted during the quiet period
         * @param unit        the unit of {@code quietPeriod} and {@code timeout}
         *
         * @return the {@link #terminationFuture()}
         */
        Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    

    调用者希望执行器进行关闭的信号。一旦这个方法被调用了,『isShuttingDown()』方法将开始都会返回true,同时执行器准备关闭它自己。不像『shutdown()』方法,优雅关闭会确保在它关闭它自己之前没有任务在’the quiet period’(平静期,即,gracefulShutdownQuietPeriod属性)内提交。如果一个任务在平静期内提交了,它会保证任务被接受并且重新开始平静期。
    如果你现在,对这段描述有些许困惑,没关系,请继续往下看,gracefulShutdownQuietPeriod(即,quietPeriod参数)、gracefulShutdownStartTime(即,timeout参数)主要会在『confirmShutdown()』方法中使用,下面会结合方法的实现场景来说明gracefulShutdownStartTime、gracefulShutdownQuietPeriod的含义。

    源码解析

        // AbstractEventExecutorGroup#shutdownGracefully
        public Future<?> shutdownGracefully() {
            return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
        }
    
        static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
        static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;
    
        // MultithreadEventExecutorGroup#shutdownGracefully
        public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
            for (EventExecutor l: children) {
                l.shutdownGracefully(quietPeriod, timeout, unit);
            }
            return terminationFuture();
        }
    

    遍历EventExecutor[]数组,取出EventExecutor执行shutdownGracefully操作。因为优雅关闭的流程主要是在各个NioEventLoop线程各自完成的,它是一个异步操作,因此此时返回该异步操作的Future,它是一个无返回结果的DefaultPromise对象。



    ① 确保 quietPeriod、unit的为有效值,即『quietPeriod >= 0』、『unit != null』。同时,确保timeout、quietPeriod之间的正确性,即『quietPeriod <= timeout』。
    ② 如果该NioEventLoop已经执行过关闭操作了,可能是『shutdownGracefully()』这样的优雅关闭,也有可能是『shutdown() or shutdownNow()』,当然后两种方法已经不建议使用了(Deprecated)。那么直接返回该异步操作的Future对象。
    ③ 使用自旋锁(『自旋 + CAS』)的方式修改当前NioEventLoop所关联的线程的状态(volatile修饰的成员变量state)。因为此方法可能被多线程同时调用,所以使用了自旋锁的方式来保证NioEventLoop所关联的线程状态(state成员变量)的修改是原子性的。
    之前我们说过,NioEventLoop所关联的线程总共有5个状态,分别是:

    private static final int ST_NOT_STARTED = 1;    // 线程还未启动
    private static final int ST_STARTED = 2;        // 线程已经启动
    private static final int ST_SHUTTING_DOWN = 3;  // 线程正在关闭
    private static final int ST_SHUTDOWN = 4;       // 线程已经关闭
    private static final int ST_TERMINATED = 5;     // 线程已经终止
    

    其中,在正常的线程状态流为:ST_NOT_STARTED ——> ST_STARTED ——> ST_SHUTTING_DOWN ——> ST_TERMINATED。
    而ST_SHUTDOWN这个线程状态是已经弃用的『shutdown() or shutdownNow()』所会设置的线程状态,但是无论怎样在此步骤中,线程的状态至少为会置为ST_SHUTTING_DOWN,或者说正常情况下都是会设置为ST_SHUTTING_DOWN的。
    补充简单说明下两个知识点:
    a) 自旋锁(Spin lock):由它自己去占有CPU运行的时间,然后去尝试进行更新,直到更新成功完成。也因为它是占用CPU资源的方式,所以自旋锁实现的操作是非常简短的,不然其他线程可能会一直在自旋等待该自旋锁。也正式因为自旋锁是不会释放CPU的,也就是线程无需被挂起,这样就没有线程上下文切换的问题了。
    因此,自旋锁一般用于在多核处理器中预计线程持有锁的时间很短(即锁操作所需的时间非常的短)情况,甚至时间短于两次线程上下文的切换的开销。
    b) volatile的可见性:volatile除了保证单个变量的读/写具有原子性外,还有有一个很重要的特性就是对线程内存可见性的保证(即,对一个 volatile 变量的读,总是能看到(任意线程)对这个 volatile 变量最后的写入)。因为此处修改state字段(本文是Netty服务端主线程)的线程和使用该字段的线程(NioEventLoop所关联线程)不是同一个线程。因此通过volatile来修饰state字段来实现,通过主线程修改了EventLoop所关联的线程状态后,在NioEventLoop的事件循环中能立即正确感知其线程状态的变化,从而做出相应的操作。
    ④ 根据传入的参数,设置成员变量gracefulShutdownQuietPeriod、gracefulShutdownTimeout。这里分别为默认值,gracefulShutdownQuietPeriod为2秒,gracefulShutdownTimeout为15秒。
    ⑤ 如果NioEventLoop所关联的线程之前的状态为ST_NOT_STARTED,则说明该线程还未被启动过,那么启动该线程。
    Q:为什么我们在执行关闭操作的时候,还需要特意去启动那些未启动的NioEventLoop线程了?
    A:是这样的,在基于NIO的网络传输模式中,会在构建NioEventLoopGroup的时候就预先将一定数量的NioEventLoop给创建好(默认为操作系统可运行处理器数的2倍),而NioEventLoop在初始化的时候就会将其上的Selector给开启了。同时Selector的关闭是在『doStartThread()』方法中最后会去完成的事。关于『doStartThread()』方法将在后面详细展开。

    好了,在完成将NioEventLoop所关联的线程状态修改为’ST_SHUTTING_DOWN’,也就说明关闭流程的开始。那么,接下来我们来看看NioEventLoop中是如果完成优雅的关闭的。

    我们先来看看doStartThread()方法:

        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                                    "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
    

    ① 这里executor.execute方法底层会通过ThreadPerTaskExecutor.execute(Runnable)方法来创建并启动执行任务的唯一线程。然后启动的线程就会执行我们通过executor.execute方法提交上来的这个任务(具体的这块说明请见Netty 源码解析 ——— 服务端启动流程 (上))。
    ② 在Runnable任务中,会将当前的线程设置为NioEventLoop所关联的线程,即对成员变量thread赋值为Thread.currentThread()。然后执行『SingleThreadEventExecutor.this.run();』这里实际调用的是『NioEventLoop#run()』方法来进行事件循环操作。
    ③ 当事件循环操作退出后(当NioEventLoop需要关闭时,事件循环才会退出),进行关闭的后续操作。

    当NioEventLoop已经处于使用状态(即,上面有Channel与其绑定),那么此时它会处于事件循环操作中;若NioEventLoop没有处于使用状态(即,该NioEventLoop已经被初始化构建好了,但还没有任何一个Channel与其绑定过),那么在执行shutdownGracefully()后,也会因为调用了doStartThread()方法,此时该NioEventLoop也会处于事件循环中。
    那么,接下来我们就来看看NioEventLoop中事件循环对于优雅关闭都完成了哪些操作了?

    『NioEventLoop#run()』:

        protected void run() {
            for (;;) {
                try {
                    ......
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

    此处,我们仅对与优雅关闭流程相关的部分进行展开。
    事件循环首先会对Selector上注册的Channel所就绪的I/O事件做处理,然后处理taskQueue中的任务以及时间已经到达的定时/周期性任务。最后,在每次事件循环的最后都会判断一次当前的线程状态,如果发现当前的线程状态处于正在关闭的状态(即,state >= ST_SHUTTING_DOWN)则会开始处理关闭流程,即:

        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    

    注意,事件循环中将正常的工作流程放在了一个try-catch中,将关闭流程放在了另一个try-catch中,这是为了它们之间能够不会互相影响。这样即便工作流程抛出异常了,每次事件循环的最后依旧能够去处理关闭事件。

    关闭流程主要分为两步:
    ① 『closeAll()』:

        private void closeAll() {
            selectAgain();
            Set<SelectionKey> keys = selector.keys();
            Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
            for (SelectionKey k: keys) {
                Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    channels.add((AbstractNioChannel) a);
                } else {
                    k.cancel();
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    invokeChannelUnregistered(task, k, null);
                }
            }
    
            for (AbstractNioChannel ch: channels) {
                ch.unsafe().close(ch.unsafe().voidPromise());
            }
        }
    

    获取该注册到这个Selector所有Channel所对应的SelectionKey,然后获取SelectionKey附加对象attachment(),若attachment是一个AbstractNioChannel对象则先让放入到channels集合中,否则直接调用『k.cancel(),即selectionKey.cancel()』操作将这个SelectableChannel从Selector上注销。最后遍历channels集合,依次取出AbstractNioChannel,进行AbstractNioChannel的关闭操作(『ch.unsafe().close(ch.unsafe().voidPromise());』)


    1. 如设置了Socket#SO_LINGER配置项(即,config().getSoLinger() > 0),则说明当需要关闭socket时,如果这时send buffer里还有数据没有发送完,则先尝试把send buffer中的数据发送完了再关闭socket。所以此时会先执行doDeregister()操作,将当前的SocketChannel从Selector上注销,然后将close()操作作为一个任务放到另一个执行器去执行,也就是说不在当前的NioEventLoop的线程上去执行当前SocketChannel的关闭操作,因为此时SocketChannel不会马上关闭,它需要尝试在l_linger time时间内将发送缓存区中的数据发送出去并等待对方的确认。在l_linger time时间之后socket才会真正的被关闭。
    2. 如果没有设置Socket#SO_LINGER配置项,则直接在NioEventLoop线程上进行SocketChannel/ServerSocektChannel的close()操作。并将outboundBuffer中所有还未发送出去的消息标志为操作失败(fail flush),然后关闭outboundBuffer,释放相关资源。在关闭socket之后,将SocketChannel/ServerSocketChannel从Selector上注销(即,『selectionKey.cancel()』。selectionKey表示一个SocketChannel/ServerSocketChannel注册到Selector的关联关系)。
    3. 触发‘channelInactive’事件和‘channelUnregistered’事件,这两个事件都会在ChannelPipeline中得以传播。但这两个事件的触发会被封装为一个任务提交至当前的NioEventLoop的taskQueue在随后被执行,这么做的原因是为了确保‘channelInactive’事件和‘channelUnregistered’事件的触发会在NioEventLoop线程上执行。‘channelInactive’事件和‘channelUnregistered’事件都是入站事件,它们会依次顺序调用ChannelPipeline中的ChannelInboundHandler的channelInactive()方法以及channelUnregistered()方法。并且,ChannelPipeline中的head在处理‘channelUnregistered’事件时除了将该事件传播给ChannelPipeline中的下一个ChannelInboundHandler外,还会触发一个destroy()操作
            public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelUnregistered();
    
                // Remove all handlers sequentially if channel is closed and unregistered.
                if (!channel.isOpen()) {
                    destroy();
                }
            }
    

    该destroy()操作会删除ChannelPipeline中的所有的handler(除了head、tail之外),并触发每个Handler的handlerRemoved()方法。注意,这里handler的移除操作是先顺序移除head到tail间所有的ChannelInboundHandler,然后在顺序移除tail到head间所有的ChannelOutboundHandler。




    ② 『confirmShutdown()』:
    protected boolean confirmShutdown() {
        if (!isShuttingDown()) {
            return false;
        }
    
        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }
    
        cancelScheduledTasks();
    
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }
    
        if (runAllTasks() || runShutdownHooks()) {
            if (isShutdown()) {
                // Executor shut down - no new tasks anymore.
                return true;
            }
    
            // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period or
            // terminate if the quiet period is 0.
            // See https://github.com/netty/netty/issues/4241
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            wakeup(true);
            return false;
        }
    
        final long nanoTime = ScheduledFutureTask.nanoTime();
    
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }
    
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            // Check if any tasks were added to the queue every 100ms.
            // TODO: Change the behavior of takeTask() so that it returns on timeout.
            wakeup(true);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }
    
            return false;
        }
    
        // No tasks were added for last quiet period - hopefully safe to shut down.
        // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
        return true;
    }
    

    首先,先简单的描述下『runAllTasks()』和『runShutdownHooks()』所会完成的操作:
    a) runAllTasks():首先会将已经到运行时间的定时/周期性任务放入taskQueue中,然后依次执行taskQueue中的任务。当且仅当taskQueue中的任务都执行完了,该方法会返回true,并且会将最后一个任务执行完后此时的系统时间赋值为成员变量lastExecutionTime;否则,如果该taskQueue中没有要执行的任务,那么该方法会返回false。
    b) runShutdownHooks():执行用户自定义的所有shutdownHook,比如我们通过(『nioEventloop.addShutdownHook(runnable)』方法来提交我们希望该NioEventLoop被关闭时所要执行的一些操作)。当shutdownHook都执行完了该方法会返回true,并且会在执行完最后一个showdownHook后将此时的系统时间赋值为成员变量lastExecutionTime;否则,如果没有任何需要执行的shutdownHook,即shutdownHooks集合为空,那么该方法将返回false。

    接下来,我们来判断在什么条件下confirmShutdown()方法将返回true,以至于可以退出NioEventLoop的事件循环,继续doStartThread()的后续操作以完成最后的优雅关闭流程。
    我们分两种情况来讨论:
    ① gracefulShutdownQuietPeriod == 0
    如果taskQueue中待执行的任务,或者有到期的定时/周期性任务,再或者有用户自定义的shutdownHook任务,那么会在执行完任务后退出confirmShutdown方法,并返回true;否则,如果没有任务待执行的任务,那么‘nanoTime - lastExecutionTime > gracefulShutdownQuietPeriod’也会使得confirmShutdown()方法退出,并返回true。

    ② gracefulShutdownQuietPeriod > 0

    1. 从『if (runAllTasks() || runShutdownHooks())』这个判断语句中,我们能够确保只有在taskQueue中所有的任务都被执行完了,并且shutdownHooks集合中所有的shutdownHook也都执行完了之后,这个判断语句才会返回false。也就是说,当该if语句返回false时,我们能够确保所有的任务和shutdownHook都已经执行完了。
    2. 『nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout』:接下来我们判断,执行完上面所有任务(包括taskQueue中的任务、可执行的定时/周期性任务、所有的shutdownHook任务)所需的时间是否已经操作了优雅关闭的超时时间(gracefulShutdownTimeout),如果已经超过了,那么则退出confirmShutdown方法,并返回true。否则,继续下面的步骤
    3. 『nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod』:如果‘当前时间距离最后一次执行任务的时间’小于等于’优雅退出的平静期(gracefulShutdownQuietPeriod)’。则使NioEventLoop线程睡眠100ms后,退出confirmShutdown方法,并返回false,这时说明关闭操作是未被批准的,那么NioEventLoop的事件循环并不会退出,并且会在下次事件循的最后再次调用confirmShutdown()方法进行关闭操作的确认,也就是会从新执行步骤1;否则,如果‘当前时间距离最后一次执行任务的时间’大于’优雅退出的平静期(gracefulShutdownQuietPeriod)’,则退出confirmShutdown方法,并返回true。此时说明,在一个优雅退出的平静期(gracefulShutdownQuietPeriod)内都没有任何的任务被提交至该NioEventLoop线程上,那么我们就有希望能够安全的进行关闭。为什么说是有希望了?这是因为我们实在没有办法保证在此时用户不会通过execute()来提交一个任务。


      我们用一个流程图来说明gracefulShutdownQuietPeriod、gracefulShutdownTimeout在confirmShutdown操作中起到的作用和关系(注意,下面并不是confirmShutdown()方法流程图):


      好了,在结束NioEventLoop的事件循环后,我们继续来看doStartThread()的后续操作。
      首先会将变量success设置为true,接下就是执行finally块中的代码了:
      ① 如果当前NioEventLoop线程的状态还不是处于关闭相关的状态的话,则通过自旋锁的方式将当前NioEventLoop线程的状态修改为’ST_SHUTTING_DOWN’。从我们当前优雅关闭的流程来说,当前NioEventLoop线程的此时就是ST_SHUTTING_DOWN了。
      ② 判断,如果NioEventLoop事件循环结束了,但是‘gracefulShutdownStartTime’成员变量却为0,则说明事件循环不是因为confirmShutdown()方法而导致的结束,那么就打印一个错误日志,告知当前的EventExecutor的实现是由问题的,因为事件循环的终止必须是通过调用confirmShutdown()方法来实现的,也就是说,事件循环能够正确退出,也就是因为关闭操作被确认了。
      ③ 此时会通过自旋锁的方式再次调用一次『confirmShutdown()』,以确保所有的NioEventLoop中taskQueue中所有的任务以及用户自定义的所有shutdownHook也都执行了。之后才会进行关闭操作。
      ④ cleanup():
        protected void cleanup() {
            try {
                selector.close();
            } catch (IOException e) {
                logger.warn("Failed to close a selector.", e);
            }
        }
    

    会将当前NioEventLoop所关联的Selector关闭。
    ⑤ 修改NioEventLoop线程的状态为’ST_TERMINATED’。注意,在此操作完成之后,所有提交至该NioEventLoop显示的任务都会被拒绝,也就是该NioEventLoop不会再接收任何的任务了。

    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }
    
    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
    
    public boolean isShutdown() {
        return state >= ST_SHUTDOWN;
    }
    

    ⑥ threadLock.release():threadLock是一个初始化资源为0的信号量,此操作会使得信号量的资源+1。那么这种情况下,如果有用户操作了awaitTermination方法的话(该方法底层会通过『threadLock.tryAcquire(timeout, unit)』来阻塞的尝试获取信号量的资源),该方法就会结束阻塞并返回,当然它也可以因为设置的等待超时间已到而返回。
    ⑦ 此时会再次判断该NioEventLoop的taskQueue是否为空,如果为非空,只会打印警告日志,告知用户,当前NioEventLoop在退出时仍有未完成的任务。而这个任务可能是在步骤③完成后,步骤⑤完成之前,又有用户提交上来的。
    ⑧ 设置该优雅关闭异步操作为成功完成。

    后记

    好了,整个NioEventLoopGroup的整个优雅关闭流程就分析完了,一句简单『nioEventLoopGroup.shutdownGracefully()』操作背后竟然有着如此复杂的关闭流程,再次佩服Netty为我们将复杂的流程给封闭化,而提供最为简便的API供用户来更好更方便的去使用它。
    若文章有任何错误,望大家不吝指教:)

    相关文章

      网友评论

        本文标题:Netty 源码解析 ——— Netty 优雅关闭流程

        本文链接:https://www.haomeiwen.com/subject/roqrbxtx.html