美文网首页IT@程序员猿媛
Netty的重连机制实现

Netty的重连机制实现

作者: 叫我不矜持 | 来源:发表于2019-05-03 16:53 被阅读0次

前言

Netty心跳重连的代码,需要解决以下几个问题

1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理
2)重连对象的管理,也就是bootstrap对象的管理
3)重连机制编写

下面是代码实现...

一.客户端

重连机制主要是在客户端来实现,下面直接上代码

public class HeartClient {
    public static void main(String[] args){
        new HeartClient().init("localhost",8080);
    }

    public void init(String address,int port){
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);
        Bootstrap bootstrap = new Bootstrap();

        //触发发送心跳包的类
        ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

        //netty提供的HashedWheelTimer 主要用来高效处理大量定时任务
        // 且任务对时间精度要求相对不高, 比如链接超时管理等场景, 缺点是内存占用相对较高.
        HashedWheelTimer timer = new HashedWheelTimer();
        bootstrap.group(workerGroup).
                option(ChannelOption.TCP_NODELAY,true).
                channel(NioSocketChannel.class).
                handler(new LoggingHandler());

        //顾名思义 监视连接的类,重连机制主要靠他
        ConnectionWatchdog connectionWatchdog = new ConnectionWatchdog(bootstrap,timer,address,port) {
            @Override
            public ChannelHandler[] handler() {
                return new ChannelHandler[]{
                        this,
                        new IdleStateHandler(0, 5, 0),
                        new MessageEncoder(),
                        new MessageDecoder(1 << 20, 9, 4, 0, 0, false),
                        idleStateTrigger,
                        new HeartClientHanlder()
                };
            }
        };
        ChannelFuture future;
        try {
            synchronized (bootstrap){
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        pipeline.addLast(connectionWatchdog.handler());
                    }
                });
                //进行连接
                future = bootstrap.connect(new InetSocketAddress(address,port));
            }
            future.sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

主要用于重连的时候复用ChannelHandler

public interface ChannelHandlerHolder {
    ChannelHandler[] handler();
}

最最关键的来了,ConnectionWatchdog 可以去观察链路是否断了,如果断了,进行循环的断线重连操作

@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements ChannelHandlerHolder , TimerTask {
    //尝试次数
    private  int attempts;
    //bootstrap对象,重连的时候依旧需要这个对象
    private Bootstrap bootstrap;
    //是否重连
    boolean reconnect = true;
    //执行重连任务的调度器
    private Timer timer;
    //地址 端口号
    private String address;
    private int port;

    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, String address, int port) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.address = address;
        this.port = port;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("当前channel以及激活,尝试次数重置为0");
        attempts=0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("当前channel关闭");
        if (reconnect){

            if (attempts<12){
                attempts++;
                int timeouts = 1<<attempts;
                System.out.println("正在尝试重新建立连接:"+"第"+attempts+"次");
                timer.newTimeout(this,timeouts, TimeUnit.SECONDS);
            }
        }
        ctx.fireChannelInactive();
    }


    @Override
    public void run(Timeout timeout) throws Exception {
        ChannelFuture future = null;
        //线程同步 
        synchronized (bootstrap){
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast(handler());
                }
            });
            future = bootstrap.connect(new InetSocketAddress(this.address, this.port));
        }
        //增加监听器 判断是否成功
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                //判断是否重新连接成功
                boolean successd = channelFuture.isSuccess();
                if (successd){
                    System.out.println("重新连接成功");
                }else{
                    //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
                    channelFuture.channel().pipeline().fireChannelInactive();
                }
            }
        });
    }
}

定时发送心跳包

@ChannelHandler.Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    private final Message heartMsg = new Message((byte)Integer.parseInt("AF",16), System.currentTimeMillis(),"Heartbeat");

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            IdleState state = stateEvent.state();
            if (state == IdleState.WRITER_IDLE){
                ctx.writeAndFlush(heartMsg);
            }
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }
}

这里是客户端用于处理和服务端的通信

@ChannelHandler.Sharable
public class HeartClientHanlder extends ChannelInboundHandlerAdapter {
    private Scanner scanner = new Scanner(System.in);;
    private ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    private ExecutorService executorService = Executors.newFixedThreadPool(10);;
    private Thread thread;
    private FutureTask futureTask;
    //表示业务包 非心跳包
    private final int flag = Integer.parseInt("CF", 16);
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client active:"+new Date());
        ctx.writeAndFlush(new Message((byte)flag,System.currentTimeMillis(),"hello,I am common client!"));
        ctx.fireChannelActive();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message = (Message) msg;
        System.out.println("from server >>>"+message.getType()+":"+message.getRequestId()+":"+message.getBody());
        //这里异步执行,才能看到效果,不然线程都被scanner阻塞住了
        singleThreadExecutor.execute(()->{
            //这里再套一个线程,是用来处理scanner从控制台读取信息的
            this.futureTask = new FutureTask<>(new MsgThread(ctx));
            this.executorService.execute(futureTask);
            try {
                //这个get方法会阻塞线程,所有才在外面又套了一层,完成异步操作
                this.futureTask.get();
            } catch (Exception e) {
            }
        });

    }

    class MsgThread implements Callable {
        ChannelHandlerContext ctx;

        public MsgThread(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public Object call() throws Exception {
            System.out.println("**********:");
            ctx.writeAndFlush(new Message((byte)flag,System.currentTimeMillis(),scanner.nextLine()));
            return null;
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client inactive:"+new Date());
        //取消任务的执行
        this.futureTask.cancel(true);
    }

}
重连机制实现.png

二.服务端

首先是服务端的代码HeartServer

public class HeartServer {

    public static void main(String[] args){
        HeartServer server = new HeartServer();
        server.init(8080);
    }

    public void init (int port){

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);

        NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);

        ServerBootstrap serverBootstrap = new ServerBootstrap();
        AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
        serverBootstrap.option(ChannelOption.SO_BACKLOG, 128).
                group(bossGroup,workerGroup).
                channel(NioServerSocketChannel.class).
                childOption(ChannelOption.SO_KEEPALIVE, true).
                childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline pipeline = socketChannel.pipeline();
                pipeline.addLast(new IdleStateHandler(5,0,0));
                pipeline.addLast(new MessageDecoder(Integer.MAX_VALUE,9,4,0,0,false));
                pipeline.addLast(new MessageEncoder());
                pipeline.addLast(idleStateTrigger);
                pipeline.addLast(new HeartServerHandler());
            }
        });

        try {
            ChannelFuture future = serverBootstrap.bind(port).sync();
            System.out.println("server start ...");
            //服务器同步连接断开时,这句代码才会往下执行
            future.channel().closeFuture().sync();
            System.out.println("server stop ...");
        } catch (InterruptedException e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            e.printStackTrace();
        }

    }
}

HeartServerHandler代码

public class HeartServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Server Active");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message= (Message) msg;
        if (message.getType()==(byte)Integer.parseInt("CF", 16)){
            System.out.println(ctx.channel().remoteAddress()+">>>"+message.getType()+":"+message.getRequestId()+":"+message.getBody());
            ctx.writeAndFlush(new Message(message.getType(),message.getRequestId(),"server time->"+new Date()));
        }else if (message.getType()==(byte)Integer.parseInt("AF", 16)){
            System.out.println(ctx.channel().remoteAddress()+">>>>>>>>>>>>>>>>"+message.getBody());
        }
        ctx.fireChannelRead(msg);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("已断开客户端"+ctx.channel().remoteAddress());
        ctx.channel().close();
    }
}

这里如果长时间没有触发读操作,就会自动和客户端断开连接,这种实现是基于IdleStateHandler来实现了,具体可以见我的上一篇文章。

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent stateEvent = (IdleStateEvent) evt;
            IdleState state = stateEvent.state();
            if (state==IdleState.READER_IDLE){
                throw new Exception("客户端空闲时间长,自动断开连接");
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}

三.自定义的拆包粘包规则

Message

public class Message {
    //消息类型
    private byte type;
    //请求ID
    private long requestId;
    //长度
    private int messageLength;
    //请求体
    private String body;

    public Message(byte type, long requestId, String body) {
        this.type = type;
        this.requestId = requestId;
        this.messageLength = body.getBytes(Charset.forName("UTF-8")).length;
        this.body = body;
    }

    public Message(byte type, long requestId, byte[] data) {
        this.type = type;
        this.requestId = requestId;
        this.messageLength = data.length;
        this.body = new String(data, Charset.forName("UTF-8"));
    }

    public byte getType() {
        return type;
    }

    public void setType(byte type) {
        this.type = type;
    }

    public long getRequestId() {
        return requestId;
    }

    public void setRequestId(long requestId) {
        this.requestId = requestId;
    }

    public int getMessageLength() {
        return messageLength;
    }

    public void setMessageLength(int messageLength) {
        this.messageLength = messageLength;
    }

    public String getBody() {
        return body;
    }

    public void setBody(String body) {
        this.body = body;
    }
}

MessageDecoder

public class MessageDecoder extends LengthFieldBasedFrameDecoder {

    private final byte HEADER_SIZE=13;

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null || in.readableBytes()<HEADER_SIZE)return null;
        Object res = super.decode(ctx, in);
        ByteBuf processed = null;
        if (res!=null && res instanceof ByteBuf){
             processed = (ByteBuf) res;
        }else {
            throw new RuntimeException("协议异常");
        }
        byte type = processed.readByte();
        long requestId = processed.readLong();
        int length = processed.readInt();

        if (processed.readableBytes()<length){
            in.resetReaderIndex();
            return null;
        }
        byte[] data = new byte[length];
        processed.readBytes(data);
        return new Message(type,requestId,data);
    }
}

MessageEncoder

public class MessageEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        byteBuf.writeByte(message.getType());
        byteBuf.writeLong(message.getRequestId());
        byte[] data = message.getBody().getBytes();
        byteBuf.writeInt(data.length);
        byteBuf.writeBytes(data);
    }
}

相关文章

网友评论

    本文标题:Netty的重连机制实现

    本文链接:https://www.haomeiwen.com/subject/awqznqtx.html