美文网首页
(五)IdleStateHandler(模拟心跳)

(五)IdleStateHandler(模拟心跳)

作者: guessguess | 来源:发表于2021-05-12 17:23 被阅读0次

    由于后面想自己做一个rpc,所以有一个很重要的场景就是心跳。
    下面就通过netty来模拟心跳。
    这里实现的功能比较简单。满足以下场景

    1.如果服务一段时间内,没有收到心跳,则会对客户端发送消息。
    2.客户端如果是存活状态,则按一定的频率对服务端发送消息。
    

    先说说IdleStateHandler这个类
    通过这个类的构造方法,看出存在3个入参,读超时时间,写超时时间,所有的超时时间。
    其实就是,在某个时间段内,没读到数据/没写到数据/既没有读也没有写到,就会有相关操作(通过发生事件的方式)。

    public class IdleStateHandler extends ChannelDuplexHandler {
        public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds) {
    
            this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
                 TimeUnit.SECONDS);
        }
    }
    

    那么服务端是如何感知到多久时间内,没有读或者写数据呢?
    看一下实现。在IdleStateHandler 类中有一个初始化方法,这个方法调用的位置,有注册,以及激活。当channel注册完 或者 处于激活状态,则会进行初始化。
    原理也是比较简单,通过定时器轮训的方式去判断在规定时间内是否读/写数据。

        private void initialize(ChannelHandlerContext ctx) {
            // Avoid the case where destroy() is called before scheduling timeouts.
            // See: https://github.com/netty/netty/issues/143
            switch (state) {
            case 1:
            case 2:
                return;
            }
    
            state = 1;
            initOutputChanged(ctx);
    
            lastReadTime = lastWriteTime = ticksInNanos();
            if (readerIdleTimeNanos > 0) {
                readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (writerIdleTimeNanos > 0) {
                writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (allIdleTimeNanos > 0) {
                allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                        allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        }
    
    
    

    从上面代码中可以看到三个任务。其实我们看到重点都是 到最后会去发送事件
    所以服务端监控到什么,比如很久时间channel处于闲置状态,想去做点什么,可以通过对事件的处理。
    ReaderIdleTimeoutTask,WriterIdleTimeoutTask, AllIdleTimeoutTask
    源码如下

    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
    
            ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
                long nextDelay = readerIdleTimeNanos;
                if (!reading) {
                    nextDelay -= ticksInNanos() - lastReadTime;
                }
    
                if (nextDelay <= 0) {
                    readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstReaderIdleEvent;
                    firstReaderIdleEvent = false;
    
                    try {
                        发事件。=================================================
                        IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    
        private final class WriterIdleTimeoutTask extends AbstractIdleTask {
    
            WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
    
                long lastWriteTime = IdleStateHandler.this.lastWriteTime;
                long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
                if (nextDelay <= 0) {
                    writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstWriterIdleEvent;
                    firstWriterIdleEvent = false;
    
                    try {
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
                        发事件。=================================================
                        IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Write occurred before the timeout - set a new timeout with shorter delay.
                    writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    
        private final class AllIdleTimeoutTask extends AbstractIdleTask {
    
            AllIdleTimeoutTask(ChannelHandlerContext ctx) {
                super(ctx);
            }
    
            @Override
            protected void run(ChannelHandlerContext ctx) {
    
                long nextDelay = allIdleTimeNanos;
                if (!reading) {
                    nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                }
                if (nextDelay <= 0) {
                    allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                    boolean first = firstAllIdleEvent;
                    firstAllIdleEvent = false;
    
                    try {
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
                        发事件。=================================================
                        IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
        }
    

    如何发送事件呢?其实也是通过传播的方式。
    我们现在想自己处理了,所以最简单的方式就是实现一个channelHandler,加入到channel的channelPipeline中去即可,然后覆写fireUserEventTriggered方法。

        protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            ctx.fireUserEventTriggered(evt);
        }
    

    接下来还是做一个demo吧

    客户端部分

    定义一个channelHandler,用于回复服务端

    public class CustomChannelHandler extends ChannelInboundHandlerAdapter{
        
        private static String RECEIVE_TMP = "收到消息:%s";
        
        private static String SEND = "在的在的";
        
        private static String SEND_SUCCESS = "发送成功";
        
        private static String SEND_FAIL = "发送失败";
        
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String info = (String)msg;
            System.out.println(String.format(RECEIVE_TMP, info));
            
            ByteBuf byteBuf = Unpooled.copiedBuffer(SEND.getBytes());
            ctx.channel().writeAndFlush(byteBuf).addListener(new GenericFutureListener<ChannelPromise>() {
                public void operationComplete(ChannelPromise future) throws Exception {
                    boolean isDone = future.isDone();
                    if(!isDone) {
                        System.out.println(SEND_FAIL);
                    }else {
                        System.out.println(SEND_SUCCESS);
                    }
                }
            });
        }
    }
    

    客户端,里面用一个线程,循环去发送心跳。

    public class Client {
        private static final int port = 9527;
        private static final String host = "127.0.0.1";
        public static void main(String args[]) {
            connect();
        }
    
        public static void connect() {
            NioEventLoopGroup work = new NioEventLoopGroup();
            Bootstrap bs = new Bootstrap();
            bs.group(work);
            bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                            ch.pipeline().addLast(new CustomChannelHandler());
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    });
            ChannelFuture cf = null;
            try {
                cf = bs.connect(host, port).sync();
                ChannelFuture tmp = cf;
                模拟心跳,频繁给服务端发送消息。
                new Thread(() -> {
                    while (true) {
                        try {
                            Thread.sleep(1000);
                            tmp.channel().writeAndFlush("heart beat" + System.getProperty("line.separator"));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                work.shutdownGracefully();
            }
        }
    }
    

    服务端部分

    这个handler用于处理当NioSocketChannel处于闲置状态。
    当处于闲置状态会给客户端连接发送消息。如果是注册中心的话,下一步实现应该就是,没有得到回复,若干次之后就会将该客户端移除出注册表。

    public class HandlerHeartbeatHandler extends ChannelDuplexHandler{
        
        private static byte[] HEART_BEAT_MSG = "滴滴滴".getBytes();
        
        private static String SEND_SUCCESS = "发送心跳成功";
        
        private static String SEND_FAIL = "发生心跳失败";
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            boolean matchType = evt instanceof IdleStateEvent;
            if(!matchType) {
                不是想要处理的世界,就继续传播吧。
                ctx.fireUserEventTriggered(evt);
            }
            IdleStateEvent ise = (IdleStateEvent)evt;
            if (ise.state() == IdleState.READER_IDLE) {
                ByteBuf byteBuf = Unpooled.copiedBuffer(HEART_BEAT_MSG);
                ctx.channel().writeAndFlush(byteBuf).addListener(new GenericFutureListener<ChannelPromise>() {
                    public void operationComplete(ChannelPromise future) throws Exception {
                        boolean isDone = future.isDone();
                        if(!isDone) {
                            System.out.println(SEND_FAIL);
                        }else {
                            System.out.println(SEND_SUCCESS);
                        }
                    }
                });
            }
            if(ise.state() == IdleState.WRITER_IDLE) {
                
            }
            if(ise.state() == IdleState.ALL_IDLE) {
                
            }
        }
    }
    

    服务端.
    这里需要注意的点,对于心跳的处理器,要加在childHandler中。

    public class Server {
        private static int port = 9527;
        private static String host = "127.0.0.1";
        
        private static int readTimeOut = 5;
        private static int writeTimeOut = 0;
        private static int idleTimeOut = 0;
        
        public static void main(String args[]) {
            start();
        }
        
        private static void start() {
            EventLoopGroup work = new NioEventLoopGroup();
            EventLoopGroup boss = new NioEventLoopGroup();
            ServerBootstrap sb = new ServerBootstrap();
            sb.group(work, boss).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<NioSocketChannel>() {
    
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // 一秒钟的心跳
                            ch.pipeline().addLast(new IdleStateHandler(readTimeOut, writeTimeOut, idleTimeOut));
                            ch.pipeline().addLast(new LineBasedFrameDecoder(128));
                            ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new HandlerHeartbeatHandler());
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println("收到客户端的消息" + msg);
                                }
                            });
                        }
                    });
            ChannelFuture cf = null;
            try {
                cf = sb.bind(host, port).sync();
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                work.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }
        
    }
    

    进行简单的测试
    先将客户端进行简单的调整,不发送消息。启用服务端,一段时间之后,服务端就会频繁发送消息。

    public class Client {
        private static final int port = 9527;
        private static final String host = "127.0.0.1";
        public static void main(String args[]) {
            connect();
        }
    
        public static void connect() {
            NioEventLoopGroup work = new NioEventLoopGroup();
            Bootstrap bs = new Bootstrap();
            bs.group(work);
            bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                            ch.pipeline().addLast(new CustomChannelHandler());
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    });
            ChannelFuture cf = null;
            try {
                cf = bs.connect(host, port).sync();
                ChannelFuture tmp = cf;
    //            new Thread(() -> {
    //                while (true) {
    //                    try {
    //                        Thread.sleep(1000);
    //                        tmp.channel().writeAndFlush("heart beat" + System.getProperty("line.separator"));
    //                    } catch (InterruptedException e) {
    //                        e.printStackTrace();
    //                    }
    //                }
    //            }).start();
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                work.shutdownGracefully();
            }
        }
    }
    
    服务端
    客户端

    现在将客户端恢复。定时发送心跳
    运行结果如下
    服务端


    服务端

    客户端则没有输出任何信息。
    这样子基本的功能就完成了。

    相关文章

      网友评论

          本文标题:(五)IdleStateHandler(模拟心跳)

          本文链接:https://www.haomeiwen.com/subject/teiadltx.html