美文网首页
基于Netty实现WebSocket搭建消息推送

基于Netty实现WebSocket搭建消息推送

作者: KAImportad | 来源:发表于2020-11-30 17:16 被阅读0次

    1 添加netty依赖

     implementation rootProject.ext.dependencies["netty"]    
    "netty"   : "io.netty:netty-all:4.1.9.Final"
    

    2 创建AIDL文件用于服务器推送消息回调

    /** Calls the client makes on the service side (implemented by the Stub held in NettyClient). */
    interface IPCInteract {
                /** Start the connection; passes in the receiver that gets the data pushed from the service side. */
               void connect(IPCServicePush iPCServicePush);
    
               /** Request data; sub is the request payload as a string. */
               void reqData(String sub);
    
               /** Reconnect. */
               void doReConnect();
    
               /** Force a reconnect. */
               void forReConnect();
    }
    
    
    
    
    /** Callbacks the service side invokes on the client. */
    interface IPCServicePush {
              /** Data pushed by the server. */
               void servicePushData(String data);
    
               /** Tells the client to update its view of the server connection status. */
               void serviceConnectStatus(boolean status);
    
               /** NOTE(review): purpose not documented by the author; the client implementation shown in this article is empty. */
               void sendPendingData();
    }
    

    3 创建处理WebSocket请求的 ChannelHandler

    **握手完成之后,channelRead0方法中的msg即为服务器推送数据(第一条msg是握手的HTTP响应)**

      // Tag passed to the Log calls made by this handler.
      private static final String TAG = "WebSocketClientHandler";
        /**
         * Performs the WebSocket handshake; created in the constructor.
         */
        private WebSocketClientHandshaker handShaker;
        // Promise created in handlerAdded; completed or failed in channelRead0 when the handshake finishes.
        private ChannelPromise channelPromise;
    
        /**
         * Builds the WebSocket handshaker for the endpoint described by {@code Config}
         * ({@code scheme://address:port + path + "/" + token}).
         *
         * <p>NOTE(review): if the assembled string is not a valid URI, {@code handShaker}
         * stays {@code null} and the first use in {@code channelActive} will fail;
         * the error is logged here so the cause is visible.
         */
        public ChannelHandle() {
            // Assemble the endpoint once so the logged value and the URI actually used cannot drift apart
            // (the original log line dropped the "/" that precedes the token).
            final String endpoint = Config.scheme.get() + "://" + Config.address.get() + ":" + Config.port.get() + Config.path.get();
            try {
                // The token is a credential: log the endpoint only, never the token itself.
                Log.i(TAG, "WebSocket endpoint: " + endpoint + "/<token>");
                final URI uri = new URI(endpoint + "/" + Config.token.get());
                this.handShaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
            } catch (URISyntaxException e) {
                // Keep the stack trace in logcat under this handler's tag instead of printStackTrace().
                Log.e(TAG, "Invalid WebSocket URI for endpoint " + endpoint, e);
            }
        }
    
        /**
         * Handles every inbound message of the channel.
         *
         * <p>Until the handshake is complete the message is the HTTP upgrade response and is
         * consumed by the handshaker. Afterwards text frames are forwarded to
         * {@link #handleTextWebSocketFrame}, pings are answered with a pong, a close frame
         * closes the channel, and anything else is logged and ignored.
         *
         * @param ctx context of this handler
         * @param msg the HTTP handshake response or a WebSocket frame
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            Channel ch = ctx.channel();

            if (!handShaker.isHandshakeComplete()) {
                try {
                    handShaker.finishHandshake(ch, (FullHttpResponse) msg);
                    channelPromise.setSuccess();
                    Log.i(TAG, "WebSocket Client handShaker!" + Thread.currentThread());
                } catch (WebSocketHandshakeException e) {
                    channelPromise.setFailure(e);
                    Log.i(TAG, "WebSocket Client failed to handShaker");
                }
                // The handshake response is an HTTP message, not a frame: stop here.
                // Falling through used to cast it to TextWebSocketFrame and throw ClassCastException.
                return;
            }

            if (msg instanceof TextWebSocketFrame) {
                handleTextWebSocketFrame((TextWebSocketFrame) msg);
            } else if (msg instanceof io.netty.handler.codec.http.websocketx.PingWebSocketFrame) {
                // retain(): SimpleChannelInboundHandler releases msg after this method returns,
                // while the pong that reuses the payload is written asynchronously.
                ch.writeAndFlush(new io.netty.handler.codec.http.websocketx.PongWebSocketFrame(
                        ((io.netty.handler.codec.http.websocketx.PingWebSocketFrame) msg).content().retain()));
            } else if (msg instanceof io.netty.handler.codec.http.websocketx.CloseWebSocketFrame) {
                Log.i(TAG, "WebSocket Client received close frame");
                ch.close();
            } else {
                // Pong / binary / unexpected HTTP objects carry no push data for this client.
                Log.d(TAG, "Ignoring unsupported inbound message: " + msg.getClass().getName());
            }
        }
    
    
        /** Creates the handshake promise when this handler is added to a pipeline. */
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            this.channelPromise = ctx.newPromise();
        }
    
        /** Propagates the active event, then starts the HTTP upgrade handshake on the channel. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            // Kick off the HTTP handshake on the channel that just became active.
            final Channel activeChannel = ctx.channel();
            handShaker.handshake(activeChannel);
        }
    
        /** Propagates the inactive event and records it in the log. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            final String event = "channelInactive";
            Log.i(TAG, event);
        }
    
        /** Hands the text payload of a frame to the NettyClient singleton for delivery to the client. */
        private void handleTextWebSocketFrame(TextWebSocketFrame frame) {
            final NettyClient client = NettyClient.getInstance();
            final String payload = frame.text();
            client.pushDataToClient(payload);
        }
    

    4 心跳处理Handler 一段时间内未进行读写操作 触发 userEventTriggered

    /**
     * Tracks the liveness of the server channel and keeps {@link NettyClient} informed.
     *
     * <p>Reacts to idle events, to the channel becoming active or inactive (scheduling a
     * reconnect one second after a drop), and to exceptions travelling through the pipeline.
     */
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
        private static final String TAG = "HeartBeatHandler";

        /** Delay between losing the channel and asking NettyClient to reconnect, in milliseconds. */
        private static final long RECONNECT_DELAY_MS = 1000;

        /** Pure pass-through: inbound messages are forwarded unchanged. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            super.channelRead(ctx, msg);
        }

        /**
         * On an {@link IdleStateEvent}, logs it and reports the channel's current
         * active state to the client. Other events are only forwarded.
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);

            if (evt instanceof IdleStateEvent) {
                IdleStateEvent ev = (IdleStateEvent) evt;
                Logger.d(TAG, "userEventTriggered  ----> " + ev.state() + " address:" + ctx.channel().remoteAddress() + " / " + ctx.channel().isActive());

                // The original switched over ev.state() with three empty cases; no per-state
                // action exists, so only the status report remains.
                NettyClient.getInstance().setClientConnectStatus(ctx.channel().isActive());
            }
        }

        /**
         * Marks the connection as down, tells the client, and schedules a reconnect.
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            Log.d(TAG, "channelInactive " + ctx.channel().remoteAddress());
            if (NettyClient.getInstance().isConnectService()) {
                NettyClient.getInstance().notifyChannelInactive();
            }
            // Record that the server connection is gone.
            NettyClient.getInstance().setConnectService(false);
            NettyClient.getInstance().setClientConnectStatus(false);
            // Try to reconnect one second after the connection dropped.
            ctx.channel().eventLoop().schedule(() -> {
                Logger.d("HeartBeatHandler  -----   与服务器断开连接,准备重新连接服务器");
                NettyClient.getInstance().doReConnect();
            }, RECONNECT_DELAY_MS, TimeUnit.MILLISECONDS);
        }

        /**
         * Marks the connection as up and tells the client, mirroring {@link #channelInactive}.
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            Log.d(TAG, "channelActive " + ctx.channel().remoteAddress());
            // Record that the server connection is established.
            NettyClient.getInstance().setConnectService(true);
            // Previously the client was told "false" on a drop but never "true" on (re)connect,
            // so it kept a stale status until the next idle event.
            NettyClient.getInstance().setClientConnectStatus(true);
        }

        /** Logs the failure, including its stack trace; the exception is not propagated further. */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // Pass the throwable itself so the stack trace is not lost.
            Log.e(TAG, "exceptionCaught ---->" + cause.getMessage(), cause);
        }
    }
    
    

    5 单例NettyClient

    /**
     * Singleton WebSocket client built on Netty.
     *
     * <p>Owns the {@link Bootstrap}, the current {@link Channel} and the binder stub that the
     * service hands to clients. State is touched from binder threads, worker threads and
     * Netty event-loop threads, so the mutable fields are {@code volatile}.
     */
    public class NettyClient {

        private static final String TAG = "NettyClient";

        /** Delay between a failed connect and the next attempt, in milliseconds. */
        private static final long RETRY_DELAY_MS = 1000;

        /**
         * Whether the channel to the server is currently active.
         */
        private volatile boolean connectService;

        /**
         * Set by {@link #close()} so that the reconnect scheduled when the channel goes
         * inactive does not immediately undo a deliberate close. Cleared by
         * {@link #connect(String, int)} and by an explicit reconnect request over the binder.
         */
        private volatile boolean closedOnPurpose;

        private final Bootstrap bootstrap;
        private volatile ChannelFuture channelFuture;
        private final ChannelFutureListener listener;
        private volatile Channel channel;
        private final NioEventLoopGroup nioEventLoopGroup;

        /** Receiver registered by the client; {@code null} until the client calls {@code connect}. */
        private volatile IPCServicePush ipcServicePush;


        private final IPCInteract.Stub mBinder = new IPCInteract.Stub() {
            @Override
            public void connect(IPCServicePush iPCServicePush) throws RemoteException {
                ipcServicePush = iPCServicePush;
            }

            @Override
            public void reqData(String sub) throws RemoteException {
                sendDataToServer(sub);
            }

            @Override
            public void doReConnect() throws RemoteException {
                // An explicit request from the client overrides an earlier close().
                closedOnPurpose = false;
                NettyClient.this.doReConnect();
            }

            @Override
            public void forReConnect() throws RemoteException {
                // NOTE(review): forced reconnect is declared in the AIDL but was never implemented.
            }
        };


        /**
         * Configures the bootstrap, the channel pipeline and the connect listener.
         * Does not open a connection; see {@link #connect(String, int)}.
         */
        public NettyClient() {

            // Initialise the event-loop group and the bootstrap.
            nioEventLoopGroup = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class).group(nioEventLoopGroup);
            bootstrap.option(ChannelOption.TCP_NODELAY, true); // disable Nagle's algorithm
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // keep the long-lived connection alive
            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
            // The original also called bootstrap.handler(new LoggingHandler()) here, but
            // Bootstrap keeps a single handler and the call below replaced it, so it never ran.
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                            .addLast(new IdleStateHandler(0, 0, 10))
                            .addLast(new HttpClientCodec())  // encodes requests / decodes responses as HTTP messages
                            .addLast(new HttpObjectAggregator(8192)) // merges HTTP fragments into one full message
                            .addLast(new WebSocketFrameAggregator(10 * 1048576))
                            .addLast(new ChannelHandle()) // handshake + incoming data
                            .addLast(new HeartBeatHandler()); // liveness tracking / reconnect

                }
            });


            listener = future -> {
                if (future.isSuccess()) {
                    Logger.d(TAG, "ChannelFutureListener:operationComplete status is success");
                    future.channel().flush();
                } else {
                    Logger.e(TAG, "ChannelFutureListener:operationComplete status is failed");
                    // Connecting failed: try again after a short delay.
                    future.channel().eventLoop().schedule(() -> {
                        Logger.d("与服务器连接失败,准备重新连接服务器(listener)");
                        doReConnect();
                    }, RETRY_DELAY_MS, TimeUnit.MILLISECONDS);
                }
            };

        }


        /**
         * Sends a text frame to the server if a channel exists and is active; otherwise does nothing.
         */
        private void sendDataToServer(String sub) {
            final Channel current = channel;
            if (current != null && current.isActive()) {
                current.writeAndFlush(new TextWebSocketFrame(sub));
            }
        }


        /**
         * Opens a channel and blocks the calling thread until the attempt finishes.
         *
         * <p>The listener is attached before waiting: {@code sync()} rethrows a failed connect,
         * so a listener added afterwards (as the original did) never saw failures and the
         * retry branch was unreachable.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        private void openChannel(String host, int port) throws InterruptedException {
            final ChannelFuture attempt = bootstrap.connect(host, port);
            attempt.addListener(listener);
            attempt.sync();
            channelFuture = attempt;
            channel = attempt.channel();
        }


        /**
         * Connects to the server on a background thread.
         *
         * @param intentHost server host
         * @param intentPort server port
         */
        public void connect(final String intentHost, final int intentPort) {
            closedOnPurpose = false;

            new Thread(() -> {
                try {
                    Logger.w(TAG, "开始连接服务器,服务器地址:" + Config.address.get() + ":" + Config.port.get());
                    openChannel(intentHost, intentPort);
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        // Restore the flag so code further up can observe the interruption.
                        Thread.currentThread().interrupt();
                    }
                    Logger.d(TAG, "connect Exception -----> \n" + e);

                    ToastUtil.toast(e.toString());

                    e.printStackTrace();
                }
            }).start();
        }


        /**
         * Reconnects to the address held in {@code Config} on a background thread.
         * Does nothing while the connection is up or after {@link #close()}.
         */
        public synchronized void doReConnect() {
            // Skip when still connected, or when the connection was closed deliberately.
            if (connectService || closedOnPurpose) {
                return;
            }

            new Thread(() -> {
                try {
                    final Channel stale = channel;
                    if (stale != null) {
                        stale.close();
                    }
                    Logger.w(TAG, "开始重新连接服务器,服务器地址:" + Config.address.get() + ":" + Config.port.get());
                    openChannel(Config.address.get(), Config.port.get());
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    Log.d(TAG, "connect Exception -----> \n" + e);
                    e.printStackTrace();
                }
            }).start();
        }


        /**
         * Pushes data received from the server to the registered client.
         * Data arriving before a client has registered is dropped (it used to throw
         * a NullPointerException on the event loop).
         */
        public void pushDataToClient(String data) {
            final IPCServicePush receiver = ipcServicePush;
            if (receiver == null) {
                Log.d(TAG, "pushDataToClient: no client registered, data dropped");
                return;
            }
            try {
                receiver.servicePushData(data);
            } catch (RemoteException e) {
                e.printStackTrace();
            }
        }


        /**
         * Hook for telling the foreground that the connection dropped. Currently empty.
         */
        public void notifyChannelInactive() {

        }


        /** Tells the registered client (if any) the current connection status. */
        public void setClientConnectStatus(boolean status) {
            final IPCServicePush receiver = ipcServicePush;
            if (receiver != null) {
                try {
                    receiver.serviceConnectStatus(status);
                } catch (RemoteException e) {
                    e.printStackTrace();
                }
            }
        }


        /**
         * Closes the current channel and suppresses the automatic reconnect that the
         * resulting inactive event would otherwise trigger.
         */
        public void close() {
            closedOnPurpose = true;
            final Channel current = channel;
            if (current != null) {
                current.close();
            }
        }


        /**
         * @return whether the channel to the server is currently active
         */
        public boolean isConnectService() {
            return connectService;
        }


        public void setConnectService(boolean connectService) {
            this.connectService = connectService;
        }


        public Channel getChannel() {
            return channel;
        }


        public IPCInteract.Stub getMBinder() {
            return mBinder;
        }

        public static NettyClient getInstance() {
            return NettyInstance.instance;
        }

        /** Holder class: the instance is created on first call to getInstance(). */
        private static class NettyInstance {
            private static final NettyClient instance = new NettyClient();
        }

    }
    
    

    6 封装ipc服务的方法

    public class IPCUtil {
    
    
       private static final String TAG = "IPCUtil";
    
       private Gson gson = new Gson();
    
    
       /**
        * 是否已经绑定服务
        */
       private boolean isBound = false;
    
       /**
        * 服务是否连接
        */
       private boolean connect;
    
       /**
        * 处理服务端推送过来的数据的处理类
        */
       private IPCHandler ipcHandler;
    
       /**
        * 与远程服务交互的ipc对象
        */
       private IPCInteract ipcInteract;
    
    
       /**
        * 绑定服务时需要用到的回调类
        */
       private ServiceConnection connection = new ServiceConnection() {
           @Override
           public void onServiceConnected(ComponentName name, IBinder service) {
               Log.d(TAG, "ServiceConnection  / " + Thread.currentThread() + "/" + System.currentTimeMillis());
               isBound = true;
               ipcInteract = IPCInteract.Stub.asInterface(service);
               initBinder();
           }
    
           @Override
           public void onServiceDisconnected(ComponentName name) {
               isBound = false;
           }
       };
    
    
       /**
        * 绑定服务端后初始化
        */
       private void initBinder() {
           try {
               ipcInteract.connect(ipcServicePush);
           } catch (RemoteException e) {
               e.printStackTrace();
           }
       }
    
    
       /**
        * 调度线程用 ipc传递过来的数据并非在ui线程中
        */
       @SuppressLint("HandlerLeak")
       private Handler mHandler = new Handler() {
           @Override
           public void handleMessage(Message msg) {
               handlerRecData((String) msg.obj);
           }
       };
    
    
       /**
        * service推送数据时 client接收类
        */
       private IPCServicePush ipcServicePush = new IPCServicePush.Stub() {
           @Override
           public void servicePushData(String data) throws RemoteException {
               //注意此时处理的线程是Binder线程,并不是ui线程,所以要用Handler进行线程切换
               Message message = Message.obtain();
               message.obj = data;
               mHandler.sendMessage(message);
           }
    
           @Override
           public void serviceConnectStatus(boolean status) throws RemoteException {
               connect = status;
           }
    
           @Override
           public void sendPendingData() throws RemoteException {
    
           }
       };
    
    
       private void handlerRecData(String data) {
    
           if (ipcHandler == null || TextUtils.isEmpty(data)) {
               Logger.e(TAG, "IPCUtil  ------ ipcHandler or serviceRecData cannot be empty");
               return;
           }
    
           ResponseBodyRec<BasePointRec> listDataRec = gson.fromJson(data, new TypeToken<ResponseBodyRec<BasePointRec>>() {
           }.getType());
    
    
           ipcHandler.onPointStatusReceive(listDataRec);
       }
    
    
       //******************************* 对外开放的方法 **************************************/
    
       /**
        * 服务已解绑
        */
       public void unBound() {
           isBound = false;
       }
    
    
       /**
        * 向服务端请求数据
        *
        * @param sub 请求参数类
        */
       public void reqData(Object sub) {
           try {
               if (ipcInteract != null || checkConnectAndDoReConnect()) {
                   ipcInteract.reqData((String) sub);
               }
           } catch (RemoteException e) {
               e.printStackTrace();
           }
       }
    
       /**
        * 断网重连
        */
       public void doReConnect() {
           if (ipcInteract == null) {
               return;
           }
           try {
               ipcInteract.doReConnect();
           } catch (RemoteException e) {
               e.printStackTrace();
           }
       }
    
    
       /**
        * 判断是否连接服务器 false就重新连接
        */
       private boolean checkConnectAndDoReConnect() {
           if (!connect) {
               Log.e(TAG, "connect is broken");
               doReConnect();
               return false;
           }
           return true;
       }
    
    
       public void switchIPCHandler(IPCHandler ipcHandler) {
           this.ipcHandler = ipcHandler;
       }
    
    
       //********************************** get方法 ****************************************/
    
    
       public boolean isBound() {
           return isBound;
       }
    
    
       public ServiceConnection getConnection() {
           return connection;
       }
    
    
       //*********************************** 单例化 ****************************************/
    
       public static IPCUtil getInstance() {
           return IPCUtilInstance.instance;
       }
    
       private static class IPCUtilInstance {
           private static IPCUtil instance = new IPCUtil();
       }
    
    
    }
    
    

    7 创建服务

    /**
     * Android service that hosts the Netty client: it copies the connection parameters
     * from the start intent into {@code Config}, starts the connection, exposes the
     * client's binder, and closes the channel on unbind and destroy.
     */
    public class NettyService extends Service {
       private static final String TAG = "NettyService";

       /**
        * Stores the endpoint settings carried by the intent and connects.
        * A null intent is ignored.
        */
       @Override
       public int onStartCommand(Intent intent, int flags, int startId) {
           Log.d(TAG, "onStartCommand  / " + Thread.currentThread() + "/" + System.currentTimeMillis() + "/flags is " + flags);
           if (intent == null) {
               return START_STICKY;
           }

           // The token comes from shared storage, not from the intent.
           final String storedToken = (String) SharedInfo.getInstance().getValue(Constant.TOKEN, "");

           Config.address.set(intent.getStringExtra(BundleKeys.WS_HOST));
           Config.port.set(intent.getIntExtra(BundleKeys.WS_PORT, BaseParams.WS_PORT));
           Config.scheme.set(intent.getStringExtra(BundleKeys.WS_SCHEME));
           Config.path.set(intent.getStringExtra(BundleKeys.WS_PATH));
           Config.token.set(storedToken);

           final NettyClient client = NettyClient.getInstance();
           client.connect(Config.address.get(), Config.port.get());
           return START_STICKY;
       }

       /** Hands out the binder stub owned by the NettyClient singleton. */
       @Nullable
       @Override
       public IBinder onBind(Intent intent) {
           final NettyClient client = NettyClient.getInstance();
           return client.getMBinder();
       }

       /** Closes the channel when the clients unbind. */
       @Override
       public boolean onUnbind(Intent intent) {
           Log.d(TAG, "unbindService");
           final NettyClient client = NettyClient.getInstance();
           client.close();
           return super.onUnbind(intent);
       }

       /** Closes the channel when the service is destroyed. */
       @Override
       public void onDestroy() {
           Log.d(TAG, "onDestroy");
           final NettyClient client = NettyClient.getInstance();
           client.close();
           super.onDestroy();
       }
    }
    
    

    8 Activity中绑定服务 以及指定IpcHandler对象

    在Activity销毁时需要注意注销服务以及断开Socket连接 否则会导致oom

    
    /**
     * Main screen. Starts the Netty service in onCreate, binds to it in onStart,
     * installs its IPC handler in onResume, and unbinds and stops the service in onDestroy.
     */
    @Route(path = RouterUrl.MAIN, extras = RouterExtras.EXTRA_COMMON)
    public class MainAct extends BaseActivity {

        private ActMainBinding mainBinding;
        private MainCtrl mainCtrl;

        @Override
        protected void onCreate(@Nullable Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            mainBinding = DataBindingUtil.setContentView(this, R.layout.act_main);
            mainCtrl = new MainCtrl(mainBinding);
            mainBinding.setViewCtrl(mainCtrl);

            launchNettyService();
        }

        @Override
        protected void onStart() {
            super.onStart();
            attachToNettyService();
        }

        /** Makes this screen's handler the receiver of pushed data. */
        @Override
        protected void onResume() {
            super.onResume();
            final IPCHandler handler = mainCtrl.getIpcHandler();
            IPCUtil.getInstance().switchIPCHandler(handler);
        }

        /** Unbinds from and stops the service, but only if it is currently bound. */
        @Override
        protected void onDestroy() {
            final IPCUtil ipc = IPCUtil.getInstance();
            if (ipc.isBound()) {
                unbindService(ipc.getConnection());
                stopService(new Intent(this, NettyService.class));
                ipc.unBound();
            }
            super.onDestroy();
        }

        /**
         * Starts the service, passing the WebSocket endpoint settings as extras.
         */
        private void launchNettyService() {
            final Intent launch = new Intent(this, NettyService.class);
            launch.putExtra(BundleKeys.WS_SCHEME, BaseParams.WS_SCHEME);
            launch.putExtra(BundleKeys.WS_HOST, BaseParams.WS_HOST);
            launch.putExtra(BundleKeys.WS_PORT, BaseParams.WS_PORT);
            launch.putExtra(BundleKeys.WS_PATH, BaseParams.WS_PATH);
            launch.putExtra(BundleKeys.WS_TOKEN, BaseParams.WS_TOKEN);
            startService(launch);
        }

        /**
         * Binds to the service using the connection callback owned by IPCUtil.
         */
        private void attachToNettyService() {
            final ServiceConnection callback = IPCUtil.getInstance().getConnection();
            bindService(new Intent(this, NettyService.class), callback, BIND_AUTO_CREATE);
        }

        @Override
        public void onBackPressed() {
            ActivityManage.onExit();
        }

    }
    
    
    

    相关文章

      网友评论

          本文标题:基于Netty实现WebSocket搭建消息推送

          本文链接:https://www.haomeiwen.com/subject/pkhptktx.html