美文网首页sofabolt
SOFABolt 源码分析22 - 优雅停机机制的设计

SOFABolt 源码分析22 - 优雅停机机制的设计

作者: 原水寒 | 来源:发表于2018-10-22 09:41 被阅读96次

    SOFABolt 的停机机制分为服务端 RpcServer 停机 和客户端 RpcClient 停机。

    • RpcServer 停机步骤
    1. 关闭 netty 服务端 boss channel
    2. 关闭 bossGroup,workerGroup 不能关掉(因为 workerGroup 是多个 RpcServer 共享的)
    3. 如果服务端连接管理器是开启的,遍历所有的连接池 ConnectionPool,关闭池内的所有 Connection(进而关闭 Connection 内部的 netty 服务端 worker channel)
    • RpcClient 停机步骤
    1. 遍历所有的连接池 ConnectionPool,关闭池内的所有 Connection(进而关闭 Connection 内部的 netty 服务端 worker channel)
    2. 关闭 Scannable 任务扫描线程
    3. 关闭重连线程 + 清空重连任务
    4. 取消连接监听器内的任务 + 销毁连接监听器的线程池

    服务端停机 RpcServer

    ======================================= AbstractRemotingServer =======================================
        public boolean stop() {
            if (started.compareAndSet(true, false)) {
                return this.doStop();
            } else {
                throw new IllegalStateException("ERROR: The server has already stopped!");
            }
        }
    
    ======================================= RpcServer =======================================
        protected boolean doStop() {
            // 1. 关闭 netty 服务端 boss channel
            if (null != this.channelFuture) {
                this.channelFuture.channel().close();
            }
            // 2. 关闭 bossGroup,workerGroup 不能关掉(因为是共享的)
            if (this.switches().isOn(GlobalSwitch.SERVER_SYNC_STOP)) {
                this.bossGroup.shutdownGracefully().awaitUninterruptibly();
            } else {
                this.bossGroup.shutdownGracefully();
            }
            // 3. 如果服务端连接管理器是开启的,遍历所有的连接池 ConnectionPool,关闭连接(内部关闭 netty 服务端 worker channel)
            if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH) && null != this.connectionManager) {
                this.connectionManager.removeAll();
            }
            return true;
        }
    
    ======================================= DefaultConnectionManager =======================================
        // connection pool initialize tasks
        protected ConcurrentHashMap<String, RunStateRecordedFutureTask<ConnectionPool>> connTasks;
    
        public void removeAll() {
            if (null == this.connTasks || this.connTasks.isEmpty()) {
                return;
            }
            if (null != this.connTasks && !this.connTasks.isEmpty()) {
                Iterator<String> iter = this.connTasks.keySet().iterator();
                // 遍历所有的连接池 ConnectionPool,关闭所有 Connection(包括 Connection 内的 netty worker channel)
                while (iter.hasNext()) {
                    String poolKey = iter.next();
                    this.removeTask(poolKey);
                    iter.remove();
                }
            }
        }
    
        private void removeTask(String poolKey) {
            RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.remove(poolKey);
            if (null != task) {
                ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(task, logger);
                if (null != pool) {
                    pool.removeAllAndTryClose();
                }
            }
        }
    
    ======================================= ConnectionPool =======================================
        // remove all connections
        public void removeAllAndTryClose() {
            for (Connection conn : this.conns) {
                removeAndTryClose(conn);
            }
            this.conns.clear();
        }
    
        public void removeAndTryClose(Connection connection) {
            if (null == connection) {
                return;
            }
            boolean res = this.conns.remove(connection);
            if (res) {
                connection.decreaseRef();
            }
            if (connection.noRef()) {
                connection.close();
            }
        }
    
    ======================================= Connection =======================================
        // Close the connection.
        public void close() {
            if (closed.compareAndSet(false, true)) {
                if (this.getChannel() != null) {
                    this.getChannel().close();
                }
            }
        }
    

    客户端停机 RpcClient

    ======================================= RpcClient =======================================
        public void shutdown() {
            // 1. 遍历所有的连接池 ConnectionPool,关闭池内的所有 Connection(进而关闭 Connection 内部的 netty 服务端 worker channel)
            // 这一步与服务端开启了连接管理一样,不再赘述
            this.connectionManager.removeAll();
            // 2. 关闭 Scannable 任务扫描线程
            this.taskScanner.shutdown();
            // 3. 关闭重连线程 + 清空重连任务
            if (reconnectManager != null) {
                reconnectManager.stop();
            }
            // 4. 销毁连接监听器的线程池
            if (connectionMonitor != null) {
                connectionMonitor.destroy();
            }
        }
    
    ======================================= RpcTaskScanner =======================================
        private ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1,new NamedThreadFactory("RpcTaskScannerThread", true));
    
        // Shutdown the scheduled service.
        public void shutdown() {
            scheduledService.shutdown();
        }
    
    ======================================= ReconnectManager =======================================
        private final Thread healConnectionThreads; // 重连线程
        // stop reconnect thread
        public void stop() {
            if (!this.started) {
                return;
            }
            this.started = false;
            healConnectionThreads.interrupt();
            this.tasks.clear();
            this.canceled.clear();
        }
    
    ======================================= DefaultConnectionMonitor =======================================
        private ScheduledThreadPoolExecutor executor;
        // cancel task and shutdown executor
        public void destroy() {
            // Tries to remove from the work queue all Future tasks that have been cancelled
            executor.purge();
            executor.shutdown();
        }
    
    

    相关文章

      网友评论

        本文标题:SOFABolt 源码分析22 - 优雅停机机制的设计

        本文链接:https://www.haomeiwen.com/subject/qdiqzftx.html