美文网首页
基于Rxjava2与OkHttp中WebSocket长连接封装(

基于Rxjava2与OkHttp中WebSocket长连接封装(

作者: 设计失 | 来源:发表于2018-12-11 23:11 被阅读188次

WebSocket 在开发中遇到的情况很少,导致在使用的时候可能遇到很多的问题,比如它的重连机制、发送数据的统一、结合Service使用;下面的文章将使用OkHttp中的WebSocket以及Rxjava2结合Service实现断开重连,发送和接收服务器的数据。

WebSocket

我们使用WebSocket的库有很多,类似 AndroidAsync,作者对它的介绍是一个更加底层的异步网络库,使用这个库也非常简单,传一个地址和协议再加一个回调接口,就能简单的使用:

AsyncHttpClient.getDefaultInstance()
    .websocket(get, "my-protocol", new WebSocketConnectCallback() {
    @Override
    public void onCompleted(Exception ex, WebSocket webSocket) {
        if (ex != null) {
            ex.printStackTrace();
            return;
        }
        webSocket.send("a string");
        webSocket.send(new byte[10]);
        webSocket.setStringCallback(new StringCallback() {
            public void onStringAvailable(String s) {
                System.out.println("I got a string: " + s);
            }
        });
        webSocket.setDataCallback(new DataCallback() {
            public void onDataAvailable(DataEmitter emitter, 
                              ByteBufferList byteBufferList) {
                System.out.println("I got some bytes!");
                // note that this data has been read
                byteBufferList.recycle();
            }
        });
    }
});

但是,我们看到这个库却是有非常多的类,通常的,我们的网络请求库都只有一个,现在大部分Android开发都会使用类似OkHttp或者Volley来请求网络,如果再加一个库的话,无疑增加了应用的代码量,这里我们使用OkHttp中内置的WebSocket,基于它的一个封装,我们也能写一个轻量级的WebSocket应用; 而OkHttp中的WebSocket使用也是封装方便:

// 创建一个Request
Request request = new Request.Builder()
        .url(socketUrl)
        .build();
OkHttpClient client = new OkHttpClient();
// 使用OkHttpClient 来创建一个WebSocket
client.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        super.onOpen(webSocket, response);
    }
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        super.onMessage(webSocket, text);
    }
    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        super.onMessage(webSocket, bytes);
    }
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        super.onClosing(webSocket, code, reason);
    }
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        super.onClosed(webSocket, code, reason);
    }
    @Override
    public void onFailure(WebSocket webSocket, Throwable t,
                @javax.annotation.Nullable Response response) {
        super.onFailure(webSocket, t, response);
    }
});
client.dispatcher().executorService().shutdown();

可以看到,先创建一个Request,我们在调用接口的时候,也是创建一个Request对象,然后使用OkHttpClient来创建一个WebSocket,回调到WebSocketListener监听接口之后,再做逻辑业务处理。(接口方法意思都很简单,使用的时候再说明)

Rxjava2 使用

前面的博客都简单的介绍了Rxjava2的使用,最近在项目中使用的也比较频繁,后面再写一些项目中Rxjava2的实战:

Rxjava2 学习创建型操作符
Rxjava2 学习变换操作符
Rxjava2 学习过滤操作符

项目开发:

既然是使用到逻辑业务上的操作,和界面无关,自然会想到Service; 将其直接封装成Service,在使用的时候直接bindService或者startService会方便很多,不知不觉中将逻辑业务和页面分隔开; 如果我们直接在页面中使用也未尝不可,但是这样就不便复用了。

所以开始我们先创建一个Service:

public WebSocketService extends Service {
    public static final String LOG_TAG = "WebSocketTest";
   
    @Nullable
    @Override
    public IBinder onBind(Intent intent) {
        Log.v(LOG_TAG, "----- onBind -----");
        return new ServiceBinder();
    }
    public class ServiceBinder extends Binder {
        public WebSocketService getService() {
            return WebSocketService.this;
        }
    }
    @Override
    public void onCreate() {
        super.onCreate();
        Log.i(LOG_TAG, "----- onCreate -----");
    }
    @Override
    public int onStartCommand(Intent intent, int flags, int startId) {
        //Use this to force restart service
        return START_STICKY;
    }
    @Override
    public void onDestroy() {
        super.onDestroy();
        Log.i(LOG_TAG, "----- onDestroy -----");
    }
}

Service常用的步骤,使用其中的方法,这里我们使用bindService回调一个ServiceConnect接口,因为我们需要使用到这个WebSocketService实例;接下来,我们就需要在onCreate方法中做一些初始化的操作:

/**
 * 初始化
 *
 * @param startReason
 * @param isFirstConnect
 */
private void initSocketWrapper(String startReason, boolean isFirstConnect) {
    // 拿到Reason,打印log
    Observable.just(startReason)
            .filter(new Predicate<String>() {
                @Override
                public boolean test(String s) throws Exception {
                    // 判断当前是否正在连接
                    if (isAttemptConnecting) {
                        Log.v(LOG_TAG, startReason + " : Should reconnect but"+
                                       "already in process, skip.");
                        return Boolean.FALSE;
                    }
                    return Boolean.TRUE;
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    if ((mWebSocket == null)     // 如果已经为空
                        && (!isFirstConnect)         // 不是第一次连接
                        && (!isAttemptConnecting)) {    // 当前没有在尝试连接
                        showUiWebSocketStatus("与服务器失去连接!!!");
                    }
                }
            })
            .observeOn(Schedulers.io())
            .subscribe(s -> initSocket());
}    

上面写清楚了注释,这个方法主要是初始化,拿到当前的原因和是否是第一次连接,还有一个全局变量 isAttemptConnecting 来判断当前WebSocket是否在连接中,然后在这之间先判断 如果websocket为空并且不是第一次连接,而且还没有尝试连接,则toast提示用户断开连接!!!

接下来就是使用OkHttp中WebSocket创建连接了:

/**
 * 初始化WebSocket
 */
private void initSocket() {
    // ... 省略一些状态切换代码
    ...    

    // 开始初始化
    Observable.create(new ObservableOnSubscribe<WebSocket>() {
        @Override
        public void subscribe(ObservableEmitter<WebSocket> emitter) 
                                            throws Exception {
            //TODO 这里可以进行登录业务判断
            Request request = new Request.Builder()
                    .url(socketUrl)
                    .build();
            OkHttpClient client = new OkHttpClient();
            client.newWebSocket(request, new WebSocketListener() {
                @Override
                public void onOpen(WebSocket webSocket, Response response) {
                    super.onOpen(webSocket, response);
                    isAttemptConnecting = false;
                    connectionAttemptCount = 0;
                    // 连接成功之后
                    mWebSocket = webSocket;
                    dispatchStringMessage("连接成功!!!");
                    emitter.onNext(mWebSocket);
                    emitter.onComplete();
                }
                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    super.onMessage(webSocket, text);
                    dispatchStringMessage(text);
                }
                @Override
                public void onMessage(WebSocket webSocket, ByteString bytes) {
                    super.onMessage(webSocket, bytes);
                }
                @Override
                public void onClosing(WebSocket webSocket, int code, String reason) {
                    super.onClosing(webSocket, code, reason);
                }
                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    super.onClosed(webSocket, code, reason);
                    Log.i(LOG_TAG, "ClosedCallback: WebSocket closed.");
                    // 等待自检重启,或者自然关闭
                    if ((!preparedShutdown) && (shouldAutoReconnect)) {
                        initSocketWrapper("onClose");
                    }
                }
                @Override
                public void onFailure(WebSocket webSocket, Throwable t, 
                                  @Nullable Response response) {
                    super.onFailure(webSocket, t, response);
                    dispatchStringMessage("连接失败!!!");
                    emitter.onError(t != null ? t 
                        : new ConnectException("Cannot connect we service!!!"));
                }
            });
            client.dispatcher().executorService().shutdown();
        }
    }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
      .subscribe(new Consumer<WebSocket>() {
        @Override
        public void accept(WebSocket webSocket) throws Exception {
            if (pongService == null) {
                startPongDaemonService();
            }
        }
    }, 
    new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        throwable.printStackTrace();
        // 判断是否需要执行诊断服务
        if (connectionAttemptCount >= ATTEMPT_TOLERANCE) {
            // 强制开始诊断服务
            startService(new Intent(WebSocketService.this, 
                                    NetworkDiagnosisService.class));
            // 重置标记
            connectionAttemptCount = 0;
        }
    }
}

使用create操作符创建一个被观察者对象发射器,在其中使用OkHttp的创建WebSocket方式创建WebSocket,然后根据连接的结果进行emitter发射 onNext()、onError()、onComplete() 方法,连接成功之后,开始发送 自检服务 Pong; 连接失败之后,开始检测网络是否有连接:NetWorkDiagnosisService 网络诊断服务。

发送Pong守护进程,先创建一个单线程线程池,然后发送消息:

/**
 * 给服务器发送Pong自检
 */
private void startPongDaemonService() {
    pongService = Executors.newSingleThreadScheduledExecutor();
    pongService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            if (mWebSocket != null) {
                sendRequest(WsObjectPool.newPongRequest());
            }
        }
    }, 10, 10, TimeUnit.SECONDS);
    Log.i(LOG_TAG, "Pong service has been scheduled at " + 10 + " seconds delay.");
}

延迟十秒执行,使用一个WebSocketObjectPool对象池,为了方便取出对象数据。

整个WebSocket初始化就在上面,流程还是比较简单的,接下来就是从服务器拿到数据之后分发数据了, 因为我这里测试只用到String数据,如果需要用到Json的数据,则分发json数据即可:

    /**
     * 方法字符串
     *
     * @param message
     */
    private void dispatchStringMessage(String message) {
        Observable.just(message)
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        WsListener<String> listener =
                         (WsListener<String>)activeListener.get(
                 SocketConstants.ResponseType.RESPONSE_STRING_MESSAGE);
                        Log.d(LOG_TAG, "Msg entity: " + s + ".");
                        if (listener != null) {
                            listener.handleData(s);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

拿到message String数据之后,或者到外部监听器,因为是使用到String字符串类型,所以这里直接获取到一个监听器,然后分发给方法。

如果是方法json的话,就去GitHub下载代码查看详细代码:代码

自检服务

连接成功之后,我们需要自检,因为可能在连接过程中出现断连的情况,网络不稳定情况,所以需要使用循环的自检:

/**
 * 启动自检服务,按照周期执行
 */
private void startSelfCheckService() {
    // 自检服务器打开
    mSelfCheckDispose = Observable
            .interval(10, 10, TimeUnit.SECONDS)
            .filter(new Predicate<Long>() {
                @Override
                public boolean test(Long aLong) throws Exception {
                    if (!shouldAutoReconnect) {
                        Log.i(LOG_TAG, "Auto reconnect has been disabled,"
                                       "maybe kicked?");
                    }
                    return shouldAutoReconnect;
                }
            })
            .map(new Function<Long, Boolean>() {
                @Override
                public Boolean apply(Long aLong) throws Exception {
                    return checkSocketAvailable();
                }
            })
            .subscribeOn(Schedulers.computation())
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    Log.i(LOG_TAG, "Self check task has been scheduled per " 
                                    + 10 + " seconds.");
                    shouldAutoReconnect = true;
                    Log.i(LOG_TAG, "Auto reconnect feature has been enabled.");
                }
            })
            .subscribe(new Consumer<Boolean>() {
                           @Override
                           public void accept(Boolean webSocketAlive)
                                             throws Exception {
                               if (webSocketAlive) {
                                   Log.v(LOG_TAG, "WebSocket self check: is alive.");
                                   return;
                               }
                               // 自检服务器打开
                               initSocketWrapper("SelfCheckService");
                           }
                       },
                    new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            Log.e(LOG_TAG, "Error while executing self check!" 
                                         + throwable);
                        }
                    });
}

这里我没使用lambda表达式,为了让方法清楚,所以代码比较长,但是结构比较清晰;

首先使用 interval 创建一个延时的周期被观察者,然后根据当前设置是否需要自动连接来过滤是否进行下面的操作,接下来判断当前的连接是否存在和连接,接下来判断是否存活,如果断连则调用初始化的方法,在上面解释了什么情况下会调用自检服务。

总结

整篇文章写了OkHttp的WebSocket使用,断连重试机制,Service使用等,主要的一些细节在文章中可能没体现出来,如果有需要则下载源码自己修改运行。 最后贴上Github地址,喜欢的话给个start! 🤗

项目的github地址

相关文章

网友评论

      本文标题:基于Rxjava2与OkHttp中WebSocket长连接封装(

      本文链接:https://www.haomeiwen.com/subject/drhehqtx.html