美文网首页
(五)IdleStateHandler(模拟心跳)

(五)IdleStateHandler(模拟心跳)

作者: guessguess | 来源:发表于2021-05-12 17:23 被阅读0次

由于后面想自己做一个rpc,所以有一个很重要的场景就是心跳。
下面就通过netty来模拟心跳。
这里实现的功能比较简单。满足以下场景

1.如果服务一段时间内,没有收到心跳,则会对客户端发送消息。
2.客户端如果是存活状态,则按一定的频率对服务端发送消息。

先说说IdleStateHandler这个类
通过这个类的构造方法,看出存在3个入参,读超时时间,写超时时间,所有的超时时间。
其实就是,在某个时间段内,没读到数据/没写到数据/既没有读也没有写到,就会有相关操作(通过发生事件的方式)。

public class IdleStateHandler extends ChannelDuplexHandler {
    public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }
}

那么服务端是如何感知到多久时间内,没有读或者写数据呢?
看一下实现。在IdleStateHandler 类中有一个初始化方法,这个方法调用的位置,有注册,以及激活。当channel注册完 或者 处于激活状态,则会进行初始化。
原理也是比较简单,通过定时器轮训的方式去判断在规定时间内是否读/写数据。

    private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }


从上面代码中可以看到三个任务。其实我们看到重点都是 到最后会去发送事件
所以服务端监控到什么,比如很久时间channel处于闲置状态,想去做点什么,可以通过对事件的处理。
ReaderIdleTimeoutTask,WriterIdleTimeoutTask, AllIdleTimeoutTask
源码如下

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    发事件。=================================================
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    private final class WriterIdleTimeoutTask extends AbstractIdleTask {

        WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long lastWriteTime = IdleStateHandler.this.lastWriteTime;
            long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
            if (nextDelay <= 0) {
                writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstWriterIdleEvent;
                firstWriterIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }
                    发事件。=================================================
                    IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Write occurred before the timeout - set a new timeout with shorter delay.
                writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    private final class AllIdleTimeoutTask extends AbstractIdleTask {

        AllIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
                boolean first = firstAllIdleEvent;
                firstAllIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }
                    发事件。=================================================
                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

如何发送事件呢?其实也是通过传播的方式。
我们现在想自己处理了,所以最简单的方式就是实现一个channelHandler,加入到channel的channelPipeline中去即可,然后覆写fireUserEventTriggered方法。

    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

接下来还是做一个demo吧

客户端部分

定义一个channelHandler,用于回复服务端

public class CustomChannelHandler extends ChannelInboundHandlerAdapter{
    
    private static String RECEIVE_TMP = "收到消息:%s";
    
    private static String SEND = "在的在的";
    
    private static String SEND_SUCCESS = "发送成功";
    
    private static String SEND_FAIL = "发送失败";
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String info = (String)msg;
        System.out.println(String.format(RECEIVE_TMP, info));
        
        ByteBuf byteBuf = Unpooled.copiedBuffer(SEND.getBytes());
        ctx.channel().writeAndFlush(byteBuf).addListener(new GenericFutureListener<ChannelPromise>() {
            public void operationComplete(ChannelPromise future) throws Exception {
                boolean isDone = future.isDone();
                if(!isDone) {
                    System.out.println(SEND_FAIL);
                }else {
                    System.out.println(SEND_SUCCESS);
                }
            }
        });
    }
}

客户端,里面用一个线程,循环去发送心跳。

public class Client {
    private static final int port = 9527;
    private static final String host = "127.0.0.1";
    public static void main(String args[]) {
        connect();
    }

    public static void connect() {
        NioEventLoopGroup work = new NioEventLoopGroup();
        Bootstrap bs = new Bootstrap();
        bs.group(work);
        bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new CustomChannelHandler());
                        ch.pipeline().addLast(new StringEncoder());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = bs.connect(host, port).sync();
            ChannelFuture tmp = cf;
            模拟心跳,频繁给服务端发送消息。
            new Thread(() -> {
                while (true) {
                    try {
                        Thread.sleep(1000);
                        tmp.channel().writeAndFlush("heart beat" + System.getProperty("line.separator"));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
    }
}

服务端部分

这个handler用于处理当NioSocketChannel处于闲置状态。
当处于闲置状态会给客户端连接发送消息。如果是注册中心的话,下一步实现应该就是,没有得到回复,若干次之后就会将该客户端移除出注册表。

public class HandlerHeartbeatHandler extends ChannelDuplexHandler{
    
    private static byte[] HEART_BEAT_MSG = "滴滴滴".getBytes();
    
    private static String SEND_SUCCESS = "发送心跳成功";
    
    private static String SEND_FAIL = "发生心跳失败";
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        boolean matchType = evt instanceof IdleStateEvent;
        if(!matchType) {
            不是想要处理的世界,就继续传播吧。
            ctx.fireUserEventTriggered(evt);
        }
        IdleStateEvent ise = (IdleStateEvent)evt;
        if (ise.state() == IdleState.READER_IDLE) {
            ByteBuf byteBuf = Unpooled.copiedBuffer(HEART_BEAT_MSG);
            ctx.channel().writeAndFlush(byteBuf).addListener(new GenericFutureListener<ChannelPromise>() {
                public void operationComplete(ChannelPromise future) throws Exception {
                    boolean isDone = future.isDone();
                    if(!isDone) {
                        System.out.println(SEND_FAIL);
                    }else {
                        System.out.println(SEND_SUCCESS);
                    }
                }
            });
        }
        if(ise.state() == IdleState.WRITER_IDLE) {
            
        }
        if(ise.state() == IdleState.ALL_IDLE) {
            
        }
    }
}

服务端.
这里需要注意的点,对于心跳的处理器,要加在childHandler中。

public class Server {
    private static int port = 9527;
    private static String host = "127.0.0.1";
    
    private static int readTimeOut = 5;
    private static int writeTimeOut = 0;
    private static int idleTimeOut = 0;
    
    public static void main(String args[]) {
        start();
    }
    
    private static void start() {
        EventLoopGroup work = new NioEventLoopGroup();
        EventLoopGroup boss = new NioEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(work, boss).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<NioSocketChannel>() {

                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        // 一秒钟的心跳
                        ch.pipeline().addLast(new IdleStateHandler(readTimeOut, writeTimeOut, idleTimeOut));
                        ch.pipeline().addLast(new LineBasedFrameDecoder(128));
                        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new StringEncoder());
                        ch.pipeline().addLast(new HandlerHeartbeatHandler());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println("收到客户端的消息" + msg);
                            }
                        });
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = sb.bind(host, port).sync();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
    
}

进行简单的测试
先将客户端进行简单的调整,不发送消息。启用服务端,一段时间之后,服务端就会频繁发送消息。

public class Client {
    private static final int port = 9527;
    private static final String host = "127.0.0.1";
    public static void main(String args[]) {
        connect();
    }

    public static void connect() {
        NioEventLoopGroup work = new NioEventLoopGroup();
        Bootstrap bs = new Bootstrap();
        bs.group(work);
        bs.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        ch.pipeline().addLast(new CustomChannelHandler());
                        ch.pipeline().addLast(new StringEncoder());
                    }
                });
        ChannelFuture cf = null;
        try {
            cf = bs.connect(host, port).sync();
            ChannelFuture tmp = cf;
//            new Thread(() -> {
//                while (true) {
//                    try {
//                        Thread.sleep(1000);
//                        tmp.channel().writeAndFlush("heart beat" + System.getProperty("line.separator"));
//                    } catch (InterruptedException e) {
//                        e.printStackTrace();
//                    }
//                }
//            }).start();
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            work.shutdownGracefully();
        }
    }
}
服务端
客户端

现在将客户端恢复。定时发送心跳
运行结果如下
服务端


服务端

客户端则没有输出任何信息。
这样子基本的功能就完成了。

相关文章

网友评论

      本文标题:(五)IdleStateHandler(模拟心跳)

      本文链接:https://www.haomeiwen.com/subject/teiadltx.html