美文网首页
为okhttp的WebSocket添加心跳回调

为okhttp的WebSocket添加心跳回调

作者: jh352160 | 来源:发表于2021-09-03 14:41 被阅读0次

    最近在项目的开发中,碰到了这样一个需求:需要在长连接的心跳发送时执行一些业务上的逻辑。那么,问题就在于如何在现有的长连接的基础上,以尽可能小的改动,实现这个需求。故事也就由此开始了。

    确定okhttp是否有提供相应的API

    首先肯定是要确定okhttp中是否有类似的API可以使用,或者是否可以通过更新版本来解决这个问题。刚好,我找到了GitHub中有人提出了类似的问题,可以来看看官方的说法:

    WebSocket ping logic is not customizable · Issue #3197 · square/okhttp

    可以看到,开发者明确表示了并不希望让应用层自定义ping方法的逻辑,那么看来只能另想办法了。

    okhttp中的心跳的使用方法与实现原理

    首先,我来简单梳理一下okhttp中心跳的实现原理,如果只是想要解决方法的朋友可以直接跳过这一部分。

    在okhttp中,实现心跳的方式非常简单,只需要在OkHttpClient创建时添加相应的配置即可:

      OkHttpClient.Builder()
          .pingInterval(HEART_BEAT_RATE, TimeUnit.SECONDS)
          .build()
    

    那么具体的心跳逻辑是如何实现的呢,一起来看看具体的代码细节。

        //OkHttpClient.java
      @Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
        RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
        webSocket.connect(this);
        return webSocket;
      }
    
        //RealWebSocket.java
      public RealWebSocket(Request request, WebSocketListener listener, Random random,long pingIntervalMillis) {
        //...
        this.pingIntervalMillis = pingIntervalMillis;
            //...
      }
    
        public void initReaderAndWriter(String name, Streams streams) throws IOException {
        synchronized (this) {
          //...
          this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
          if (pingIntervalMillis != 0) {
            executor.scheduleAtFixedRate(
                new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
          }
                //...
        }
      }
    
        private final class PingRunnable implements Runnable {
        @Override public void run() {
          writePingFrame();
        }
      }
    
        void writePingFrame() {
            //...
        try {
          writer.writePing(ByteString.EMPTY);
        } catch (IOException e) {
          failWebSocket(e, null);
        }
            //...
      }
    
        //WebSocketWriter.java
        void writePing(ByteString payload) throws IOException {
        writeControlFrame(OPCODE_CONTROL_PING, payload);
      }
    

    上面的代码就是ping的主要发送逻辑了,简单总结一下就是如果pingInterval不为0,那就开启一个的循环任务,定时的去发送代表ping的ControlFrame。

    其中值得一提的就是ControlFrame这个概念,在WebSocket中的frame分为两类,一类叫做MessageFrame,也就是平时客户端与服务端互相通信的部分。另一类叫做ControlFrame,其中包括CONTROL_PING,CONTROL_PONG,CONTROL_CLOSE,可以看出这一类更偏重与功能性的方面。具体为哪一类的Frame可以在Header中进行区分。

    上面已经介绍了心跳的发送逻辑,那么下面就轮到接收的逻辑了,还是先来看看代码:

        //RealWebSocket.java
        public void loopReader() throws IOException {
        while (receivedCloseCode == -1) {
          // This method call results in one or more onRead* methods being called on this thread.
          reader.processNextFrame();
        }
      }
    
        //WebSocketReader.java
        void processNextFrame() throws IOException {
        readHeader();
        if (isControlFrame) {
          readControlFrame();
        } else {
          readMessageFrame();
        }
      }
    
        private void readControlFrame() throws IOException {
            //...
        switch (opcode) {
          case OPCODE_CONTROL_PING:
            frameCallback.onReadPing(controlFrameBuffer.readByteString());
            break;
          case OPCODE_CONTROL_PONG:
            frameCallback.onReadPong(controlFrameBuffer.readByteString());
            break;
          case OPCODE_CONTROL_CLOSE:
            //...
          default:
            throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
        }
      }
    

    可以看到,接收的部分逻辑也很简单,就是通过一个循环去读取,如果接收到了消息,那就先通过header确定frame的类型,然后再分类进行处理。

    而且值得注意的是,上面代码中出现了一个frameCallback的对象,而这个对象是WebSocketReader.FrameCallback这个接口的实现,而里面的onReadPing和onReadPong就是我们之后能够做文章的地方了。

        WebSocketReader.FrameCallback
        public interface FrameCallback {
        void onReadMessage(String text) throws IOException;
        void onReadMessage(ByteString bytes) throws IOException;
        void onReadPing(ByteString buffer);
        void onReadPong(ByteString buffer);
        void onReadClose(int code, String reason);
      }
    

    添加回调的具体实现

    在上面的源码分析中,我们注意到了WebSocketReader.FrameCallback这个接口,如果我们能够自己实现这个接口,并且注入到websocket的reader中,那么这个需求不就实现了吗。

    那么我们再来看看reader中的frameCallback按照原来的逻辑应该是个什么东西:

        //RealWebSocket.java
        reader = new WebSocketReader(streams.client, streams.source, this);
    
        //WebSocketReader.java
        WebSocketReader(boolean isClient, BufferedSource source, FrameCallback frameCallback) {
        //...
        this.frameCallback = frameCallback;
            //...
      }
    

    原来frameCallback就是RealWebSocket,而我们所持有的webSocket正是RealWebSocket的对象,那么只需要做一个静态代理,然后通过反射将reader替换为我们自己的实现就可以了:

            private fun replaceReaderCallBack() {
            val wsClass = webSocket!!.javaClass
            val callbackClass = wsClass.interfaces.find { it.name.contains("FrameCallback") } ?: return
    
            val readerField = wsClass.getDeclaredField("reader")
            readerField.isAccessible = true
            val reader = readerField.get(webSocket)
    
            val callbackInstance = Proxy.newProxyInstance(reader.javaClass.classLoader, arrayOf(callbackClass)) { proxy, method, args ->
                when (method?.name) {
                    "onReadMessage" -> {
                        if (args!![0] is String) {
                            webSocket?.onReadMessage(args[0] as String)
                        } else {
                            webSocket?.onReadMessage(args[0] as ByteString)
                        }
                    }
                    "onReadPing" -> { webSocket?.onReadPing(args!![0] as ByteString) }
                    "onReadPong" -> { webSocket?.onReadPong(args!![0] as ByteString) }
                    "onReadClose" -> { webSocket?.onReadClose(args!![0] as Int, args[1] as String) }
                }
                0
            }
    
            reader.javaClass.getDeclaredField("frameCallback").apply {
                isAccessible = true
                set(reader, callbackInstance)
            }
        }
    

    至此,回调已经添加完成,只需要在对应的回调中补上自己的业务逻辑,然后在websocket创建完成之后调用一下这个方法就完成了。

    相关文章

      网友评论

          本文标题:为okhttp的WebSocket添加心跳回调

          本文链接:https://www.haomeiwen.com/subject/rarkwltx.html