美文网首页
Android开发中如何使用WebSocket 实现消息通信

Android开发中如何使用WebSocket 实现消息通信

作者: 像程序那样去思考 | 来源:发表于2023-04-05 20:29 被阅读0次

    消息推送功能可以说是手机APP不可或缺的功能之一,一般我们可以使用第三方推送的SDK进行简单推送,比如极光推送、信鸽推等,但是对于消息聊天的时效性或者三方推送不能满足业务需求,我们需要使用WebSocket来实现消息推送功能。

    基本流程

    基于开源协议我们封装实现WebSocket的连接、注册、心跳、消息分发、超时任务功能,基本流程如下:


    连接功能实现

    首先我们新建一个项目,在build.grade中添加配置

    compile 'com.neovisionaries:nv-websocket-client:2.2'
    

    新建websocket管理类WsManger

    public class WsManager {
    
        private volatile static WsManager wsManger;
    
        private WsManager() {
        }
    
        public static WsManager getWsManger() {
            if (wsManger == null) {
                synchronized (WsManager.class) {
                    if (wsManger == null) {
                        wsManger = new WsManager();
                    }
                }
            }
            return wsManger;
        }
    
    
    }
    

    接下来添加连接方法,我们将webSocket的状态分为三种,新建WsStatue枚举类对应起来

    public enum WsStatus {
    
        /**
         * 连接成功
         */
        CONNECT_SUCCESS,
        /**
         * 连接失败
         */
        CONNECT_FAIL,
        /**
         * 正在连接
         */
        CONNECTING;
    }
    

    连接方法如下所示:

    /**
     * 连接方法 这里要判断是否登录 此处省略
     */
    public void connect() {
        //WEB_SOCKET_API 是连接的url地址,
        // CONNECT_TIMEOUT是连接的超时时间 这里是 5秒
        try {
            ws = new WebSocketFactory().createSocket(WEB_SOCKET_API, CONNECT_TIMEOUT)
                    //设置帧队列最大值为5
                    .setFrameQueueSize(5)
                    //设置不允许服务端关闭连接却未发送关闭帧
                    .setMissingCloseFrameAllowed(false)
                    //添加回调监听
                    .addListener(new WsListener())
                    //异步连接
                    .connectAsynchronously();
        } catch (IOException e) {
            e.printStackTrace();
        }
        setStatus(WsStatus.CONNECTING);
    }
    

    调用连接方法后 我们来看连接的回调 也就是WsListener

    /**
     * websocket回调事件
     */
    private class WsListener extends WebSocketAdapter {
    
    
        @Override
        public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
            Log.d(TAG, "onConnected: 连接成功");
        }
    
        @Override
        public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception {
            Log.d(TAG, "onConnectError: 连接失败");
        }
    
        @Override
        public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
                                   WebSocketFrame clientCloseFrame,
                                   boolean closedByServer) throws Exception {
            Log.d(TAG, "onDisconnected: 断开连接");
    
        }
    
        @Override
        public void onTextMessage(WebSocket websocket, String text) throws Exception {
            Log.d(TAG, "onTextMessage: 收到消息:" + text);
        }
    }
    

    下面我们调用连接方法

    WsManager.getWsManger().connect();
    

    运行项目我们可以看到如下打印:


    image.png

    此处我们要做的处理是,如果收到连接失败或者断开连接的回调 需要重新连接,我们重新调用一次连接方法即可,并且如果超过三次重连失败,我们在业务中可以通过调用接口来获取数据,避免数据丢失,此处细节省略。

    协议封装

    此处协议如下所示:

    {
         "action":"",
         "requestChild":{
             "clientType":"",
             "id":""
         }
     }
    

    心跳、发送请求属于客户端 主动发送请求,我们可以将请求结果分为成功失败和超时,发送超时我们是收不到服务器任何回复的,所以我们需要在发送之后将发送放在超时任务队列中,如果请求成功将任务从超时队列中移除,超时从超时队列中获取任务重新请求。

    超时任务队列中的回调成功、失败、超时。

    根据上述协议,我们添加相应的实体类,并采用Builder设计模式

    ublic class Request {
    
        /**
         * 行为
         */
        private String action;
    
        /**
         * 请求体
         */
        private RequestChild req;
    
    
        /**
         * 请求次数
         */
        private transient int reqCount;
    
        /**
         * 超时的时间
         */
        private transient int timeOut;
    
    
        public Request() {
        }
    
    
        public Request(String action, int reqCount, int timeOut, RequestChild req) {
            this.action = action;
            this.req = req;
            this.reqCount = reqCount;
            this.timeOut = timeOut;
        }
        
    
        public static class Builder {
            //action 请求类型
            private String action;
            //请求子类数据 按照具体业务划分
            private RequestChild req;
            //请求次数 便于重试
            private int reqCount;
            //超时时间
            private int timeOut;
    
            public Builder action(String action) {
                this.action = action;
                return this;
            }
    
    
            public Builder req(RequestChild req) {
                this.req = req;
                return this;
            }
    
    
            public Builder reqCount(int reqCount) {
                this.reqCount = reqCount;
                return this;
            }
    
            public Builder timeOut(int timeOut) {
                this.timeOut = timeOut;
                return this;
            }
    
            public Request build() {
                return new Request(action, reqCount, timeOut, req);
            }
    
        }
    }
    
    public class RequestChild {
    
        /**
         * 设备类型
         */
        private String clientType;
    
    
        /**
         * 用于用户注册的id
         */
        private String id;
    
        public RequestChild(String clientType, String id) {
            this.clientType = clientType;
            this.id = id;
        }
    
        public RequestChild() {
        }
    
    
        public static class Builder {
            private String clientType;
            private String id;
    
            public RequestChild.Builder setClientType(String clientType) {
                this.clientType = clientType;
                return this;
            }
    
    
            public RequestChild.Builder setId(String id) {
                this.id = id;
                return this;
            }
    
    
            public RequestChild build() {
                return new RequestChild(clientType, id);
            }
    
        }
    
    
    }
    

    我们添加一个发送请求的方法如下:

    /**
     * 发送请求
     *
     * @param request        请求体
     * @param reqCount       请求次数
     * @param requestListern 请求回调
     */
    private void senRequest(Request request, final int reqCount, final RequestListern requestListern) {
        if (!isNetConnect()) {
            requestListern.requestFailed("网络未连接");
            return;
        }
    
    }
    

    请求回调如下所示

    public interface RequestListern {
    
        /**
         * 请求成功
         */
        void requestSuccess();
    
        /**
         * 请求失败
         *
         * @param message 请求失败消息提示
         */
        void requestFailed(String message);
    }
    

    接着我们要把请求放在超时队列中,新建超时任务类,对应的分别是请求参数、请求回调、任务调度

    public class TimeOutTask {
    
    
        /**
         *  请求主体
         */
        private Request request;
    
        /**
         * 通用返回
         */
        private RequestCallBack requestCallBack;
    
        /**
         * r任务
         */
        private ScheduledFuture scheduledFuture;
    
    
        public TimeOutTask(Request request,
                               RequestCallBack requestCallBack,
                               ScheduledFuture scheduledFuture) {
            this.request = request;
            this.requestCallBack = requestCallBack;
            this.scheduledFuture = scheduledFuture;
        }
    
        public ScheduledFuture getScheduledFuture() {
            return scheduledFuture;
        }
    
        public void setScheduledFuture(ScheduledFuture scheduledFuture) {
            this.scheduledFuture = scheduledFuture;
        }
    
        public Request getRequest() {
            return request;
        }
    
        public void setRequest(Request request) {
            this.request = request;
        }
    
        public RequestCallBack getRequestCallBack() {
            return requestCallBack;
        }
    
        public void setRequestCallBack(RequestCallBack requestCallBack) {
            this.requestCallBack = requestCallBack;
        }
    
    }
    

    RequestCallBack是超时任务的回调,只是比请求回调多了个超时,因为超时的处理机制是一样的,所以这里我们没必要将超时回调到请求中

    public interface RequestCallBack {
    
        /**
         * 请求成功
         */
        void requestSuccess();
    
        /**
         * 请求失败
         *
         * @param request  请求体
         * @param message  请求失败的消息
         */
        void requestFailed(String message, Request request);
    
        /**
         * 请求超时
         *
         * @param request  请求体
         */
        void timeOut(Request request);
    }
    
    /**
     * 添加超时任务
     */
    private ScheduledFuture enqueueTimeout(final Request request, final long timeout) {
        Log.d(TAG, "  " + "enqueueTimeout: 添加超时任务类型为:" + request.getAction());
        return executor.schedule(new Runnable() {
            @Override
            public void run() {
                TimeOutTask timeoutTask = callbacks.remove(request.getAction());
                if (timeoutTask != null) {
                    timeoutTask.getRequestCallBack().timeOut(timeoutTask.getRequest());
                }
            }
        }, timeout, TimeUnit.MILLISECONDS);
    }
    

    超时任务的方法 是通过任务调度定时调用,请求成功后我们会把超时任务移除,当到了超时时间时,任务还存在就说明任务超时了。

    每次的任务我们以action为键值存在hashMap中

    private Map<String, CallbackWrapper> callbacks = new HashMap<>();
    

    将任务放入超时任务代码如下所示:

    final ScheduledFuture timeoutTask = enqueueTimeout(request, request.getTimeOut());
    
    final RequestCallBack requestCallBack = new RequestCallBack() {
        @Override
        public void requestSuccess() {
            requestListern.requestSuccess();
        }
    
        @Override
        public void requestFailed(String message, Request request) {
            requestListern.requestFailed(message);
        }
    
        @Override
        public void timeOut(Request request) {
            timeOutHanlder(request);
        }
    };
    callbacks.put(request.getAction(),
            new CallbackWrapper(request, requestCallBack, timeoutTask));
    

    一般而言,任务超时都是由于连接原因导致,所以我们这里可以尝试重试一次,如果还是超时,通过 timeOutHanlder(request);方法 进行重新连接,重连代码和连接代码一样,这里就省略了,做好这步操作,我们就可以发送消息了。

    /**
     * 超时任务
     */
    private void timeOutHanlder(Request requset) {
        setStatus(WsStatus.CONNECT_FAIL);
        //这里假装有重连
        Log.d(TAG, "timeOutHanlder: 请求超时 准备重连");
    }
    

    到这里我们的流程基本可以走通了。

    心跳

    首先,我们要知道心跳的作用是什么,连接成功后,heartbeat可以固定的时间间隔向服务器发送询问,当前是否还在线,很多人说心跳失败了就重新连接,心跳成功就继续心跳,但是这里需要注意的是,我们一般收不到心跳失败回调的,心跳也是向服务器发送数据,所以我们要将所有的主动请求都放在超时任务队列中,所以对websocket来说 请求结果有三种:成功、失败、超时,对于用户 只有成功、失败即可。

    至于心跳、注册等请求发送的数据是什么,这就得看我们与服务端定的协议是什么样了,通常来说 分为action 和 requestBody,协议格式我们再第二步已经封装好了,这里我们以心跳任务为例验证上面的封装。

    /**
     * 心跳
     */
    void keepAlive() {
    
        Request request = new Request.Builder()
                .reqCount(0)
                .timeOut(REQUEST_TIMEOUT)
                .action(ACTION_KEEPALIVE).build();
    
        WsManager.getWsManger().senRequest(request, request.getReqCount() + 1, new RequestListern() {
            @Override
            public void requestSuccess() {
                Log.d(TAG, "requestSuccess: 心跳发送成功了");
            }
    
            @Override
            public void requestFailed(String message) {
            }
        });
    }
    

    我们每间隔10s中开启一次心跳任务

    /**
     * 开始心跳
     */
    public void startKeepAlive() {
        mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
    }
    
    /**
     * 心跳任务
     */
    private Runnable mKeepAliveTask = new Runnable() {
    
        @Override
        public void run() {
            keepAlive();
            mHandler.removeCallbacks(mKeepAliveTask);
            mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
        }
    };
    

    为了便于操作演示,在主页面上加个按钮 ,点击按钮调用startKeepAlive方法,运行如下所示:


    image.png

    我们可以看到心跳返回的statue是300 不成功,5秒之后走到了请求超时的方法中,所以如果状态返回成功的话,我们需要回调给调用者。

    /**
     * 处理 任务回调
     *
     * @param action 请求类型
     */
    void disPatchCallbackWarp(String action, boolean isSuccess) {
        CallbackWrapper callBackWarp = callbacks.remove(action);
        if (callBackWarp == null) {
            Logger.d(TAG+"  "+ "disPatchCallbackWarp: 任务队列为空");
        } else {
            callBackWarp.getScheduledFuture().cancel(true);
            if (isSuccess) {
                callBackWarp.getRequestCallBack().requestSuccess();
            } else {
                callBackWarp.getRequestCallBack().requestFailed("", new Request());
            }
    
        }
    }
    

    这样调用者才知道成功或失败。

    发送其他消息与心跳一样,只是请求参数不同而已,修改Request参数即可。这样我们根据协议和业务就实现一个比较规范的webSocket消息推送流程了。

    相关文章

      网友评论

          本文标题:Android开发中如何使用WebSocket 实现消息通信

          本文链接:https://www.haomeiwen.com/subject/eflyddtx.html