美文网首页
Netty心跳检测代码实例及源码分析

Netty心跳检测代码实例及源码分析

作者: Zak1 | 来源:发表于2020-08-04 01:32 被阅读0次

背景:今天在研读项目netty相关代码时,发现有设备有心跳机制(尽管在本项目中没啥左右),本着要不试一下的方式,调用下Netty提供的IdleStatHandler这个handler来实现一下心跳检测功能。

  • 尝试:
  1. 在网上搜索了一下netty的心跳检测api,光看到IdleStatHandler就直接下手写代码了,想着也就一套调用链的方式,写完测一下没问题就ok了,便写下了如下代码:

    Netty服务端代码:

    public class MyServer {
        public static void main(String[] args) {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO)) 
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline channelPipeline = ch.pipeline();
                                channelPipeline.addLast(new HeartBeatHandler(3, 0, 0));
                                channelPipeline.addLast(new MyServerHandler());
                     
                            }
                        });
    
    
                ChannelFuture channelFuture = serverBootstrap.bind(10005).sync();
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    ------------------------------------------------------------------------
    public class HeartBeatHandler  extends IdleStateHandler {
        public HeartBeatHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
            super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
        }
        @Override
        public void read(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HeartBeatHandler----->"+ctx);
            super.read(ctx);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("HeartBeatHandler 中的userEventTriggered被触发");
            //空闲状态转换
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                String evenType = null;
    
                switch (idleStateEvent.state()) {
                    case READER_IDLE:
                        evenType = "读空闲";
                        break;
                    case WRITER_IDLE:
                        evenType = "写空闲";
                        break;
                    case ALL_IDLE:
                        evenType = "读写空闲";
                        break;
                }
                System.out.println(ctx.channel().remoteAddress() + "超时事件:" + evenType);
            }
        }
    }
    ---------------------------
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(msg);
            super.channelRead(ctx, msg);
        }
    }
    

    Socket测试代码:

    public class Test {
        public static void socketTest() throws IOException, InterruptedException {
            Socket socket=new Socket("127.0.0.1",10005);
            PrintWriter pw = new PrintWriter(socket.getOutputStream());
            for (int i=0;i<100;i++){
                pw.println("HelloWorld");
                pw.flush();
                TimeUnit.SECONDS.sleep(5);
            }
            pw.close();
            socket.close();
        }
        public static void main(String[] args) throws IOException, InterruptedException {
            socketTest();
        }
    }
    
    1. 开始自信的运行代码,结果发现光顾着输出helloworld相关的内容了(为什么不直接是helloWorld,因为这里没有做编解码操作,这不是本文讨论重点)

    2. 尝试百度,stackoverflow,也没能查到原由,也没能看到示例代码,基本给的解决方案都是指将IdlestatHandler调用链放置在第一位置(我本来就这样放的ORZ),顺便吐槽一下csdn : )

  • 一探究竟

    心有不死,虽不是项目必须实现功能,但是勾起了好奇心,这不得探个究竟怎么睡得着。

    1. Debug 调试:

      跟着断点一步一步进入调用方法链(这里只列出核心代码):

             //当数据链上的handler中的channelRead方法被调用时,reading 标志位-->true
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                  reading = true;
                  firstReaderIdleEvent = firstAllIdleEvent = true;
              }
              ctx.fireChannelRead(msg);
          }
             //根据构造函数中的三个参数设定时间 readerIdleTimeSeconds writerIdleTimeSeconds allIdleTimeSeconds 
             //IdleStatHandler会启动对应的检测线程,这里以读超时距离,线程通过判断是否已读,以及是否超时组合判断是否需要调用userEventTriggered()函数,这里只提供超时检测线程代码,其他生成相关检测池代码可自行断点调试.
              protected void run(ChannelHandlerContext ctx) {
                  long nextDelay = allIdleTimeNanos;
                  if (!reading) {
                      nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                  }
                  if (nextDelay <= 0) {
                      allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                      boolean first = firstAllIdleEvent;
                      firstAllIdleEvent = false;
                      try {
                          //进入触发调用流程
                          channelIdle(ctx, newIdleStateEvent(IdleState.ALL_IDLE, first));
                      } catch (Throwable t) {
                          ctx.fireExceptionCaught(t);
                      }
                  } 
              }
         
             //以下三段代码为触发调用流程中的代码:
          protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
              ctx.fireUserEventTriggered(evt);
          }
      
          @Override
          public ChannelHandlerContext fireUserEventTriggered(final Object event) {
              invokeUserEventTriggered(findContextInbound(), event);
              return this;
          }
             //把这里读懂就明白了为什么上面的示例代码无法正常触发userEventTriggered()函数了
          private AbstractChannelHandlerContext findContextInbound() {
              AbstractChannelHandlerContext ctx = this;
             //此处的do方法无论条件如何都会先进行一次向后传递,变成next值
             //因此示例代码中的HeartBeatHandler()虽然存在userEventTriggered(),但是在这个函数中,找的是下一个Handler的ChannelHandlerContext,那可以猜想一下,如果此时MyServerHandler()复写了userEventTriggered(),会被触发吗?
              do {
                  ctx = ctx.next;
              } while (!ctx.inbound);
              return ctx;
          }
      
      
             //注意在上面的fireUserEventTriggered()函数中,最外层函数是本函数,稍微读一下,也能看出来,这是一个递归函数不断的进行链式递归,直到满足上面的 ctx.inbound=true 即handler处理链中的Inbound已经被调用完毕(如果存在userEventTriggered()的话),文章最后会提供channelPipeline的Handler调用图。
          static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
              ObjectUtil.checkNotNull(event, "event");
              EventExecutor executor = next.executor();
              if (executor.inEventLoop()) {
                  next.invokeUserEventTriggered(event);
              } else {
                  executor.execute(new Runnable() {
                      @Override
                      public void run() {
                          next.invokeUserEventTriggered(event);
                      }
                  });
              }
          }
      
      
    2. debug结束,思路大概理清了,Idlestathandler通过新开线程来进行耗时检测,通过耗时配合表示位,来决定是否调用userEventTriggered()函数,并且在findContextInbound由于使用的是do while循环,所以是不会出现调用自己本身的情况,采用这样的编写方式我想不仅仅是不调用自身的触发函数,而是在Inbound找寻到最深处时,可以将ctx自动转换为outbound相关的handlerContext.最后使用递归函数不断递归inbound链,进行链式调用,所有该链上的handler的userEventTriggered() 都将被调用(当然,除了第一个,因为 do while的原因 : )

    3. 分析完毕,编写新的调用代码示例。

      //避免代码重复,只提供调用链代码,其他代码不变
      ChannelPipeline channelPipeline = ch.pipeline();
      channelPipeline.addLast(new HeartBeatHandler(5, 0, 0));
      channelPipeline.addLast(new MyServerHandler());
      //偷个懒加个匿名内部类
      channelPipeline.addLast(new ChannelInboundHandlerAdapter(){
         @Override
         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws    Exception {
            System.out.println("last Trigger触发");
            super.userEventTriggered(ctx, evt);
        }
      ---------------
        //注意这里的MyServerHandler复写了触发函数,用来观察是否被触发
      public class MyServerHandler extends ChannelInboundHandlerAdapter {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println(msg);
              super.channelRead(ctx, msg);
          }
          @Override
          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
              System.out.println("MyServerHandler userEventTriggered 触发");
              super.userEventTriggered(ctx,evt);
          }
      }
        
       //最后,对于后两个调用链的代码,读者可以自行替换位置,尝试运行,观察输出效果,相信一定会对IdlestatHandler触发流程有更深刻的了解。
       
      

ChannelPipeline handler调用图:

*  +---------------------------------------------------+---------------+
*  |                           ChannelPipeline         |               |
*  |                                                  \|/              |
*  |    +---------------------+            +-----------+----------+    |
*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  .               |
*  |               .                                   .               |
*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
*  |        [ method call]                       [method call]         |
*  |               .                                   .               |
*  |               .                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  |               |                                  \|/              |
*  |    +----------+----------+            +-----------+----------+    |
*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
*  |    +----------+----------+            +-----------+----------+    |
*  |              /|\                                  |               |
*  +---------------+-----------------------------------+---------------+
*                  |                                  \|/
*  +---------------+-----------------------------------+---------------+
*  |               |                                   |               |
*  |       [ Socket.read() ]                    [ Socket.write() ]     |
*  |                                                                   |
*  |  Netty Internal I/O Threads (Transport Implementation)            |
*  +-------------------------------------------------------------------+
  • 总结:保持好奇心,别怕失败,不要气馁。

相关文章

网友评论

      本文标题:Netty心跳检测代码实例及源码分析

      本文链接:https://www.haomeiwen.com/subject/pkdirktx.html