美文网首页
基于Netty的Android端长连接设计

基于Netty的Android端长连接设计

作者: 范正辰 | 来源:发表于2017-08-28 20:39 被阅读0次

    协议定制与数据序列化

    1、长连接这里我们肯定是基于TCP的,而TCP协议其实默认已经支持长连接,但是socket连接存在随时断开的情况,这就需要有比较好的协议保障连接状态的检测。
    2、定制数据序列化格式,建议使用protobuf或者thrift而不是htttp中常用的json,可以减少序列化与反序列化的开销。当然如果用一些其他的协议,你可能需要自己实现encoder decoder了,TCP是流,上层协议对TCP的流是要做分包粘包处理的,注意好对handler中channelRead和channelReadComplete的方法的复写。

    基于Netty 设计的客户端架构

    1、我们会需要设计一个客户端,就像netty的官方demo中做的那样,定义好bootstrap和nioEventLoopGroup。注意NioEventLoopGroup是可以复用的,线程池复用对客户端比较重要,在断线重连的时候会排上用场。
    我以采用webSocket协议为例

            mClientHandler = new ClientHandler(sURI); //客户端收到分包处理完的数据,然后开始分发
            mMessageHandler = new MessageHandler(mHashMap, mBussinessCodeHelper); // 真正处理业务代码的handler
            bootstrap.group(mWorkGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .remoteAddress(sURI.getHost(), sURI.getPort());
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new TbbLoggerHandler());
                    pipeline.addLast(new IdleStateHandler(200, 180, 0, TimeUnit.SECONDS)); //读超时与写超时检测的handler, 读超时200s比写超时时间长一些,发生读超时的时候直接断开重连了。
                    pipeline.addLast(new HttpClientCodec());
                    pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
                    pipeline.addLast(mTbbClientHandler);
                    pipeline.addLast(mTbbMessageHandler);
                }
            });
    
            try {
                mChannel = bootstrap.connect().sync().channel();
                mChannel.closeFuture().sync(); // 会阻塞
                XGLog.logger_d(mChannel);
            } catch (Exception e) {
                XGLog.logger_d("exception " + e);
                e.printStackTrace();
            } finally {
                XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
                if (!TextUtils.isEmpty(mToken)) {
                    mWorkGroup.schedule(new Runnable() {
                        @Override
                        public void run() {
                            connect();  // 断线重连,这里简单处理,就是断了以后每隔2s 尝试连接一次,其实为了省电需要限制次数并倍增间隔时间的
                        }
                    }, 2, TimeUnit.SECONDS);
                }
            }
    

    2、设计好你的handler, netty框架的运用精髓基本都在handler当中,包括处理流解包然后处理业务最后发送数据,几乎全可以包含在handler当中,客户端主动发送数据依赖于channel,简单点讲就是channel 的 writeAndFlush,向缓冲区写数据并刷新缓冲区,刷新的操作其实就是发送数据了,socket的操作本质上都抽象成IO动作。一个简单的handler的例子,不一定能正常运行,只是作为例子,最为关键的几个方法

    (1) channelRead0(ChannelHandlerContext ctx, Object msg)
    处理解包后的数据,也可以分发数据包给下个handler
    (2) channelActivie(ChannelHandlerContext ctx)
    通道建立了,这个时候相当于tcp握手了
    (3) channelInActive(ChannelHandlerContext ctx)
    tcp断开连接
    (4) excepitonCaught(ChannelHandlerContext ctx, Throwable cause)
    异常处理,最好要处理,不处理也别忘了吧throwable发给下handler,这个一定得做
    (5) userEventTriggered(final ChannelHandlerContext ctx, Object evt)
    处理一些自定义的事件,包括读超时写超时这样的事件,充分体现了netty事件驱动的特点

    @Sharable
    public class ClientHandler extends SimpleChannelInboundHandler<Object> {
        private static final int BLOCKING_QUEUE_SIZE = 1 << 12;
        private static final Queue<MCProtocolPB.MCProtocol> mQueue = new LinkedList<>();
        private static final long IDLE_TIME = (long) (5 * 1e9);
     
        /**
         * 用于 WebSocket 的握手
         */
        private WebSocketClientHandshaker mHandshaker;
        /**
         *
         */
        private ChannelPromise mChannelPromise;
        private final PingWebSocketFrame mPingWebSocketFrame = new PingWebSocketFrame();
        private final CloseWebSocketFrame mCloseWebSocketFrame = new CloseWebSocketFrame();
        private ChannelHandlerContext mChannelHandlerContext;
      
    
        /**
         * 唯一的构造类
         *
         * @param uri WebSocket uri
         */
        public ClientHandler(URI uri) {
            mHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
        }
    
    
        @Override
        protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
            if (!mHandshaker.isHandshakeComplete()) {
                try {
                    mHandshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
                    mChannelPromise.setSuccess();
                    while (!mQueue.isEmpty()) {
                        ctx.writeAndFlush(mQueue.poll());
                    }
                    ctx.fireUserEventTriggered(Event.CONNECTED); //发送websocket协议连接正式建立的事件
                } catch (WebSocketHandshakeException e) {
                    mChannelPromise.setFailure(e);
                }
            }
         
            if (msg instanceof WebSocketFrame) {
                ctx.fireChannelRead(((WebSocketFrame) msg).retain());
            }
    
    
        }
    
        /**
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            mChannelHandlerContext = ctx;
            mHandshaker.handshake(ctx.channel());
            ctx.writeAndFlush(mPingWebSocketFrame.retain());
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            ctx.fireUserEventTriggered(Event.DISCONNECTED);
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            super.channelUnregistered(ctx);
            XGLog.logger_e("channel unregistered");
    
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            XGLog.logger_e(cause.toString());
            super.exceptionCaught(ctx, cause);
            if (!mChannelPromise.isDone()) {
                mChannelPromise.setFailure(cause);
            }
    
    
            cause.printStackTrace();
            ctx.close();
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            super.handlerRemoved(ctx);
            XGLog.logger_d("handler removed");
        }
    
        /**
         * 
         * 
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            super.handlerAdded(ctx);
            XGLog.logger_i("handler added");
            mChannelPromise = ctx.newPromise();
        }
    
        /**
         * 端口闲时 发送心跳包 处理的方法
         * 
         */
        @Override
        public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
    
            if (evt instanceof IdleStateEvent) {
                final IdleStateEvent event = (IdleStateEvent) evt;
                ctx.executor().execute(new Runnable() {
                    @Override
                    public void run() {
                        handleIdleEvent(ctx, event);
                    }
                });
                super.userEventTriggered(ctx, evt);
            } else if (Event.REQUEST_TIME_OUT.equals(evt)) {
                XGLog.logger_i("REQUEST triggered already");
            }
        }
      
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            super.channelWritabilityChanged(ctx);
        }
    
        /**
         * 处理{@link IdleStateEvent}
         *
         * @param ctx
         * @param event
         */
        private void handleIdleEvent(final ChannelHandlerContext ctx, IdleStateEvent event) {
            IdleState state = event.state();
            if (IdleState.READER_IDLE.equals(state)) {
                XGLog.logger_e("READ IDLE");
            } else if (IdleState.WRITER_IDLE.equals(state)) {
                XGLog.logger_e("WRITE IDLE");
             ctx.writeAndFlush(mPingWebSocketFrame.retain()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else if (IdleState.ALL_IDLE.equals(state)) {
                XGLog.logger_e("ALL IDLE");
            }
        }
    
    
        long ticksInNanos() {
            return System.nanoTime();
        }
    }
    

    3、考虑好你的断线重连的情况,建议每次客户端发送数据后,服务端都给回包,如果链路长时间空闲,那么触发写超时事件,发送心跳包给服务端,其实也可以反过来服务端给客户端发数据,然后如果还发生读超时事件,相当于对方没有给回包,那么断开连接,尝试重连。

    public class MyLoggerHandler extends LoggingHandler {
        private static final long IDLE_TIME = (long) (9.9 * 1e9);
        private long mLastWriteTime = -1;
        private ScheduledFuture mScheduledFuture;
    
        public MyLoggerHandler() {
            super(LogLevel.INFO);
        }
       
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead(ctx, msg);
            XGLog.logger_i("read message " + msg);
            long current = ticksInNanos();
            long delta = Math.abs(current - mLastWriteTime);
            if (delta < IDLE_TIME) {
                if (mScheduledFuture != null) {
                    mScheduledFuture.cancel(false);
                }
            }
        }
    
        @Override
        public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
            super.write(ctx, msg, promise);
            XGLog.logger_i("TbbLoggerHandler write message ");
            mScheduledFuture = ctx.executor().schedule(new Runnable() {
                @Override
                public void run() {
                    long current = ticksInNanos();
                    long delta = Math.abs(current - mLastWriteTime);
                    XGLog.logger_i("current " + current + " last " + mLastWriteTime + " delta " + delta);
                    if (delta > IDLE_TIME) {
                        ctx.close();
                    }
                }
            }, 10, TimeUnit.SECONDS);  // 10s 内没有收到服务端回执,断线重连
            mLastWriteTime = ticksInNanos();
    
        }
    
        long ticksInNanos() {
            return System.nanoTime();
        }
    }
    

    4、如果客户端主动发起请求,那么通过我们的Client的channel引用,可以向服务端发送数据。
    5、由于netty可以主动发起事件,在netty里处理完了数据如果要更新UI或者数据库,那么你需要设计一个简单的适配层,通过事件机制来触发事情就会变得简单。

    针对网络波动情况的处理

    1、如果发生可以主动检测到的链路断开的情况,一定会触发channelRemoved,然后channel会变成inActive,然后那个connect().sync()也就不再阻塞了,然后往下走,我们的代码中其实已经可以主动间隔2s去重连了。NioEventLoopGroup.exectue()类似于jdk的线程池,可以定时触发一个事件。

     try {
                mChannel = bootstrap.connect().sync().channel();
                mChannel.closeFuture().sync(); // 会阻塞
                XGLog.logger_d(mChannel);
            } catch (Exception e) {
                XGLog.logger_d("exception " + e);
                e.printStackTrace();
            } finally {
                XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
                if (!TextUtils.isEmpty(mToken)) {
                    mWorkGroup.schedule(new Runnable() {
                        @Override
                        public void run() {
                            connect();  // 断线重连,这里简单处理,就是断了以后每隔2s 尝试连接一次,其实为了省电需要限制次数并倍增间隔时间的
                        }
                    }, 2, TimeUnit.SECONDS);
                }
            }
    

    2、如果发生延时很长的情况,如果发送请求10s内没有读事件发生,那么你需要考虑重新建立连接了,简单的做法就是ChannelHandlerContext.close(),利用 1 中的NioEventLoopGroup线程池 mWorkGroup定时尝试连接,如果连接成功,该线程就阻塞,只有断开的时候才会跑到需要重连的地方。
    3、如果打过电话或者检测到网络切换,那么你也需要断开然后重连,因为你的在移动网IP地址基本就变了,所以重连吧,谁让我们基于TCP/IP呢。这种情况需要借助Android的一些组件比如BroadCastReceiver来检测,与netty关系不大。

    以上3点针对3/4G移动、电信、联通网基本就够用了,至于2G,反正这3家公司都不用2G了,在2017年开发长连接真的是比几年前幸福太多了

    相关文章

      网友评论

          本文标题:基于Netty的Android端长连接设计

          本文链接:https://www.haomeiwen.com/subject/ozfjkxtx.html