美文网首页
Netty心跳检测代码实例及源码分析

Netty心跳检测代码实例及源码分析

作者: Zak1 | 来源:发表于2020-08-04 01:32 被阅读0次

    背景:今天在研读项目netty相关代码时,发现有设备有心跳机制(尽管在本项目中没啥左右),本着要不试一下的方式,调用下Netty提供的IdleStatHandler这个handler来实现一下心跳检测功能。

    • 尝试:
    1. 在网上搜索了一下netty的心跳检测api,光看到IdleStatHandler就直接下手写代码了,想着也就一套调用链的方式,写完测一下没问题就ok了,便写下了如下代码:

      Netty服务端代码:

      public class MyServer {
          public static void main(String[] args) {
              EventLoopGroup bossGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();
      
              try {
      
                  ServerBootstrap serverBootstrap = new ServerBootstrap();
                  serverBootstrap
                          .group(bossGroup, workerGroup)
                          .channel(NioServerSocketChannel.class)
                          .handler(new LoggingHandler(LogLevel.INFO)) 
                          .childHandler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ChannelPipeline channelPipeline = ch.pipeline();
                                  channelPipeline.addLast(new HeartBeatHandler(3, 0, 0));
                                  channelPipeline.addLast(new MyServerHandler());
                       
                              }
                          });
      
      
                  ChannelFuture channelFuture = serverBootstrap.bind(10005).sync();
                  channelFuture.channel().closeFuture().sync();
      
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } finally {
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      }
      ------------------------------------------------------------------------
      public class HeartBeatHandler  extends IdleStateHandler {
          public HeartBeatHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
              super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
          }
          @Override
          public void read(ChannelHandlerContext ctx) throws Exception {
              System.out.println("HeartBeatHandler----->"+ctx);
              super.read(ctx);
          }
      
          @Override
          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
              System.out.println("HeartBeatHandler 中的userEventTriggered被触发");
              //空闲状态转换
              if (evt instanceof IdleStateEvent) {
                  IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                  String evenType = null;
      
                  switch (idleStateEvent.state()) {
                      case READER_IDLE:
                          evenType = "读空闲";
                          break;
                      case WRITER_IDLE:
                          evenType = "写空闲";
                          break;
                      case ALL_IDLE:
                          evenType = "读写空闲";
                          break;
                  }
                  System.out.println(ctx.channel().remoteAddress() + "超时事件:" + evenType);
              }
          }
      }
      ---------------------------
      public class MyServerHandler extends ChannelInboundHandlerAdapter {
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              System.out.println(msg);
              super.channelRead(ctx, msg);
          }
      }
      

      Socket测试代码:

      public class Test {
          public static void socketTest() throws IOException, InterruptedException {
              Socket socket=new Socket("127.0.0.1",10005);
              PrintWriter pw = new PrintWriter(socket.getOutputStream());
              for (int i=0;i<100;i++){
                  pw.println("HelloWorld");
                  pw.flush();
                  TimeUnit.SECONDS.sleep(5);
              }
              pw.close();
              socket.close();
          }
          public static void main(String[] args) throws IOException, InterruptedException {
              socketTest();
          }
      }
      
      1. 开始自信的运行代码,结果发现光顾着输出helloworld相关的内容了(为什么不直接是helloWorld,因为这里没有做编解码操作,这不是本文讨论重点)

      2. 尝试百度,stackoverflow,也没能查到原由,也没能看到示例代码,基本给的解决方案都是指将IdlestatHandler调用链放置在第一位置(我本来就这样放的ORZ),顺便吐槽一下csdn : )

    • 一探究竟

      心有不死,虽不是项目必须实现功能,但是勾起了好奇心,这不得探个究竟怎么睡得着。

      1. Debug 调试:

        跟着断点一步一步进入调用方法链(这里只列出核心代码):

               //当数据链上的handler中的channelRead方法被调用时,reading 标志位-->true
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
                    reading = true;
                    firstReaderIdleEvent = firstAllIdleEvent = true;
                }
                ctx.fireChannelRead(msg);
            }
               //根据构造函数中的三个参数设定时间 readerIdleTimeSeconds writerIdleTimeSeconds allIdleTimeSeconds 
               //IdleStatHandler会启动对应的检测线程,这里以读超时距离,线程通过判断是否已读,以及是否超时组合判断是否需要调用userEventTriggered()函数,这里只提供超时检测线程代码,其他生成相关检测池代码可自行断点调试.
                protected void run(ChannelHandlerContext ctx) {
                    long nextDelay = allIdleTimeNanos;
                    if (!reading) {
                        nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                    }
                    if (nextDelay <= 0) {
                        allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                        boolean first = firstAllIdleEvent;
                        firstAllIdleEvent = false;
                        try {
                            //进入触发调用流程
                            channelIdle(ctx, newIdleStateEvent(IdleState.ALL_IDLE, first));
                        } catch (Throwable t) {
                            ctx.fireExceptionCaught(t);
                        }
                    } 
                }
           
               //以下三段代码为触发调用流程中的代码:
            protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
                ctx.fireUserEventTriggered(evt);
            }
        
            @Override
            public ChannelHandlerContext fireUserEventTriggered(final Object event) {
                invokeUserEventTriggered(findContextInbound(), event);
                return this;
            }
               //把这里读懂就明白了为什么上面的示例代码无法正常触发userEventTriggered()函数了
            private AbstractChannelHandlerContext findContextInbound() {
                AbstractChannelHandlerContext ctx = this;
               //此处的do方法无论条件如何都会先进行一次向后传递,变成next值
               //因此示例代码中的HeartBeatHandler()虽然存在userEventTriggered(),但是在这个函数中,找的是下一个Handler的ChannelHandlerContext,那可以猜想一下,如果此时MyServerHandler()复写了userEventTriggered(),会被触发吗?
                do {
                    ctx = ctx.next;
                } while (!ctx.inbound);
                return ctx;
            }
        
        
               //注意在上面的fireUserEventTriggered()函数中,最外层函数是本函数,稍微读一下,也能看出来,这是一个递归函数不断的进行链式递归,直到满足上面的 ctx.inbound=true 即handler处理链中的Inbound已经被调用完毕(如果存在userEventTriggered()的话),文章最后会提供channelPipeline的Handler调用图。
            static void invokeUserEventTriggered(final AbstractChannelHandlerContext next, final Object event) {
                ObjectUtil.checkNotNull(event, "event");
                EventExecutor executor = next.executor();
                if (executor.inEventLoop()) {
                    next.invokeUserEventTriggered(event);
                } else {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            next.invokeUserEventTriggered(event);
                        }
                    });
                }
            }
        
        
      2. debug结束,思路大概理清了,Idlestathandler通过新开线程来进行耗时检测,通过耗时配合表示位,来决定是否调用userEventTriggered()函数,并且在findContextInbound由于使用的是do while循环,所以是不会出现调用自己本身的情况,采用这样的编写方式我想不仅仅是不调用自身的触发函数,而是在Inbound找寻到最深处时,可以将ctx自动转换为outbound相关的handlerContext.最后使用递归函数不断递归inbound链,进行链式调用,所有该链上的handler的userEventTriggered() 都将被调用(当然,除了第一个,因为 do while的原因 : )

      3. 分析完毕,编写新的调用代码示例。

        //避免代码重复,只提供调用链代码,其他代码不变
        ChannelPipeline channelPipeline = ch.pipeline();
        channelPipeline.addLast(new HeartBeatHandler(5, 0, 0));
        channelPipeline.addLast(new MyServerHandler());
        //偷个懒加个匿名内部类
        channelPipeline.addLast(new ChannelInboundHandlerAdapter(){
           @Override
           public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws    Exception {
              System.out.println("last Trigger触发");
              super.userEventTriggered(ctx, evt);
          }
        ---------------
          //注意这里的MyServerHandler复写了触发函数,用来观察是否被触发
        public class MyServerHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println(msg);
                super.channelRead(ctx, msg);
            }
            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                System.out.println("MyServerHandler userEventTriggered 触发");
                super.userEventTriggered(ctx,evt);
            }
        }
          
         //最后,对于后两个调用链的代码,读者可以自行替换位置,尝试运行,观察输出效果,相信一定会对IdlestatHandler触发流程有更深刻的了解。
         
        

    ChannelPipeline handler调用图:

    *  +---------------------------------------------------+---------------+
    *  |                           ChannelPipeline         |               |
    *  |                                                  \|/              |
    *  |    +---------------------+            +-----------+----------+    |
    *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |              /|\                                  |               |
    *  |               |                                  \|/              |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |              /|\                                  .               |
    *  |               .                                   .               |
    *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
    *  |        [ method call]                       [method call]         |
    *  |               .                                   .               |
    *  |               .                                  \|/              |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |              /|\                                  |               |
    *  |               |                                  \|/              |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
    *  |    +----------+----------+            +-----------+----------+    |
    *  |              /|\                                  |               |
    *  +---------------+-----------------------------------+---------------+
    *                  |                                  \|/
    *  +---------------+-----------------------------------+---------------+
    *  |               |                                   |               |
    *  |       [ Socket.read() ]                    [ Socket.write() ]     |
    *  |                                                                   |
    *  |  Netty Internal I/O Threads (Transport Implementation)            |
    *  +-------------------------------------------------------------------+
    
    • 总结:保持好奇心,别怕失败,不要气馁。

    相关文章

      网友评论

          本文标题:Netty心跳检测代码实例及源码分析

          本文链接:https://www.haomeiwen.com/subject/pkdirktx.html