美文网首页sofabolt
SOFABolt 源码分析17 - Heartbeat 心跳机制

SOFABolt 源码分析17 - Heartbeat 心跳机制

作者: 原水寒 | 来源:发表于2018-10-19 21:03 被阅读116次
    image.png

    如上图所示,SOFABolt 中与心跳机制相关的为绿色的类。

    类组成

    • 心跳命令
    • HeartbeatCommand:心跳请求命令
    • HeartbeatAckCommand:心跳响应命令
    • RpcHeartBeatProcessor:心跳命令处理器,处理 HeartbeatCommand 和 HeartbeatAckCommand 两种心跳请求
    • IdleStateHandler:Netty 提供的空闲检测处理器,在指定的时间内没有读或写请求,那么发布一个 IdleStateEvent 事件
    • ServerIdleHandler:服务端 IdleStateEvent 事件处理器,进行服务端空闲逻辑处理(SOFABolt 中服务端直接关闭连接)
    • HeartbeatHandler:客户端 IdleStateEvent 事件处理器,进行客户端空闲逻辑处理(SOFABolt 中客户端使用 RpcHeartbeatTrigger 进行空闲处理)
    • RpcHeartbeatTrigger:客户端真正的空闲处理器,也叫心跳触发器
    • ConnectionHeartbeatManager:用于开启或关闭 Connection 的心跳逻辑


      image.png

      注意:CommandHandler 和 HeartbeatTrigger 实例都属于 Protocol 实现类的一个属性,是在创建 Protocol 实现类的时候创建的。

    客户端基本流程

    1. 在 15s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件
    2. HeartbeatHandler 进行该事件的处理:
    • 首先从当前 Channel 的附属属性中获取相关的 ProtocolCode
    • 再从 ProtocolManager 中获取 ProtocolCode 的 Protocol 实现类
    • 再从 Protocol 实现类获取 HeartbeatTrigger 实例,最终调用该实例进行 IdleStateEvent 的处理
    1. HeartbeatTrigger 处理 IdleStateEvent 事件
    • 首先从当前 Channel 的附属属性中获取已经发送心跳但是没有接收到响应的次数 heartbeatTimes,如果 heartbeatTimes 已经大于 3 次,则直接关闭连接,否则
    • 从当前 Channel 的附属属性中获取心跳开关,如果关闭了心跳,则直接返回,表示对 IdleStateEvent 不做任何处理;如果开启了心跳
    • 创建心跳请求命令 HeartbeatCommand + 创建本次请求的 InvokeFuture 对象 + 将 InvokeFuture 对象加入到当前的 Connection 中

    InvokeFuture 中会设置心跳响应回调函数:当接收到了正常的心跳响应后,将 heartbeatTimes 置为 0;否则,将该连接的heartbeatTimes+1

    • 使用 Netty 发送 HeartbeatCommand 到服务端
    • 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)

    服务端基本流程

    1. 在 90s 内没有读或者写事件,IdleStateHandler 就会发布一个 IdleStateEvent 事件(如果客户端还正常,那么在 90s 内,会发送至少 6 次心跳,那么服务端将不会触发 IdleStateEvent 事件)
    2. ServerIdleHandler 进行该事件的处理:直接关闭连接

    心跳处理流程

    • 心跳请求的处理:服务端接收到 HeartbeatCommand 后,构造心跳响应 HeartbeatAckCommand,之后使用 Netty 返回 HeartbeatAckCommand 给客户端
    • 心跳响应的处理:客户端接收到 HeartbeatAckCommand 后,设置心跳响应消息到 InvokeFuture + 取消超时任务 + 执行 InvokeFuture 中的回调方法

    注意

    • 只有客户端会主动发送心跳请求;但是双端都会开启空闲检测
    • 心跳除了上述应用端提供的这种之外,还有 tcp 提供的 keepAlive

    一、客户端启动设置

    ============================== AbstractConnectionFactory ==============================
        // 客户端心跳处理器
        private final ChannelHandler heartbeatHandler = new HeartbeatHandler();
        public void init(final ConnectionEventHandler connectionEventHandler) {
            ...
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel channel) {
                    ...
                    // 心跳检测开关 -Dbolt.tcp.heartbeat.switch=true
                    boolean idleSwitch = ConfigManager.tcp_idle_switch();
                    if (idleSwitch) {
                        // 读或者写空闲,默认为 15s -Dbolt.tcp.heartbeat.interval=15000
                        pipeline.addLast("idleStateHandler", new IdleStateHandler(ConfigManager.tcp_idle(), ConfigManager.tcp_idle(), 0, TimeUnit.MILLISECONDS));
                        // 心跳处理器
                        pipeline.addLast("heartbeatHandler", heartbeatHandler);
                    }
                    ...
                }
            });
        }
    
    ============================== HeartbeatHandler ==============================
    @Sharable
    public class HeartbeatHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
                Protocol protocol = ProtocolManager.getProtocol(protocolCode);
                // 调用 HeartbeatTrigger 做真正的心跳处理业务
                protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
    

    二、服务端启动设置

    ============================== RpcServer ==============================
        protected void doInit() {
            ...
            // 开启心跳检测开关
            final boolean idleSwitch = ConfigManager.tcp_idle_switch();
            // 服务端的心跳空闲时间(默认90s)-Dbolt.tcp.server.idle.interval=90000
            final int idleTime = ConfigManager.tcp_server_idle();
            // 服务端心跳处理器(直接关闭连接)
            final ChannelHandler serverIdleHandler = new ServerIdleHandler();
            ...
            this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel channel) {
                    ...
                    if (idleSwitch) {
                        // 空闲检测处理器
                        pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime, TimeUnit.MILLISECONDS));
                        // 服务端心跳处理器
                        pipeline.addLast("serverIdleHandler", serverIdleHandler);
                    }
                    ...
                }
        }
    
    ============================== ServerIdleHandler ==============================
    @Sharable
    public class ServerIdleHandler extends ChannelDuplexHandler {
        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                // 关闭连接
                ctx.close();
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
    

    三、心跳触发器 HeartbeatTrigger(客户端)

    public interface HeartbeatTrigger {
        void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception;
    }
    
    public class RpcHeartbeatTrigger implements HeartbeatTrigger {
        // max trigger times,心跳最多多少次没响应,则关闭连接,默认为3
        // -Dbolt.tcp.heartbeat.maxtimes=3
        public static final Integer maxCount = ConfigManager.tcp_idle_maxtimes();
        // 心跳响应返回的超时时间,发送请求后1s内没有接收到响应就触发超时逻辑
        private static final long heartbeatTimeoutMillis = 1000;
    
        public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
            // 已经心跳的次数,默认为0
            Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
            final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
            // 心跳次数已经超过3次,直接关闭连接
            if (heartbeatTimes >= maxCount) {
                conn.close();
            } else {
                // 检测该连接的心跳开关是否打开(只针对当前的 Connection 实例,不是全局的)
                boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
                if (!heartbeatSwitch) {
                    return;
                }
                // 创建心跳命令
                final HeartbeatCommand heartbeat = new HeartbeatCommand();
                // 创建 InvokeFuture
                final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
                    new InvokeCallbackListener() {
                        @Override
                        public void onResponse(InvokeFuture future) {
                            ResponseCommand response;
                            // 获取响应
                            response = (ResponseCommand) future.waitResponse(0);
                            if (response != null && response.getResponseStatus() == ResponseStatus.SUCCESS) {
                                // 接收到正常心跳响应,将该连接的已心跳次数置为0
                                ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
                            } else {
                                // 接收到错误的心跳响应,将该连接的已心跳次数+1
                                Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                                ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                            }
                        }
                    }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
                final int heartbeatId = heartbeat.getId();
                // 将 InvokeFuture 加入连接
                conn.addInvokeFuture(future);
                // 发送 heartbeat
                ctx.writeAndFlush(heartbeat);
                // 设置超时任务(1s内没有接收到心跳响应,则直接返回超时失败响应,实现快速失败)
                TimerHolder.getTimer().newTimeout(new TimerTask() {
                    @Override
                    public void run(Timeout timeout) throws Exception {
                        InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                        if (future != null) {
                            // 构造超时响应
                            future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
                            // 回调
                            future.tryAsyncExecuteInvokeCallbackAbnormally();
                        }
                    }
                }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
            }
        }
    }
    

    四、心跳处理器 RpcHeartBeatProcessor

    public class RpcHeartBeatProcessor extends AbstractRemotingProcessor {
        public void doProcess(final RemotingContext ctx, RemotingCommand msg) {
            // 如果是心跳请求
            if (msg instanceof HeartbeatCommand) {
                final int id = msg.getId();
                // 构造心跳响应
                HeartbeatAckCommand ack = new HeartbeatAckCommand();
                ack.setId(id);
                // 发送心跳响应
                ctx.writeAndFlush(ack);
            } else if (msg instanceof HeartbeatAckCommand) { // 如果是心跳响应
                Connection conn = ctx.getChannelContext().channel().attr(Connection.CONNECTION).get();
                InvokeFuture future = conn.removeInvokeFuture(msg.getId());
                // 设置心跳响应消息
                future.putResponse(msg);
                // 取消超时任务
                future.cancelTimeout();
                // 回调
                future.executeInvokeCallback();
            } else {
                throw new RuntimeException("Cannot process command: " + msg.getClass().getName());
            }
        }
    }
    
    

    注意:心跳的处理和响应消息的处理都是一样的,如果 RemotingProcessor#executor 存在,则使用该线程池执行,否则使用 ProcessorManager#defaultExecutor 执行

    五、全局心跳开关和指定连接心跳开关

    全局心跳开关

    SOFABolt 提供了一个全局开关:在客户端和服务端启动设置的过程中,已经看到了 boolean idleSwitch = ConfigManager.tcp_idle_switch()
    该值是通过系统属性进行设置的:-Dbolt.tcp.heartbeat.switch=true,所以是多实例共享的。全局心跳开关是一个静态开关

    指定连接心跳开关

    注意:只有当全局开关打开的时候,指定连接心跳开关才起作用。实际上,指定连接心跳开关通常只用来关闭指定 Connection 上发送心跳消息。指定连接心跳开关由 ConnectionHeartbeatManager 提供的两个接口来设定。指定连接心跳开关是一个动态开关,即运行时开关

    public interface ConnectionHeartbeatManager {
        void disableHeartbeat(Connection connection);
        void enableHeartbeat(Connection connection);
    }
    
    public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager
        public void disableHeartbeat(Connection connection) {
            if (null != connection) {
                // 向 Connection 中的 Channel 添加附属属性:Connection.HEARTBEAT_SWITCH,后续在 RpcHeartbeatTrigger 中会判断该附属属性,如果是 false,则不再处理 IdleStateEvent,即不再发送心跳请求消息。
                connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(false);
            }
        }
    
        public void enableHeartbeat(Connection connection) {
            if (null != connection) {
                connection.getChannel().attr(Connection.HEARTBEAT_SWITCH).set(true);
            }
        }
    }
    
    ============================== 程序使用入口 ==============================
    public class RpcClient extends AbstractConfigurableInstance {
        public void enableConnHeartbeat(String addr) {
            Url url = this.addressParser.parse(addr);
            this.enableConnHeartbeat(url);
        }
    
        public void enableConnHeartbeat(Url url) {
            if (null != url) {
                this.connectionManager.enableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
            }
        }
    
        public void disableConnHeartbeat(String addr) {
            Url url = this.addressParser.parse(addr);
            this.disableConnHeartbeat(url);
        }
    
        public void disableConnHeartbeat(Url url) {
            if (null != url) {
                this.connectionManager.disableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:SOFABolt 源码分析17 - Heartbeat 心跳机制

        本文链接:https://www.haomeiwen.com/subject/ncsnoftx.html