美文网首页
定时断线重连

定时断线重连

作者: yongguang423 | 来源:发表于2018-09-29 06:58 被阅读26次

    客户端断线重连机制。
    客户端数量多,且需要传递的数据量级较大。可以周期性的发送数据的时候,使用。要求对数据的即时性不高的时候,才可使用。
    优点: 可以使用数据缓存。不是每条数据进行一次数据交互。可以定时回收资源,对资源利用率高。相对来说,即时性可以通过其他方式保证。如: 120秒自动断线。数据变化1000次请求服务器一次。300秒中自动发送不足1000次的变化数据。


    image.png
    /**
     * 1. 双线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. 绑定服务监听端口并启动服务
     */
    package com.bjsxt.socket.netty.timer;
    
    
    
    import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    public class Server4Timer {
        // 监听线程组,监听客户端请求
        private EventLoopGroup acceptorGroup = null;
        // 处理客户端相关操作线程组,负责处理与客户端的数据通讯
        private EventLoopGroup clientGroup = null;
        // 服务启动相关配置信息
        private ServerBootstrap bootstrap = null;
        public Server4Timer(){
            init();
        }
        private void init(){
            acceptorGroup = new NioEventLoopGroup();
            clientGroup = new NioEventLoopGroup();
            bootstrap = new ServerBootstrap();
            // 绑定线程组
            bootstrap.group(acceptorGroup, clientGroup);
            // 设定通讯模式为NIO
            bootstrap.channel(NioServerSocketChannel.class);
            // 设定缓冲区大小
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // SO_SNDBUF发送缓冲区,SO_RCVBUF接收缓冲区,SO_KEEPALIVE开启心跳监测(保证连接有效)
            bootstrap.option(ChannelOption.SO_SNDBUF, 16*1024)
                .option(ChannelOption.SO_RCVBUF, 16*1024)
                .option(ChannelOption.SO_KEEPALIVE, true);
            // 增加日志Handler,日志级别为info
            // bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        }
        public ChannelFuture doAccept(int port) throws InterruptedException{
            
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                    ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                    // 定义一个定时断线处理器,当多长时间内,没有任何的可读取数据,自动断开连接。
                    // 构造参数,就是间隔时长。 默认的单位是秒。
                    // 自定义间隔时长单位。 new ReadTimeoutHandler(long times, TimeUnit unit);
                    ch.pipeline().addLast(new ReadTimeoutHandler(3));
                    ch.pipeline().addLast(new Server4TimerHandler());
                }
            });
            ChannelFuture future = bootstrap.bind(port).sync();
            return future;
        }
        public void release(){
            this.acceptorGroup.shutdownGracefully();
            this.clientGroup.shutdownGracefully();
        }
        
        public static void main(String[] args){
            ChannelFuture future = null;
            Server4Timer server = null;
            try{
                server = new Server4Timer();
                future = server.doAccept(9999);
                System.out.println("server started.");
                
                future.channel().closeFuture().sync();
            }catch(InterruptedException e){
                e.printStackTrace();
            }finally{
                if(null != future){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
                if(null != server){
                    server.release();
                }
            }
        }
        
    }
    
    
    /**
     * @Sharable注解 - 
     *  代表当前Handler是一个可以分享的处理器。也就意味着,服务器注册此Handler后,可以分享给多个客户端同时使用。
     *  如果不使用注解描述类型,则每次客户端请求时,必须为客户端重新创建一个新的Handler对象。
     *  
     */
    package com.bjsxt.socket.netty.timer;
    
    import com.bjsxt.socket.utils.ResponseMessage;
    import io.netty.channel.ChannelHandler.Sharable;
    
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    @Sharable
    public class Server4TimerHandler extends ChannelHandlerAdapter {
        
        // 业务处理逻辑
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("from client : ClassName - " + msg.getClass().getName()
                    + " ; message : " + msg.toString());
            ResponseMessage response = new ResponseMessage(0L, "test response");
            ctx.writeAndFlush(response);
        }
    
        // 异常处理逻辑
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("server exceptionCaught method run...");
            // cause.printStackTrace();
            ctx.close();
        }
    
    }
    
    
    /**
     * 1. 单线程组
     * 2. Bootstrap配置启动信息
     * 3. 注册业务处理Handler
     * 4. connect连接服务,并发起请求
     */
    package com.bjsxt.socket.netty.timer;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    
    
    import com.bjsxt.socket.utils.RequestMessage;
    import com.bjsxt.socket.utils.SerializableFactory4Marshalling;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.WriteTimeoutHandler;
    
    public class Client4Timer {
        
        // 处理请求和处理服务端响应的线程组
        private EventLoopGroup group = null;
        // 服务启动相关配置信息
        private Bootstrap bootstrap = null;
        private ChannelFuture future = null;
        
        public Client4Timer(){
            init();
        }
        
        private void init(){
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            // 绑定线程组
            bootstrap.group(group);
            // 设定通讯模式为NIO
            bootstrap.channel(NioSocketChannel.class);
            // bootstrap.handler(new LoggingHandler(LogLevel.INFO));
        }
        
        public void setHandlers() throws InterruptedException{
            this.bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingDecoder());
                    ch.pipeline().addLast(SerializableFactory4Marshalling.buildMarshallingEncoder());
                    // 写操作自定断线。 在指定时间内,没有写操作,自动断线。
                    ch.pipeline().addLast(new WriteTimeoutHandler(3));
                    ch.pipeline().addLast(new Client4TimerHandler());
                }
            });
        }
        
        public ChannelFuture getChannelFuture(String host, int port) throws InterruptedException{
            if(future == null){
                future = this.bootstrap.connect(host, port).sync();
            }
            if(!future.channel().isActive()){
                future = this.bootstrap.connect(host, port).sync();
            }
            return future;
        }
        
        public void release(){
            this.group.shutdownGracefully();
        }
        
        public static void main(String[] args) {
            Client4Timer client = null;
            ChannelFuture future = null;
            try{
                client = new Client4Timer();
                client.setHandlers();
                
                future = client.getChannelFuture("localhost", 9999);
                for(int i = 0; i < 3; i++){
                    RequestMessage msg = new RequestMessage(new Random().nextLong(),
                            "test"+i, new byte[0]);
                    future.channel().writeAndFlush(msg);
                    TimeUnit.SECONDS.sleep(2);
                }
                TimeUnit.SECONDS.sleep(5);
                
                future = client.getChannelFuture("localhost", 9999);
                RequestMessage msg = new RequestMessage(new Random().nextLong(), 
                        "test", new byte[0]);
                future.channel().writeAndFlush(msg);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                if(null != future){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(null != client){
                    client.release();
                }
            }
        }
        
    }
    
    
    package com.bjsxt.socket.netty.timer;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class Client4TimerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("from server : ClassName - " + msg.getClass().getName()
                    + " ; message : " + msg.toString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("client exceptionCaught method run...");
            cause.printStackTrace();
            ctx.close();
        }
    
        /**
         * 当连接建立成功后,出发的代码逻辑。
         * 在一次连接中只运行唯一一次。
         * 通常用于实现连接确认和资源初始化的。
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client channel active");
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:定时断线重连

          本文链接:https://www.haomeiwen.com/subject/ssxvoftx.html