美文网首页
利用 RxJava2 和 OkHttp 搭建 WebSocket

利用 RxJava2 和 OkHttp 搭建 WebSocket

作者: 明月几时有__ | 来源:发表于2019-01-06 13:10 被阅读25次

背景

对于金融交易软件,行情信息的实时更新是一个重要需求。后端团队已经提供了 WebSocket api,现在需求在安卓客户端使用 WebSocket 来接收后端实时推送的行情更新。

平台

Android

语言环境

Kolin

目标

  • 在异步线程里建立 WebSocket 连接
  • WebSocket 所有事件回调到主线程处理
  • 支持消息背压(行情可能在短时间内有剧烈波动,消息量会激增)

实现

建立连接

WebSocket 采用了 okHttp 的内置实现 - RealWebSocket,其中 WebSocketListener 提供了相应事件回调:

public abstract class WebSocketListener {
  /**
   * Invoked when a web socket has been accepted by the remote peer and may begin transmitting
   * messages.
   */
  public void onOpen(WebSocket webSocket, Response response) {
  }

  /** Invoked when a text (type {@code 0x1}) message has been received. */
  public void onMessage(WebSocket webSocket, String text) {
  }

  /** Invoked when a binary (type {@code 0x2}) message has been received. */
  public void onMessage(WebSocket webSocket, ByteString bytes) {
  }

  /**
   * Invoked when the remote peer has indicated that no more incoming messages will be
   * transmitted.
   */
  public void onClosing(WebSocket webSocket, int code, String reason) {
  }

  /**
   * Invoked when both peers have indicated that no more messages will be transmitted and the
   * connection has been successfully released. No further calls to this listener will be made.
   */
  public void onClosed(WebSocket webSocket, int code, String reason) {
  }

  /**
   * Invoked when a web socket has been closed due to an error reading from or writing to the
   * network. Both outgoing and incoming messages may have been lost. No further calls to this
   * listener will be made.
   */
  public void onFailure(WebSocket webSocket, Throwable t, @Nullable Response response) {
  }
}

这里有两个地方需要注意,onClosedonFailure,当两个对等方都指示不再传输消息并且连接已成功释放时,onClosed 会被调用,之后不会再有任何回调,而由于从网络读取或写入错误而关闭 WebSocket 时,onFailure 会被调用,同样之后不会再有任何回调,注意此时 WebSocket 会被关闭而且 onClosed 不会再被调用,这是 RealWebSocket 的实现:

public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {
  ...
  public void failWebSocket(Exception e, @Nullable Response response) {
    Streams streamsToClose;
    synchronized (this) {
      if (failed) return; // Already failed.
      failed = true;
      streamsToClose = this.streams;
      this.streams = null;
      if (cancelFuture != null) cancelFuture.cancel(false);
      if (executor != null) executor.shutdown(); //停止 executor
    }

    try {
      listener.onFailure(this, e, response);
    } finally {
      //关闭流
      closeQuietly(streamsToClose);
    }
  }
  ...
}

另外双方还需要一个心跳机制来检测连接状态,RealWebSocket 内心跳是一个空字节,即 0 byte,心跳逻辑与后端 api 一致,因此心跳不做修改,如果实际业务中心跳机制不同于此,就需要做修改了,这里暂且跳过。

下面我们来建立一个 WebSocket 连接(部分代码省略):

val okHttpClient = OkHttpClient.Builder()..........build()
val request = Request.Builder().url("这里输入连接地址").build()
val listener = object: WebSocketListener () {
        ...
}
val pingIntervalMillis = 1000 * 60 * 5 //心跳间隔 5 分钟
val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
realWss.connect(client) //建立连接

这样我们就能建立起一个 WebSocket 连接了,所有消息都会回调到我们定义的 listener 里。

异步回调

大家都知道 Android 网络操作一定不能放在主线程中,这一点就不做赘述了。这里我们使用了 RxJava2 的 Flowable 操作符以及 Scheduler 来进行线程调度,我们可以手动调用 onNext 来向订阅者发送 WebSocket 消息事件,同时 Flowable 也支持背压:

public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

以上是 RxJava2 提供的几种背压策略,根据业务这里选择了 BUFFER 缓存所有收到的消息直到被消费。同时 BUFFER 也是默认的背压策略:

public final class FlowableCreate<T> extends Flowable<T> {
    ...
    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
        ...
          default: {
            //默认 BufferAsyncEmitter
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
          }
        }
        ...
    }
    ...
}

这里我们注意到,BufferAsyncEmitter 实例化的过程中传入了 bufferSize() ,通过查看 Flowable 我们看到默认的 buffer size 是 128:

public abstract class Flowable<T> implements Publisher<T> {
        /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    ...
}

这里先使用默认容量,不做修改。

接下来将 WebSocket 所有事件密封在一个类里,用作 Flowable 的消息类型:

sealed class WebSocketEvent {

    class Open(webSocket: WebSocket?, val response: Response?) : WebSocketEvent()

    class BinaryMessage(webSocket: WebSocket?, val bytes: ByteString?) : WebSocketEvent()

    class StringMessage(webSocket: WebSocket?, val text: String?) : WebSocketEvent()

    class Closing(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()

    class Closed(webSocket: WebSocket?, val code: Int, val reason: String?) : WebSocketEvent()


}

WebSocketListener 中向订阅者发送对应的事件:

class FlowableWebSocketListener(private val emitter: FlowableEmitter<WebSocketEvent>) : WebSocketListener() {

    override fun onOpen(webSocket: WebSocket?, response: Response?) {
        emitter.onNext(WebSocketEvent.Open(webSocket, response))
    }

    override fun onMessage(webSocket: WebSocket?, text: String?) {
        emitter.onNext(WebSocketEvent.StringMessage(webSocket, text))
    }

    override fun onMessage(webSocket: WebSocket?, bytes: ByteString?) {
        emitter.onNext(WebSocketEvent.BinaryMessage(webSocket, bytes))
    }

    override fun onClosing(webSocket: WebSocket?, code: Int, reason: String?) {
        emitter.onNext(WebSocketEvent.Closing(webSocket, code, reason))
    }

    override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) {
        emitter.onComplete()
    }

    override fun onFailure(webSocket: WebSocket?, t: Throwable?, response: Response?) {
        if (!emitter.isCancelled) {
            emitter.onError(t ?: IOException("WebSocket unknown error"))
            emitter.onComplete()
        }
    }
}

接下来我们创建一个 Flowable,在异步线程建立连接,消息发送到主线程做处理:

val flowable: Flowable<WebSocketEvent> = Flowable.create({ emitter ->
      ...
      val listener = RxWebSocketListener(emitter)
      val realWss = RealWebSocket(request, listener, SecureRandom(), pingIntervalMillis)
      realWss.connect(client) //建立连接
}, BackpressureStrategy.BUFFER)

flowable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object: ResourceSubscriber<WebSocketEvent> {
      ...
      override fun onStart() {
         //ResourceSubscriber 会一次拉取 Long.MAX_VALUE 个消息,这里我们覆写先拉取一条
         request(1)
      }

      override fun onNext(t: WebSocketEvent?) {
          when(t){
              WebSocketEvent.Open -> {...}
              WebSocketEvent.BinaryMessage -> {...}
              WebSocketEvent.StringMessage -> {...}
              WebSocketEvent.Closing -> {...}
          }

          if (判断是否需要继续拉取消息) {
              request(1)
          }
      }

      override fun onComplete() {
           onClosed()
           dispose()
      }
      ...
})

这里要注意 Schedulers.io() 创建工作线程的数量是没有上限的,因此在 WebSocket 关闭之后应该立即 dispose() 来释放,否则可能引发 OutOfMemory。

...
static final class CachedWorkerPool implements Runnable {
    ThreadWorker get() {
          if (allWorkers.isDisposed()) {
              return SHUTDOWN_THREAD_WORKER;
          }
          while (!expiringWorkerQueue.isEmpty()) {
              ThreadWorker threadWorker = expiringWorkerQueue.poll();
              if (threadWorker != null) {
                  return threadWorker;
              }
          }

          // No cached worker found, so create a new one.
          // 缓存中没有可复用的 ThreadWorker 时,创建一个新的 ThreadWorker
          ThreadWorker w = new ThreadWorker(threadFactory);
          allWorkers.add(w);
          return w;
    }
    ...
}
...

到此一个异步 WebSocket 连接就搭建完成了。

(转载请注明出处)

相关文章

网友评论

      本文标题:利用 RxJava2 和 OkHttp 搭建 WebSocket

      本文链接:https://www.haomeiwen.com/subject/cqrjrqtx.html