美文网首页IT@程序员猿媛
Netty的重连机制实现

Netty的重连机制实现

作者: 叫我不矜持 | 来源:发表于2019-05-03 16:53 被阅读0次

    前言

    Netty心跳重连的代码,需要解决以下几个问题

    1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理
    2)重连对象的管理,也就是bootstrap对象的管理
    3)重连机制编写

    下面是代码实现...

    一.客户端

    重连机制主要是在客户端来实现,下面直接上代码

    /**
     * Heartbeat client: connects to the server and installs a {@link ConnectionWatchdog}
     * so that the same handler chain is rebuilt on every reconnect.
     */
    public class HeartClient {
        public static void main(String[] args){
            new HeartClient().init("localhost",8080);
        }

        /**
         * Configures the bootstrap, builds the handler chain and performs the first connect,
         * blocking until the connect attempt completes.
         *
         * @param address host name or IP of the server
         * @param port    server port
         */
        public void init(String address,int port){
            NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
            Bootstrap bootstrap = new Bootstrap();

            // Handler that sends the heartbeat packet when the writer side goes idle
            ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

            // Netty's HashedWheelTimer handles large numbers of timed tasks efficiently when
            // precision requirements are low (e.g. connection timeouts), at the cost of
            // relatively high memory usage.
            HashedWheelTimer timer = new HashedWheelTimer();
            // Note: this LoggingHandler is replaced by the ChannelInitializer set below
            // before the first connect is issued.
            bootstrap.group(workerGroup).
                    option(ChannelOption.TCP_NODELAY,true).
                    channel(NioSocketChannel.class).
                    handler(new LoggingHandler());

            // Watches the connection; the reconnect mechanism is driven by this object
            ConnectionWatchdog connectionWatchdog = new ConnectionWatchdog(bootstrap,timer,address,port) {
                @Override
                public ChannelHandler[] handler() {
                    // A fresh set of per-channel handlers is built for every (re)connect;
                    // the watchdog itself and the idle trigger are shared instances.
                    return new ChannelHandler[]{
                            this,
                            new IdleStateHandler(0, 5, 0),
                            new MessageEncoder(),
                            new MessageDecoder(1 << 20, 9, 4, 0, 0, false),
                            idleStateTrigger,
                            new HeartClientHanlder()
                    };
                }
            };
            ChannelFuture future;
            try {
                // The watchdog's timer task also mutates the bootstrap's handler,
                // so both sides lock on the bootstrap.
                synchronized (bootstrap){
                    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(connectionWatchdog.handler());
                        }
                    });
                    // Perform the initial connect
                    future = bootstrap.connect(new InetSocketAddress(address,port));
                }
                future.sync();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    

    ChannelHandlerHolder接口主要用于在重连时复用ChannelHandler

    /**
     * Supplies the handlers that are added to a channel's pipeline, so the same
     * chain can be produced for the first connect and for every reconnect.
     */
    public interface ChannelHandlerHolder {
        /**
         * @return the handlers to add to the pipeline, in pipeline order
         */
        ChannelHandler[] handler();
    }
    

    最最关键的来了,ConnectionWatchdog 可以去观察链路是否断了,如果断了,进行循环的断线重连操作

    /**
     * Inbound handler that watches a client channel and, when it goes inactive,
     * schedules itself on a {@link Timer} to reconnect with an exponentially growing delay.
     * Subclasses supply the handler chain through {@link ChannelHandlerHolder#handler()}.
     */
    @ChannelHandler.Sharable
    public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements ChannelHandlerHolder , TimerTask {
        // Reconnect attempts made since the channel was last active
        private  int attempts;
        // Bootstrap reused for every reconnect
        private Bootstrap bootstrap;
        // Whether reconnecting is enabled
        boolean reconnect = true;
        // Scheduler that runs the reconnect task
        private Timer timer;
        // Remote host and port
        private String address;
        private int port;

        public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, String address, int port) {
            this.bootstrap = bootstrap;
            this.timer = timer;
            this.address = address;
            this.port = port;
        }

        /** Resets the attempt counter once a connection is up, then forwards the event. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("当前channel以及激活,尝试次数重置为0");
            attempts = 0;
            ctx.fireChannelActive();
        }

        /**
         * Schedules a reconnect after {@code 2^attempts} seconds, for at most 12 attempts,
         * and always forwards the inactive event down the pipeline.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("当前channel关闭");
            boolean retryAllowed = reconnect && attempts < 12;
            if (retryAllowed) {
                attempts++;
                int delaySeconds = 1 << attempts;
                System.out.println("正在尝试重新建立连接:"+"第"+attempts+"次");
                timer.newTimeout(this, delaySeconds, TimeUnit.SECONDS);
            }
            ctx.fireChannelInactive();
        }

        /**
         * Timer callback: re-installs the handler chain on the bootstrap and connects again.
         * A failed connect re-fires {@code channelInactive} so the next attempt gets scheduled.
         */
        @Override
        public void run(Timeout timeout) throws Exception {
            final ChannelFuture connectFuture;
            // Guard the bootstrap while its handler is swapped and the connect is issued
            synchronized (bootstrap) {
                bootstrap.handler(reconnectInitializer());
                connectFuture = bootstrap.connect(new InetSocketAddress(this.address, this.port));
            }
            // Inspect the outcome once the connect attempt has finished
            connectFuture.addListener((ChannelFutureListener) completed -> {
                if (completed.isSuccess()) {
                    System.out.println("重新连接成功");
                    return;
                }
                // Failure: trigger channelInactive again so another attempt is scheduled
                // (up to the 12-attempt limit)
                completed.channel().pipeline().fireChannelInactive();
            });
        }

        /** Builds the initializer that adds this holder's handlers to a new channel. */
        private ChannelInitializer<SocketChannel> reconnectInitializer() {
            return new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(ConnectionWatchdog.this.handler());
                }
            };
        }
    }
    
    

    定时发送心跳包

    /**
     * Sends the shared heartbeat message whenever the channel reports writer-idle.
     */
    @ChannelHandler.Sharable
    public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

        // Single heartbeat message (type 0xAF) reused for every send; its id is fixed at construction
        private final Message heartMsg = new Message((byte) 0xAF, System.currentTimeMillis(), "Heartbeat");

        /**
         * Writes the heartbeat on a writer-idle event; events that are not
         * {@link IdleStateEvent}s are passed to the superclass.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (!(evt instanceof IdleStateEvent)) {
                super.userEventTriggered(ctx, evt);
                return;
            }
            IdleState idleKind = ((IdleStateEvent) evt).state();
            if (idleKind == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(heartMsg);
            }
        }
    }
    
    

    这里是客户端用于处理和服务端的通信

    /**
     * Client-side business handler: greets the server when the channel becomes active and,
     * after each server message, reads one line from the console and sends it back.
     */
    @ChannelHandler.Sharable
    public class HeartClientHanlder extends ChannelInboundHandlerAdapter {
        private Scanner scanner = new Scanner(System.in);
        private ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        private ExecutorService executorService = Executors.newFixedThreadPool(10);
        // Written on an executor thread and read on the event loop, hence volatile
        private volatile FutureTask<Object> futureTask;
        // Marks a business packet (as opposed to a heartbeat packet)
        private final int flag = Integer.parseInt("CF", 16);

        /** Sends the greeting message and forwards the active event. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client active:"+new Date());
            ctx.writeAndFlush(new Message((byte)flag,System.currentTimeMillis(),"hello,I am common client!"));
            ctx.fireChannelActive();
        }

        /**
         * Prints the server message, then asynchronously waits for one console line to send back.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Message message = (Message) msg;
            System.out.println("from server >>>"+message.getType()+":"+message.getRequestId()+":"+message.getBody());
            // Run off the event loop; otherwise the blocking console read would stall the channel
            singleThreadExecutor.execute(()->{
                // A second task performs the actual console read so that it can be cancelled
                FutureTask<Object> task = new FutureTask<>(new MsgThread(ctx));
                this.futureTask = task;
                this.executorService.execute(task);
                try {
                    // get() blocks, which is why it runs inside this outer task
                    task.get();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (java.util.concurrent.CancellationException ignored) {
                    // Expected when channelInactive cancels the pending console read
                } catch (Exception e) {
                    System.err.println("console input task failed: " + e);
                }
            });

        }

        /** Reads one line from the console and writes it to the channel as a business message. */
        class MsgThread implements Callable<Object> {
            ChannelHandlerContext ctx;

            public MsgThread(ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }

            @Override
            public Object call() throws Exception {
                System.out.println("**********:");
                ctx.writeAndFlush(new Message((byte)flag,System.currentTimeMillis(),scanner.nextLine()));
                return null;
            }
        }

        /** Cancels the pending console read, if any, when the channel closes. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client inactive:"+new Date());
            // The channel may close before any message was read, in which case no task exists yet
            FutureTask<Object> pending = this.futureTask;
            if (pending != null) {
                pending.cancel(true);
            }
        }

    }
    
    重连机制实现.png

    二.服务端

    首先是服务端的代码HeartServer

    /**
     * Heartbeat server: accepts client connections and closes those that stay
     * read-idle for longer than the configured timeout.
     */
    public class HeartServer {

        public static void main(String[] args){
            HeartServer server = new HeartServer();
            server.init(8080);
        }

        /**
         * Binds the server to {@code port} and blocks until the server channel is closed.
         * The event loop groups are always shut down before this method returns.
         *
         * @param port TCP port to listen on
         */
        public void init (int port){

            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

            NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Shared across all child channels
            AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 128).
                    group(bossGroup,workerGroup).
                    channel(NioServerSocketChannel.class).
                    childOption(ChannelOption.SO_KEEPALIVE, true).
                    childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(new IdleStateHandler(5,0,0));
                    pipeline.addLast(new MessageDecoder(Integer.MAX_VALUE,9,4,0,0,false));
                    pipeline.addLast(new MessageEncoder());
                    pipeline.addLast(idleStateTrigger);
                    pipeline.addLast(new HeartServerHandler());
                }
            });

            try {
                ChannelFuture future = serverBootstrap.bind(port).sync();
                System.out.println("server start ...");
                // Blocks here until the server channel is closed
                future.channel().closeFuture().sync();
                System.out.println("server stop ...");
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                // Release the event loop threads on every exit path, not only on interruption
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }

        }
    }
    
    

    HeartServerHandler代码

    /**
     * Server-side message handler: echoes a timestamped reply for business packets,
     * logs heartbeat packets, and closes the channel on any pipeline exception.
     */
    public class HeartServerHandler extends ChannelInboundHandlerAdapter {
        // Packet type of a business message
        private static final byte BUSINESS_TYPE = (byte) 0xCF;
        // Packet type of a heartbeat message
        private static final byte HEARTBEAT_TYPE = (byte) 0xAF;

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("Server Active");
        }

        /**
         * Dispatches on the message type, then forwards the message to the next handler.
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Message incoming = (Message) msg;
            byte kind = incoming.getType();
            if (kind == BUSINESS_TYPE) {
                System.out.println(ctx.channel().remoteAddress()+">>>"+incoming.getType()+":"+incoming.getRequestId()+":"+incoming.getBody());
                Message reply = new Message(incoming.getType(), incoming.getRequestId(), "server time->"+new Date());
                ctx.writeAndFlush(reply);
            } else if (kind == HEARTBEAT_TYPE) {
                System.out.println(ctx.channel().remoteAddress()+">>>>>>>>>>>>>>>>"+incoming.getBody());
            }
            ctx.fireChannelRead(msg);
        }


        /** Logs the disconnect and closes the channel. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("已断开客户端"+ctx.channel().remoteAddress());
            ctx.channel().close();
        }
    }
    
    

    这里如果长时间没有触发读操作,就会自动和客户端断开连接,这种机制是基于IdleStateHandler来实现的,具体可以见我的上一篇文章。

    /**
     * Raises an exception when a client channel reports reader-idle, so that a
     * downstream {@code exceptionCaught} handler can close the connection.
     */
    @ChannelHandler.Sharable
    public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
        /**
         * Throws on a reader-idle event; events that are not {@link IdleStateEvent}s
         * are passed to the superclass.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (!(evt instanceof IdleStateEvent)) {
                super.userEventTriggered(ctx, evt);
                return;
            }
            IdleState idleKind = ((IdleStateEvent) evt).state();
            if (idleKind == IdleState.READER_IDLE) {
                throw new Exception("客户端空闲时间长,自动断开连接");
            }
        }
    }
    

    三.自定义的拆包粘包规则

    Message

    /**
     * Protocol message: a one-byte type, an eight-byte request id and a UTF-8 body
     * whose encoded length is tracked in {@code messageLength}.
     */
    public class Message {
        // Charset used for the body; looked up once instead of on every call
        private static final Charset UTF_8 = Charset.forName("UTF-8");

        // Message type
        private byte type;
        // Request id
        private long requestId;
        // Length of the body in UTF-8 bytes
        private int messageLength;
        // Message body
        private String body;

        /**
         * @param type      message type
         * @param requestId request id
         * @param body      message body; must not be null
         */
        public Message(byte type, long requestId, String body) {
            this.type = type;
            this.requestId = requestId;
            this.messageLength = body.getBytes(UTF_8).length;
            this.body = body;
        }

        /**
         * @param type      message type
         * @param requestId request id
         * @param data      UTF-8 encoded body bytes; must not be null
         */
        public Message(byte type, long requestId, byte[] data) {
            this.type = type;
            this.requestId = requestId;
            this.messageLength = data.length;
            this.body = new String(data, UTF_8);
        }

        public byte getType() {
            return type;
        }

        public void setType(byte type) {
            this.type = type;
        }

        public long getRequestId() {
            return requestId;
        }

        public void setRequestId(long requestId) {
            this.requestId = requestId;
        }

        public int getMessageLength() {
            return messageLength;
        }

        public void setMessageLength(int messageLength) {
            this.messageLength = messageLength;
        }

        public String getBody() {
            return body;
        }

        /**
         * Replaces the body and keeps {@code messageLength} in step with it.
         *
         * @param body new body; a null body yields a length of 0
         */
        public void setBody(String body) {
            this.body = body;
            this.messageLength = body == null ? 0 : body.getBytes(UTF_8).length;
        }
    }
    
    

    MessageDecoder

    /**
     * Length-field based decoder that turns one complete frame
     * (1-byte type, 8-byte request id, 4-byte body length, body) into a {@link Message}.
     */
    public class MessageDecoder extends LengthFieldBasedFrameDecoder {

        // Fixed header size: type (1) + request id (8) + body length (4)
        private static final int HEADER_SIZE = 13;

        public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
        }

        /**
         * Decodes one frame into a {@link Message}.
         *
         * @return the decoded message, or {@code null} when a full frame is not yet available
         * @throws RuntimeException if the extracted frame does not match the expected layout
         */
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if (in == null || in.readableBytes() < HEADER_SIZE) {
                return null;
            }
            Object res = super.decode(ctx, in);
            if (res == null) {
                // The frame is still incomplete; wait for more bytes instead of failing
                return null;
            }
            if (!(res instanceof ByteBuf)) {
                throw new RuntimeException("协议异常");
            }
            ByteBuf processed = (ByteBuf) res;
            try {
                byte type = processed.readByte();
                long requestId = processed.readLong();
                int length = processed.readInt();

                // The frame has already been consumed from 'in', so a short body
                // means a malformed frame rather than "need more data"
                if (length < 0 || processed.readableBytes() < length) {
                    throw new RuntimeException("协议异常");
                }
                byte[] data = new byte[length];
                processed.readBytes(data);
                return new Message(type, requestId, data);
            } finally {
                // The extracted frame is owned by this method; release it once its bytes are copied out
                processed.release();
            }
        }
    }
    
    

    MessageEncoder

    /**
     * Encodes a {@link Message} as: 1-byte type, 8-byte request id,
     * 4-byte body length, then the UTF-8 encoded body.
     */
    public class MessageEncoder extends MessageToByteEncoder<Message> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
            byteBuf.writeByte(message.getType());
            byteBuf.writeLong(message.getRequestId());
            // Encode explicitly as UTF-8 to match Message and the decoder,
            // rather than relying on the platform default charset
            byte[] data = message.getBody().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Netty的重连机制实现

        本文链接:https://www.haomeiwen.com/subject/awqznqtx.html