美文网首页
Netty集成心跳检测及其源码

Netty集成心跳检测及其源码

作者: hcq0514 | 来源:发表于2021-03-15 15:22 被阅读0次
    netty集成心跳检测
    1. 需要在pipline里面添加netty自带的心跳检测
    // 三个参数,第一个是当多少s没有读的时候执行读空闲,
    // 第二个是当多少秒没有写的时候执行写空闲
    // 第三个是当多少秒都没用读写的时候执行读写空闲
    pipeline.addLast(new IdleStateHandler(1,3,5));
    
    1. 实现一个方法来处理当发生上述几种空闲的情况并添加到pipline里
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;//强制类型转化
                if (event.state() == IdleState.READER_IDLE) {
    //当发生读空闲时会执行这个
    //                System.out.println("进入读空闲......");
                } else if (event.state() == IdleState.WRITER_IDLE) {
    //当发生写空闲时会执行这个
    //                System.out.println("进入写空闲......");
                } else if (event.state() == IdleState.ALL_IDLE) {
    //当发生读写空闲时会执行这个
                    Channel channel = ctx.channel();
                    //资源释放
                    channel.close();
                }
            }
        }
    }
    
    添加HeartBeatHandler到server的pipline里面
    pipeline.addLast(new HeartBeatHandler());
    
    源码分析

    netty为什么添加了一个IdleStateHandler的拦截器之后就可以检测了呢 我们先从IdleStateHandler看起

        public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds) {
    
            this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
                 TimeUnit.SECONDS);
        }
    
    这边主要就是初始化我们传进来的数赋值给readerIdleTime等
        public IdleStateHandler(boolean observeOutput,
                long readerIdleTime, long writerIdleTime, long allIdleTime,
                TimeUnit unit) {
            this.observeOutput = observeOutput;
            if (readerIdleTime <= 0) {
                readerIdleTimeNanos = 0;
            } else {
                readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
            }
            if (writerIdleTime <= 0) {
                writerIdleTimeNanos = 0;
            } else {
                writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
            }
            if (allIdleTime <= 0) {
                allIdleTimeNanos = 0;
            } else {
                allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
            }
        }
    当初始化好之后,此时我们已经把该handler添加到服务端的pipline中,
    当服务端初始化的时候会执行该handler的channelRegistered,channelActive等方法
    这边都会调用到IdleStateHandler的initialize方法,我们看一下这个方法
    
        private void initialize(ChannelHandlerContext ctx) {
    //这个state主要是用来判断是否初始化
            switch (state) {
            case 1:
            case 2:
                return;
            }
    
            state = 1;
            initOutputChanged(ctx);
    //这边这个是干啥的
            lastReadTime = lastWriteTime = ticksInNanos();
    //初始化三个定时任务来检测,这边是关键,我们下面来看看这几个定时任务做了啥事
            if (readerIdleTimeNanos > 0) {
                readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (writerIdleTimeNanos > 0) {
                writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (allIdleTimeNanos > 0) {
                allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                        allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        }
    
    • 三个的实现方法基本一致,我们以ReaderIdleTimeoutTask为例
        private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
    
            ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
                //主要就是this.ctx = ctx;赋值一下
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
                //我们配置的那个超时时间
                long nextDelay = readerIdleTimeNanos;
                if (!reading) {
                //计算最后一次的读时间到现在是否超过了我们设置的值
                // nextDelay = nextDelay  - (ticksInNanos() - lastReadTime);
                    nextDelay -= ticksInNanos() - lastReadTime;
                }
                //空闲时间已经超过我们设置的值
                if (nextDelay <= 0) {
                    //重新执行该定时任务
                    readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                    //判断是否为第一次发生空闲事件
                    boolean first = firstReaderIdleEvent;
                    firstReaderIdleEvent = false;
                    try {
                        //新建一个IdleStateEvent事件
                        IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                        //这个方法是触发我们上面写的处理心跳的方法userEventTriggered
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    //如果没有空闲,则重新执行该定时任务
                    readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    
    包装成一个IdleStateEvent事件
        protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
            switch (state) {
                case ALL_IDLE:
                    return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
                case READER_IDLE:
                    return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
                case WRITER_IDLE:
                    return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
                default:
                    throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
            }
        }
    
    主要是这个方法调用
        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    
    用MASK_USER_EVENT_TRIGGERED来查找哪个是实现类
        @Override
        public ChannelHandlerContext fireUserEventTriggered(final Object event) {
            invokeUserEventTriggered(findContextInbound(MASK_USER_EVENT_TRIGGERED), event);
            return this;
        }
    
    用MASK_USER_EVENT_TRIGGERED来查找哪个是实现类,handler初始化时有根据是否有该方法有执行这个
        private AbstractChannelHandlerContext findContextInbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while ((ctx.executionMask & mask) == 0);
            return ctx;
        }
    
    
        static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
            ObjectUtil.checkNotNull(event, "event");
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeUserEventTriggered(event);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeUserEventTriggered(event);
                    }
                });
            }
        }
    
    主要是在这边,调用真正的实现方法
        private void invokeUserEventTriggered(Object event) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireUserEventTriggered(event);
            }
        }
    
    • 其他写空闲与读写空闲与读空闲几乎都是一样,只是事件不一样

    相关文章

      网友评论

          本文标题:Netty集成心跳检测及其源码

          本文链接:https://www.haomeiwen.com/subject/gvahcltx.html