sofa-bolt Heartbeat Detection

Author: heyong | Published: 2019-06-14 14:29

    1. Overview

    The sofa-bolt heartbeat mechanism is built on Netty, so before analyzing the sofa-bolt heartbeat we first look at how Netty implements idle detection.

    2. Netty heartbeat implementation

    Netty provides the IdleStateHandler class to support heartbeat checks. Its constructor takes the following parameters:

    /**
     * Creates a new instance firing {@link IdleStateEvent}s.
     *
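     * @param readerIdleTime read-idle timeout; 0 disables the check
     * @param writerIdleTime write-idle timeout; 0 disables the check
     * @param allIdleTime    timeout with neither a read nor a write; 0 disables the check
     * @param unit           time unit of the three values above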
     */
    public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) 
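
    To make the parameter semantics concrete, here is a minimal plain-JDK sketch of how the idle times are normalized: a value of zero or less disables the corresponding check, and a positive value is converted to nanoseconds with a floor of one millisecond. The class and method names (IdleTimes, toIdleNanos) are hypothetical and are not part of Netty; the sketch only mirrors the rule and is not Netty's code.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a Netty class: models how an idle time
// argument is turned into the nanosecond value used by the timers.
final class IdleTimes {
    // Smallest timeout that is ever scheduled: one millisecond.
    static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

    // Returns 0 when the check is disabled, otherwise the timeout in nanoseconds.
    static long toIdleNanos(long idleTime, TimeUnit unit) {
        if (idleTime <= 0) {
            return 0L; // zero or negative means "do not check"
        }
        return Math.max(unit.toNanos(idleTime), MIN_TIMEOUT_NANOS);
    }
}
```

    For example, IdleTimes.toIdleNanos(5, TimeUnit.SECONDS) yields 5,000,000,000 ns, while the same 5 with TimeUnit.MILLISECONDS yields only 5,000,000 ns, so the unit argument matters as much as the number.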
    

    IdleStateHandler is a ChannelHandler, so it must be added to the ChannelPipeline, for example:

    this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

        @Override
        protected void initChannel(SocketChannel channel) {
            ChannelPipeline pipeline = channel.pipeline();
            pipeline.addLast("decoder", codec.newDecoder());
            pipeline.addLast("encoder", codec.newEncoder());
            if (idleSwitch) {
                // Fire a READER_IDLE event after 5 seconds without inbound data;
                // the write-idle and all-idle checks are disabled (0).
                pipeline.addLast("idleStateHandler", new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                pipeline.addLast("serverIdleHandler", serverIdleHandler);
            }
            pipeline.addLast("connectionEventHandler", connectionEventHandler);
            pipeline.addLast("handler", rpcHandler);
            createConnection(channel);
        }
    });
    

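    Here an IdleStateHandler is added to the channel pipeline. Its first argument, readerIdleTime, is 5 and is intended as 5 seconds (which requires TimeUnit.SECONDS as the unit argument). If no inbound data is read on the channel for 5 seconds, IdleStateHandler fires an IdleStateEvent, which is delivered to the userEventTriggered method of the handlers that follow it in the pipeline. Let us look at the channelRead method of IdleStateHandler.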

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }
    

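    This method only marks that a read is in progress (reading = true) and resets the first-event flags, then passes the message down the pipeline; the timestamp lastReadTime is updated later, in channelReadComplete. Next, the channelActive method.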

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }
    

    Once the client and the server have established a connection, channelActive is called. The channelActive method of IdleStateHandler calls initialize() to set up idle detection for the connection:

    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            // Schedule the task that handles the read-idle timeout
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            // Schedule the task that handles the write-idle timeout
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            // Schedule the task that handles the all-idle (read and write) timeout
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
    

    The code above schedules the timer tasks that drive idle detection. Let us look at what the ReaderIdleTimeoutTask actually does:

    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;
        }

        if (nextDelay <= 0) {
            // Reader is idle - set a new timeout and notify the callback.
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;

            try {
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
    

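    The task subtracts the time of the last completed read (lastReadTime) from the current time. If that interval has reached the configured read-idle timeout, it fires a read-idle event and schedules itself again with the full timeout; otherwise it reschedules itself for the remaining time. While a read is still in progress (reading is true), the elapsed time is not subtracted, so the channel is never reported idle in the middle of a read.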

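    The analysis above covers the read-idle case. The write-idle and all-idle tasks follow the same pattern and can be analyzed in the same way.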
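
    The read-idle bookkeeping described above can be modeled without Netty. The following is a minimal sketch under stated assumptions: the class ReadIdleModel and its method names are hypothetical, time is passed in explicitly as a plain long instead of being read from a clock, and no real timer is scheduled. It only reproduces the arithmetic of the timeout task and the role of the reading flag.

```java
// Hypothetical model, not Netty code: tracks when a channel counts as read-idle.
final class ReadIdleModel {
    private final long readerIdleTime; // configured timeout, in arbitrary ticks
    private long lastReadTime;         // time of the last completed read
    private boolean reading;           // true between a read and its completion

    ReadIdleModel(long readerIdleTime, long now) {
        this.readerIdleTime = readerIdleTime;
        this.lastReadTime = now;
    }

    // Corresponds to channelRead: only flags that a read is in progress.
    void onRead() {
        reading = true;
    }

    // Corresponds to channelReadComplete: records when the read finished.
    void onReadComplete(long now) {
        if (reading) {
            lastReadTime = now;
            reading = false;
        }
    }

    // Same arithmetic as the timeout task: time left before the channel is idle.
    long remainingDelay(long now) {
        long nextDelay = readerIdleTime;
        if (!reading) {
            nextDelay -= now - lastReadTime;
        }
        return nextDelay;
    }

    // The channel is idle once no time is left.
    boolean isIdle(long now) {
        return remainingDelay(now) <= 0;
    }

    // Delay for the next check: the full timeout after an idle event,
    // otherwise only the time that is still left.
    long nextCheckDelay(long now) {
        long remaining = remainingDelay(now);
        return remaining <= 0 ? readerIdleTime : remaining;
    }
}
```

    With a timeout of 5 ticks and a read completed at tick 4, a check at tick 5 finds 4 ticks remaining and would reschedule itself for 4 ticks later; a check at tick 9 finds nothing remaining and would report the channel as idle.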

    3. sofa-bolt heartbeat implementation

    sofa-bolt handles connection heartbeats through HeartbeatHandler, which is implemented as follows:

    public class HeartbeatHandler extends ChannelDuplexHandler {
        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ProtocolCode protocolCode = ctx.channel().attr(Connection.PROTOCOL).get();
                Protocol protocol = ProtocolManager.getProtocol(protocolCode);
                protocol.getHeartbeatTrigger().heartbeatTriggered(ctx);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
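
    The dispatch performed by HeartbeatHandler (check the event type, look up the protocol bound to the connection, call that protocol's heartbeat trigger) can be illustrated with a small plain-JDK sketch. Everything here is a stand-in chosen for illustration: HeartbeatDispatchSketch, IdleEvent and Trigger are hypothetical names, the protocol code is reduced to a single byte, and an unknown protocol is simply ignored, which is an assumption of this sketch rather than the behavior of sofa-bolt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not sofa-bolt code: routes idle events to a per-protocol trigger.
final class HeartbeatDispatchSketch {
    // Stand-in for Netty's IdleStateEvent.
    static final class IdleEvent { }

    // Stand-in for the heartbeat trigger owned by each protocol.
    interface Trigger {
        void heartbeatTriggered(String remoteAddress);
    }

    private final Map<Byte, Trigger> triggersByProtocol = new ConcurrentHashMap<>();

    // Registers the trigger to use for connections speaking the given protocol.
    void register(byte protocolCode, Trigger trigger) {
        triggersByProtocol.put(protocolCode, trigger);
    }

    // Returns true if the event was an idle event and a trigger was invoked.
    boolean onUserEvent(Object evt, byte protocolCode, String remoteAddress) {
        if (!(evt instanceof IdleEvent)) {
            return false; // other events are left for later handlers
        }
        Trigger trigger = triggersByProtocol.get(protocolCode);
        if (trigger == null) {
            return false; // assumption of this sketch: unknown protocols are ignored
        }
        trigger.heartbeatTriggered(remoteAddress);
        return true;
    }
}
```

    Keeping the trigger behind the protocol lookup means a single shared handler instance can serve connections that speak different protocols.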
    

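    When the event is an IdleStateEvent, HeartbeatHandler looks up the protocol bound to the channel and immediately delegates to that protocol's heartbeat trigger. The heartbeat logic itself is implemented in the heartbeatTriggered method: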

    public void heartbeatTriggered(final ChannelHandlerContext ctx) throws Exception {
        Integer heartbeatTimes = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
        final Connection conn = ctx.channel().attr(Connection.CONNECTION).get();
      
        // Close the connection once the number of unanswered heartbeats reaches the limit
        if (heartbeatTimes >= maxCount) {
            try {
                conn.close();
                logger.error(
                    "Heartbeat failed for {} times, close the connection from client side: {} ",
                    heartbeatTimes, RemotingUtil.parseRemoteAddress(ctx.channel()));
            } catch (Exception e) {
                logger.warn("Exception caught when closing connection in SharableHandler.", e);
            }
        } else {
            boolean heartbeatSwitch = ctx.channel().attr(Connection.HEARTBEAT_SWITCH).get();
            if (!heartbeatSwitch) {
                return;
            }
            final HeartbeatCommand heartbeat = new HeartbeatCommand();
            // Register a callback listener for the heartbeat response
            final InvokeFuture future = new DefaultInvokeFuture(heartbeat.getId(),
                new InvokeCallbackListener() {
                    @Override
                    public void onResponse(InvokeFuture future) {
                        ResponseCommand response;
                        try {
                            response = (ResponseCommand) future.waitResponse(0);
                        } catch (InterruptedException e) {
                            logger.error("Heartbeat ack process error! Id={}, from remoteAddr={}",
                                heartbeat.getId(), RemotingUtil.parseRemoteAddress(ctx.channel()),
                                e);
                            return;
                        }
                        if (response != null
                            && response.getResponseStatus() == ResponseStatus.SUCCESS) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Heartbeat ack received! Id={}, from remoteAddr={}",
                                    response.getId(),
                                    RemotingUtil.parseRemoteAddress(ctx.channel()));
                            }
                            // Heartbeat answered successfully: reset the failure count to 0
                            ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
                        } else {
                            if (response == null) {
                                logger.error("Heartbeat timeout! The address is {}",
                                    RemotingUtil.parseRemoteAddress(ctx.channel()));
                            } else {
                                logger.error(
                                    "Heartbeat exception caught! Error code={}, The address is {}",
                                    response.getResponseStatus(),
                                    RemotingUtil.parseRemoteAddress(ctx.channel()));
                            }
                            // Heartbeat failed or timed out: increment the failure count
                            Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
                            ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(times + 1);
                        }
                    }
                    @Override
                    public String getRemoteAddress() {
                        return ctx.channel().remoteAddress().toString();
                    }
                }, null, heartbeat.getProtocolCode().getFirstByte(), this.commandFactory);
            final int heartbeatId = heartbeat.getId();
            conn.addInvokeFuture(future);
            if (logger.isDebugEnabled()) {
                logger.debug("Send heartbeat, successive count={}, Id={}, to remoteAddr={}",
                    heartbeatTimes, heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
            }
            // Send the heartbeat request
            ctx.writeAndFlush(heartbeat).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (logger.isDebugEnabled()) {
                            logger.debug("Send heartbeat done! Id={}, to remoteAddr={}",
                                heartbeatId, RemotingUtil.parseRemoteAddress(ctx.channel()));
                        }
                    } else {
                        logger.error("Send heartbeat failed! Id={}, to remoteAddr={}", heartbeatId,
                            RemotingUtil.parseRemoteAddress(ctx.channel()));
                    }
                }
            });
            // Handle a heartbeat request that times out
            TimerHolder.getTimer().newTimeout(new TimerTask() {
                @Override
                public void run(Timeout timeout) throws Exception {
                    InvokeFuture future = conn.removeInvokeFuture(heartbeatId);
                    if (future != null) {
                        future.putResponse(commandFactory.createTimeoutResponse(conn
                            .getRemoteAddress()));
                        future.tryAsyncExecuteInvokeCallbackAbnormally();
                    }
                }
            }, heartbeatTimeoutMillis, TimeUnit.MILLISECONDS);
        }
    }
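
    The counting logic of heartbeatTriggered can be summarized in a plain-JDK sketch: each idle event either sends a new heartbeat or, once the failure count has reached the limit, closes the connection; a successful response resets the count, while a failure or timeout increments it. The sketch assumes that the response path removes the pending future from the connection in the same way the timeout task does, so that whichever arrives first wins; that part is not shown in the excerpt above. All names (HeartbeatStateSketch, Outcome, onIdle, complete) are hypothetical, and no network I/O or timer is involved.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model, not sofa-bolt code: tracks heartbeat failures for one connection.
final class HeartbeatStateSketch {
    enum Outcome { SUCCESS, FAILURE, TIMEOUT }

    private final int maxCount;
    private final AtomicInteger failedCount = new AtomicInteger();
    private final AtomicInteger nextId = new AtomicInteger();
    private final Set<Integer> pending = ConcurrentHashMap.newKeySet();
    private volatile boolean closed;

    HeartbeatStateSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    // Called for every idle event. Returns the id of the heartbeat that was
    // "sent", or -1 if the connection was closed instead.
    int onIdle() {
        if (failedCount.get() >= maxCount) {
            closed = true; // too many unanswered heartbeats
            return -1;
        }
        int id = nextId.incrementAndGet();
        pending.add(id);
        return id;
    }

    // Called by either the response path or the timeout task. Only the first
    // caller for a given id takes effect; a late call returns false.
    boolean complete(int id, Outcome outcome) {
        if (!pending.remove(id)) {
            return false; // already completed or timed out
        }
        if (outcome == Outcome.SUCCESS) {
            failedCount.set(0);
        } else {
            failedCount.incrementAndGet();
        }
        return true;
    }

    int failedCount() {
        return failedCount.get();
    }

    boolean isClosed() {
        return closed;
    }
}
```

    With maxCount set to 3, three consecutive timeouts leave the failure count at 3, and the next idle event closes the connection instead of sending a fourth heartbeat.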
    
