美文网首页安卓开发博客
Netty android移动端通讯简单封装

Netty android移动端通讯简单封装

作者: guozhk | 来源:发表于2018-06-22 17:03 被阅读10次

    由于公司最近在做即时通讯,本文基于 netty-all-4.1.25.Final.jar 编写。Netty 5 版本在 Android 中有问题:运行时加载不到相关类,很是纳闷,哪位同学有解决方案可以交流一下。Mina 的 demo 可以看另一篇文章。
    代码地址:https://github.com/mygzk/NettyAndroidDemo.git
    先看效果图:

    device-2018-06-22-170316.png

    客户端(基于 Android 移动端):

    /**
     * Singleton Netty TCP client.
     *
     * <p>The single instance is created lazily through a static holder class and
     * obtained with {@link #getInstance()}. {@link #connect(NettyConnectListener)}
     * starts a dedicated thread that opens the connection and then blocks until the
     * channel is closed. Connection and message events are wrapped in a
     * {@code ReplyMessage} and handed to {@code DispterMessage} for delivery.
     *
     * <p>Messages are framed by line: the pipeline uses a
     * {@code LineBasedFrameDecoder} and {@link #send(String, NettyReceiveListener)}
     * appends a separator to every outgoing message.
     */
    public class NettyClient {
        private static final String TAG = NettyClient.class.getSimpleName();
        /**
         * Pause used between reconnect steps, passed to {@code SystemClock.sleep}.
         */
        private long reconnectIntervalTime = 5000;
        /**
         * Whether the client currently considers itself connected.
         */
        private volatile boolean isConnect = false;
        /**
         * Whether {@link #reconnect()} is allowed to run. Cleared by
         * {@link #disconnect()} and re-armed by {@link #connect(NettyConnectListener)}.
         */
        private volatile boolean isNeedReconnect = true;
        /**
         * Remaining number of reconnect attempts.
         */
        private static int reconnectNum = Integer.MAX_VALUE;

        // Written by the connecting thread and read by callers of send()/disconnect(),
        // hence volatile.
        private volatile EventLoopGroup mEventLoopGroup;
        private volatile Channel mChannel;
        private final NettyClientHandler mNettyClientHandler;
        private final DispterMessage mDispterMessage;
        private Thread mClientThread;

        private NettyConnectListener mNettyConnectListener;
        // Copy-on-write so handMsg()/handErrorMsg() can iterate while other threads
        // add or remove listeners without a ConcurrentModificationException.
        private final java.util.concurrent.CopyOnWriteArrayList<NettyReceiveListener> mNettyReceiveListeners =
                new java.util.concurrent.CopyOnWriteArrayList<>();
        private volatile NettyReceiveListener mNettyReceiveListener;


        /** Holder class; the instance is created when the holder is first loaded. */
        private static class NettyClientHint {
            private static final NettyClient INSTANCE = new NettyClient();
        }

        private NettyClient() {
            mNettyClientHandler = new NettyClientHandler();
            mDispterMessage = new DispterMessage();
        }

        /**
         * @return the shared client instance
         */
        public static NettyClient getInstance() {
            return NettyClientHint.INSTANCE;
        }

        /**
         * Starts connecting on a new thread. Does nothing when already connected.
         * Previously registered receive listeners are dropped.
         *
         * @param listener receives connection success / failure / disconnect events; may be null
         */
        public void connect(NettyConnectListener listener) {
            if (isConnect) {
                return;
            }
            mNettyReceiveListeners.clear();

            mNettyConnectListener = listener;
            // An explicit connect re-enables reconnect() after an earlier disconnect().
            isNeedReconnect = true;
            mClientThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    connectServer();
                }
            });
            mClientThread.start();
        }

        /**
         * Connects to the server at {@code NettyConstant.HOST}:{@code NettyConstant.PORT}
         * and blocks the calling thread until the channel is closed.
         */
        private void connectServer() {
            mChannel = null;
            // Kept in a local so the finally block shuts down the group created by THIS
            // attempt, even if the field has been replaced in the meantime.
            EventLoopGroup group = null;
            try {
                group = new NioEventLoopGroup();
                mEventLoopGroup = group;
                Bootstrap mBootstrap = new Bootstrap();
                mBootstrap.group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // Split the inbound stream into line-delimited frames
                                // (handles sticky / partial packets).
                                pipeline.addLast("line", new LineBasedFrameDecoder(1024));
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast("handler", mNettyClientHandler);
                            }
                        });

                ChannelFuture mChannelFuture = mBootstrap
                        .connect(new InetSocketAddress(NettyConstant.HOST, NettyConstant.PORT)).sync();
                Channel channel = mChannelFuture.channel();
                mChannel = channel;
                mChannelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            isConnect = true;
                            mChannel = channelFuture.channel();
                            if (mNettyConnectListener != null) {
                                postMsg(null, mNettyConnectListener, null, DispterMessage.MSG_CONN_SUCC);
                            }
                        } else {
                            if (mNettyConnectListener != null) {
                                postMsg("连接失败,channelFuture is not success", mNettyConnectListener, null, DispterMessage.MSG_CONN_FAIL);
                            }
                            isConnect = false;
                        }
                    }
                });
                // Park this thread until the connection is closed.
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                if (mNettyConnectListener != null) {
                    postMsg(e.getMessage(), mNettyConnectListener, null, DispterMessage.MSG_CONN_FAIL);
                }
                // Restore the interrupt flag so code further up the stack can see it.
                Thread.currentThread().interrupt();
                Log.e(TAG, "connect interrupted", e);
            } catch (Exception e) {
                if (mNettyConnectListener != null) {
                    postMsg(e.getMessage(), mNettyConnectListener, null, DispterMessage.MSG_CONN_FAIL);
                }
                Log.e(TAG, "connect failed", e);
            } finally {
                isConnect = false;
                // Notify exactly once. The public disconnect() is deliberately NOT called
                // here: it would post the same event again and permanently turn off
                // isNeedReconnect, making reconnect() a no-op.
                if (mNettyConnectListener != null) {
                    postMsg(null, mNettyConnectListener, null, DispterMessage.MSG_CONN_DIS);
                }
                clearReceiveLisenter();
                if (group != null) {
                    group.shutdownGracefully();
                }
            }
        }

        /**
         * Disconnects on request of the caller: notifies the connect listener, disables
         * automatic reconnection, drops all receive listeners and releases the channel
         * and event loop group. Safe to call before {@link #connect(NettyConnectListener)}.
         */
        public void disconnect() {
            if (mNettyConnectListener != null) {
                postMsg(null, mNettyConnectListener, null, DispterMessage.MSG_CONN_DIS);
            }
            isNeedReconnect = false;
            releaseConnection();
        }

        /**
         * Drops receive listeners and releases the current channel and event loop group,
         * without touching the reconnect flag and without notifying the connect listener.
         */
        private void releaseConnection() {
            clearReceiveLisenter();
            isConnect = false;

            Channel channel = mChannel;
            if (channel != null) {
                channel.close();
            }
            EventLoopGroup group = mEventLoopGroup;
            if (group != null) {
                group.shutdownGracefully();
            }
        }

        /**
         * Attempts one reconnect if reconnection is enabled, attempts remain and the
         * client is not connected. Blocks the calling thread: it sleeps between steps
         * and then runs the blocking connect routine.
         */
        public void reconnect() {
            Log.e(TAG, "reconnect");
            if (isNeedReconnect && reconnectNum > 0 && !isConnect) {
                reconnectNum--;
                SystemClock.sleep(reconnectIntervalTime);
                // Re-check: the state may have changed while sleeping.
                if (isNeedReconnect && reconnectNum > 0 && !isConnect) {
                    // Release old resources without disabling further reconnects.
                    releaseConnection();
                    SystemClock.sleep(reconnectIntervalTime);
                    connectServer();
                }
            }
        }

        /**
         * Sends one message followed by the separator read from the system property
         * named by {@code NettyConstant.MAG_SEPARATOR_1}. On success the listener is
         * registered to receive incoming messages; if the channel is missing, not
         * writable or not active, a receive-failure event is posted to it instead.
         *
         * @param msg      message text to send
         * @param listener receiver for replies or for the failure event; may be null
         */
        public synchronized void send(String msg, NettyReceiveListener listener) {
            mNettyReceiveListener = listener;
            // Read the field once so every check below sees the same channel.
            Channel channel = mChannel;
            if (channel == null) {
                postMsg("channel is null", null, listener, DispterMessage.MSG_RECEIVE_FAIL);
                return;
            }

            if (!channel.isWritable()) {
                postMsg("channel is not Writable", null, listener, DispterMessage.MSG_RECEIVE_FAIL);
                return;
            }
            if (!channel.isActive()) {
                postMsg("channel is not active!", null, listener, DispterMessage.MSG_RECEIVE_FAIL);
                return;
            }
            addReceiveLisenter(listener);
            channel.writeAndFlush(msg + System.getProperty(NettyConstant.MAG_SEPARATOR_1));
        }

        /**
         * Registers a receive listener; null and already-registered listeners are ignored.
         */
        public void addReceiveLisenter(NettyReceiveListener listener) {
            if (listener != null) {
                mNettyReceiveListeners.addIfAbsent(listener);
            }
        }

        /**
         * Unregisters the listener passed to the most recent {@link #send} call, if any.
         */
        public void removeCurrentReceiveLisenter() {
            NettyReceiveListener current = mNettyReceiveListener;
            if (current != null) {
                mNettyReceiveListeners.remove(current);
            }
        }

        /**
         * Unregisters the given receive listener; null is ignored.
         */
        public void removeReceiveLisenter(NettyReceiveListener listener) {
            if (listener != null) {
                mNettyReceiveListeners.remove(listener);
            }
        }

        /**
         * Unregisters all receive listeners.
         */
        public void clearReceiveLisenter() {
            mNettyReceiveListeners.clear();
        }

        /**
         * Posts a receive-success event carrying {@code msg} to every registered listener.
         */
        public void handMsg(String msg) {
            for (NettyReceiveListener listener : mNettyReceiveListeners) {
                if (listener != null) {
                    postMsg(msg, null, listener, DispterMessage.MSG_RECEIVE_SUCC);
                }
            }
        }

        /**
         * Posts a receive-failure event carrying {@code msg} to every registered listener.
         */
        public void handErrorMsg(String msg) {
            for (NettyReceiveListener listener : mNettyReceiveListeners) {
                if (listener != null) {
                    postMsg(msg, null, listener, DispterMessage.MSG_RECEIVE_FAIL);
                }
            }
        }


        /**
         * Wraps an event in a {@code ReplyMessage} and hands it to the dispatcher.
         *
         * @param msg             payload or error text; may be null
         * @param connectListener target for connection events; may be null
         * @param receiveListener target for receive events; may be null
         * @param type            one of the {@code DispterMessage.MSG_*} constants
         */
        private void postMsg(String msg, NettyConnectListener connectListener, NettyReceiveListener receiveListener, int type) {
            ReplyMessage message = new ReplyMessage();
            message.setConnectListener(connectListener);
            message.setMsg(msg);
            message.setReceiveListener(receiveListener);
            message.setType(type);

            mDispterMessage.handMsg(message);
        }


    }
    
    

    涉及的主要类:
    1.EventLoopGroup 可以理解为将多个EventLoop进行分组管理的一个类,是EventLoop的一个组。
    2.Bootstrap:启动帮助类,主要参数都在这里配置:
    channel():指定 Channel 的实现类型(本 demo 客户端使用 NioSocketChannel)
    handler():配置处理器链,主要用来接收、处理消息;本 demo 使用 netty 自带的解码器 LineBasedFrameDecoder 处理粘包
    option():设置 socket 相关参数
    测试服务端代码:

    
    /**
     * Minimal Netty test server for the client demo. Every accepted connection gets
     * a line-based String pipeline ending in a {@code TestServerHandler}.
     */
    public class TestServer {

        /**
         * Starts the server on port 8080 and blocks until the server channel closes.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            new TestServer().bind(8080);
        }

        /**
         * Binds the server to {@code port} and blocks until the server channel is
         * closed. Both event loop groups are shut down on the way out.
         *
         * @param port TCP port to listen on
         */
        private void bind(int port) {
            // One group is passed as the parent (boss) group, the other as the child
            // (worker) group of the ServerBootstrap.
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline pipeline = socketChannel.pipeline();
                                // Same framing and String codecs as the client pipeline.
                                pipeline.addLast("line", new LineBasedFrameDecoder(1024));
                                pipeline.addLast("decoder", new StringDecoder());
                                pipeline.addLast("encoder", new StringEncoder());
                                pipeline.addLast("handler", new TestServerHandler());
                            }
                        })
                        .option(ChannelOption.SO_BACKLOG, 1024);


                System.out.println("SimpleChatServer 启动");
                // Bind the port and start accepting incoming connections.
                ChannelFuture f = b.bind(port).sync();
                // Block until the server channel is closed.
                f.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently consuming it.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
                bossGroup.shutdownGracefully();
                System.out.println("SimpleChatServer 关闭了");
            }

        }
    }
    

    TestServerHandler 消息处理器

    /**
     * Inbound handler of the test server: logs each received line and answers with
     * the same text prefixed by {@code "[reply]:"}.
     */
    public class TestServerHandler extends SimpleChannelInboundHandler<String> {
        // Name of the system property whose value terminates every reply.
        String TAG_LINE = "line.separator";

        /**
         * Delegates to the superclass; no additional work on channel activation.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }

        /**
         * Prints the received message and writes the reply back on the same context.
         *
         * @param channelHandlerContext context of the channel the message arrived on
         * @param s                     decoded message text
         */
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
            System.out.println("server receive msg:" + s);

            // The terminator lets the peer's line-based decoder cut the reply into a frame.
            final String terminator = System.getProperty(TAG_LINE);
            final String reply = "[reply]:" + s + terminator;
            channelHandlerContext.writeAndFlush(reply);
        }

        /**
         * Prints the stack trace of the failure and closes the channel.
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    

    相关文章

      网友评论

        本文标题:Netty android移动端通讯简单封装

        本文链接:https://www.haomeiwen.com/subject/dtzkyftx.html