美文网首页Android进阶之路Android开发Android开发经验谈
雨露均沾的OkHttp——WebSocket长连接(源码篇)

雨露均沾的OkHttp——WebSocket长连接(源码篇)

作者: 积木zz | 来源:发表于2020-07-13 11:16 被阅读0次

    前言

    雨露均沾的OkHttp—WebSocket长连接(使用篇)
    雨露均沾的OkHttp—WebSocket长连接(源码篇)

    上期我们熟悉了OkHttp中实现WebSocket长连接的接入,并且可以通过OkHttp官方的MockWebSocket服务来模拟服务端,实现整个流程。

    今天我们就来说下具体OkHttp中是怎么实现这些功能的呢?相信看过这篇文章你也能深刻了解WebSocket这个协议。

    使用回顾

    简单贴下WebSocket使用方法,方便下面解析:

           //初始化
            mClient = new OkHttpClient.Builder()
                    .pingInterval(10, TimeUnit.SECONDS)
                    .build();
            Request request = new Request.Builder()
                    .url(mWbSocketUrl)
                    .build();
            mWebSocket = mClient.newWebSocket(request, new WsListener());
            
            //收到消息回调
            @Override
            public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
                super.onMessage(webSocket, text);
                Log.e(TAG,"收到消息!");
                onWSDataChanged(DATE_NORMAL, text);
            }        
            
            //发送消息
            mWebSocket.send(message);
            
            //主动关闭连接
            mWebSocket.close(code, reason);
    

    源码解析

    WebSocket整个流程无非三个功能:连接,接收消息,发送消息。下面我们就从这三个方面分析下具体是怎么实现的。

    连接

    通过上面的代码我们得知,WebSocket连接是通过newWebSocket方法。直接点进去看这个方法:

      override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
        val webSocket = RealWebSocket(
            taskRunner = TaskRunner.INSTANCE,
            originalRequest = request,
            listener = listener,
            random = Random(),
            pingIntervalMillis = pingIntervalMillis.toLong(),
            extensions = null, // Always null for clients.
            minimumDeflateSize = minWebSocketMessageToCompress
        )
        webSocket.connect(this)
        return webSocket
      }
    

    这里做了两件事:

    • 初始化RealWebSocket,主要是设置了一些参数(比如pingIntervalMillis心跳包时间间隔,还有监听事件之类的)
    • connect方法进行WebSocket连接

    继续查看connect方法:

    connect(WebSocket连接握手)
      fun connect(client: OkHttpClient) {
        //***
        val webSocketClient = client.newBuilder()
            .eventListener(EventListener.NONE)
            .protocols(ONLY_HTTP1)
            .build()
        val request = originalRequest.newBuilder()
            .header("Upgrade", "websocket")
            .header("Connection", "Upgrade")
            .header("Sec-WebSocket-Key", key)
            .header("Sec-WebSocket-Version", "13")
            .header("Sec-WebSocket-Extensions", "permessage-deflate")
            .build()
        call = RealCall(webSocketClient, request, forWebSocket = true)
        call!!.enqueue(object : Callback {
          override fun onResponse(call: Call, response: Response) {
            
            //得到数据流
            val streams: Streams
            try {
              checkUpgradeSuccess(response, exchange)
              streams = exchange!!.newWebSocketStreams()
            } 
            
            //***
            // Process all web socket messages.
            try {
              val name = "$okHttpName WebSocket ${request.url.redact()}"
              initReaderAndWriter(name, streams)
              listener.onOpen(this@RealWebSocket, response)
              loopReader()
            } catch (e: Exception) {
              failWebSocket(e, null)
            }
          }
        })
      }
    

    上一篇使用篇文章中说过,Websocket连接需要一次Http协议的握手,然后才能把协议升级成WebSocket。所以这段代码就体现出这个功能了。

    首先就new了一个用来进行Http连接的request,其中Header的参数就表示我要进行WebSocket连接了,参数解析如下:

    • Connection:Upgrade,表示客户端要连接升级
    • Upgrade:websocket, 表示客户端要升级建立Websocket连接
    • Sec-Websocket-Key:key, 这个key是随机生成的,服务器会通过这个参数验证该请求是否有效
    • Sec-WebSocket-Version:13, websocket使用的版本,一般就是13
    • Sec-webSocket-Extension:permessage-deflate,客户端指定的一些扩展协议,比如这里permessage-deflate就是WebSocket的一种压缩协议。

    Header设置好之后,就调用了callenqueue方法,这个方法大家应该都很熟悉吧,OkHttp里面对于Http请求的异步请求就是这个方法。
    至此,握手结束,服务器返回响应码101,表示协议升级。

    然后我们继续看看获取服务器响应之后又做了什么?
    在发送Http请求成功之后,onResponse响应方法里面主要表现为四个处理逻辑:

    • Http流转换成WebSocket流,得到Streams对象,这个流后面会转化成输入流和输出流,也就是进行发送和读取的操作流
    • listener.onOpen(this@RealWebSocket, response),回调了接口WebSocketListeneronOpen方法,告诉用户WebSocket已经连接
    • initReaderAndWriter(name, streams)
    • loopReader()

    前两个逻辑还是比较好理解,主要是后两个方法,我们分别解析下。
    首先看initReaderAndWriter方法。

    initReaderAndWriter(初始化输入流输出流)
      //RealWebSocket.kt
    
      @Throws(IOException::class)
      fun initReaderAndWriter(name: String, streams: Streams) {
        val extensions = this.extensions!!
        synchronized(this) {
          //***
          
          //写数据,发送数据的工具类
          this.writer = WebSocketWriter()
          
          //设置心跳包事件
          if (pingIntervalMillis != 0L) {
            val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
            taskQueue.schedule("$name ping", pingIntervalNanos) {
              writePingFrame()
              return@schedule pingIntervalNanos
            }
          }
          //***
        }
    
            //***
            
            //读取数据的工具类
        reader = WebSocketReader(     
          ***
          frameCallback = this,
          ***
        )
      }
      
      internal fun writePingFrame() {
       //***
        try {
          writer.writePing(ByteString.EMPTY)
        } catch (e: IOException) {
          failWebSocket(e, null)
        }
      }  
      
    

    这个方法主要干了两件事:

    • 实例化输出流输入流工具类,也就是WebSocketWriterWebSocketReader,用来处理数据的收发。
    • 设置心跳包事件。如果pingIntervalMillis参数不为0,就通过计时器,每隔pingIntervalNanos发送一个ping消息。其中writePingFrame方法就是发送了ping帧数据。

    接收消息处理消息

    loopReader

    接着看看这个loopReader方法是干什么的,看这个名字我们大胆猜测下,难道这个方法就是用来循环读取数据的?去代码里找找答案:

      fun loopReader() {
        while (receivedCloseCode == -1) {
          // This method call results in one or more onRead* methods being called on this thread.
          reader!!.processNextFrame()
        }
      }
    

    代码很简单,一个while循环,循环条件是receivedCloseCode == -1的时候,做的事情是reader!!.processNextFrame()方法。继续:

      //WebSocketWriter.kt
      fun processNextFrame() {
        //读取头部信息
        readHeader()
        if (isControlFrame) {
          //如果是控制帧,读取控制帧内容
          readControlFrame()
        } else {
          //读取普通消息内容
          readMessageFrame()
        }
      }
      
      //读取头部信息
      @Throws(IOException::class, ProtocolException::class)
      private fun readHeader() {
        if (closed) throw IOException("closed")
        
        try {
         //读取数据,获取数据帧的前8位
          b0 = source.readByte() and 0xff
        } finally {
          source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
        }    
        //***
        //获取数据帧的opcode(数据格式)
        opcode = b0 and B0_MASK_OPCODE
        //是否为最终帧
        isFinalFrame = b0 and B0_FLAG_FIN != 0
        //是否为控制帧(指令)
        isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0
    
        //判断最终帧,获取帧长度等等
      }  
      
      
      //读取控制帧(指令)
        @Throws(IOException::class)
      private fun readControlFrame() {
        if (frameLength > 0L) {
          source.readFully(controlFrameBuffer, frameLength)
        }
    
        when (opcode) {
          OPCODE_CONTROL_PING -> {
          //ping 帧
            frameCallback.onReadPing(controlFrameBuffer.readByteString())
          }
          OPCODE_CONTROL_PONG -> {
            //pong 帧
            frameCallback.onReadPong(controlFrameBuffer.readByteString())
          }
          OPCODE_CONTROL_CLOSE -> {
            //关闭 帧
            var code = CLOSE_NO_STATUS_CODE
            var reason = ""
            val bufferSize = controlFrameBuffer.size
            if (bufferSize == 1L) {
              throw ProtocolException("Malformed close payload length of 1.")
            } else if (bufferSize != 0L) {
              code = controlFrameBuffer.readShort().toInt()
              reason = controlFrameBuffer.readUtf8()
              val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
              if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
            }
            //回调onReadClose方法
            frameCallback.onReadClose(code, reason)
            closed = true
          }
        }
      }
      
      //读取普通消息
      @Throws(IOException::class)
      private fun readMessageFrame() {
        
        readMessage()
    
        if (readingCompressedMessage) {
          val messageInflater = this.messageInflater
              ?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
          messageInflater.inflate(messageFrameBuffer)
        }
    
        if (opcode == OPCODE_TEXT) {
          frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
        } else {
          frameCallback.onReadMessage(messageFrameBuffer.readByteString())
        }
      }  
      
    

    代码还是比较直观,这个processNextFrame其实就是读取数据用的,首先读取头部信息,获取数据帧的类型,判断是否为控制帧,再分别去读取控制帧数据或者普通消息帧数据。

    数据帧格式

    问题来了,什么是数据头部信息,什么是控制帧
    这里就要说下WebSocket的数据帧了,先附上一个数据帧格式:

    
       0 1 2 3 4 5 6 7    0 1 2 3 4 5 6 7  0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
      +-+-+-+-+-------+  +-+-------------+ +-----------------------------+
      |F|R|R|R| OP    |  |M| LENGTH      |   Extended payload length
      |I|S|S|S| CODE  |  |A|             |  (if LENGTH=126)
      |N|V|V|V|       |  |S|             |
      | |1|2|3|       |  |K|             |
      +-+-+-+-+-------+  +-+-------------+
      |                      Extended payload length(if LENGTH=127)
      +                                  +-------------------------------
      |      Extended payload length     | Masking-key,if Mask set to 1
      +----------------------------------+-------------------------------
      |   Masking-key                    |       Data
      +----------------------------------+-------------------------------
      |                                Data
      +----------------------------------+-------------------------------
    
    
    

    我承认,我懵逼了。
    冷静冷静,一步一步分析下吧。

    首先每一行代表4个字节,一共也就是32位数,哦,那也就是几个字节而已嘛,每个字节有他自己的代表意义呗,这样想是不是就很简单了,下面来具体看看每个字节。

    第1个字节:

    • 第一位是FIN码,其实就是一个标示位,因为数据可能多帧操作嘛,所以多帧情况下,只有最后一帧的FIN设置成1,标示结束帧,前面所有帧设置为0。
    • 第二位到第四位是RSV码,一般通信两端没有设置自定义协议,就默认为0。
    • 后四位是opcode,我们叫它操作码。这个就是判断这个数据帧的类型了,一般有以下几个被定义好的类型:

    1) 0x0 表示附加数据帧
    2) 0x1 表示文本数据帧
    3) 0x2 表示二进制数据帧
    4) 0x3-7 保留用于未来的非控制帧
    5) 0x8 表示连接关闭
    6) 0x9 表示ping
    7) 0xA 表示pong
    8) 0xB-F 保留用于未来的非控制帧

    是不是发现了些什么,这不就对应了我们应用中的几种格式吗?2和3对应的是普通消息帧,包括了文本和二进制数据。567对应的就是控制帧格式,包括了close,ping,pong

    第2个字节:

    • 第一位是Mask掩码,其实就是标识数据是否加密混淆,1代表数据经过掩码的,0是没有经过掩码的,如果是1的话,后续就会有4个字节代表掩码key,也就是数据帧中Masking-key所处的位置。
    • 后7位是LENGTH,用来标示数据长度。因为只有7位,所以最大只能储存1111111对应的十进制数127长度的数据,如果需要更大的数据,这个储存长度肯定就不够了。
      所以规定来了,1) 小于126长度则数据用这七位表示实际长度。2) 如果长度设置为126,也就是二进制1111110,就代表取额外2个字节表示数据长度,共是16位表示数据长度。3) 如果长度设置为127,也就是二进制1111111,就代表取额外8个字节,共是64位表示数据长度。

    需要注意的是LENGHT的三种情况在一个数据帧里面只会出现一种情况,不共存,所以在图中是用if表示。同样的,Masking-key也是当Mask为1的时候才存在。

    所以也就有了数据帧里面的Extended payload length(LENGTH=126)所处的2个字节,以及Extended payload length(LENGTH=127)所处的8个字节。

    最后的字节部分自然就是掩码key(Mask为1的时候才存在)和具体的传输数据了。
    还是有点晕吧😷,来张图总结下:

    数据帧格式.jpeg

    好了,了解了数据帧格式后,我们再来读源码就清晰多了。
    先看看怎么读的头部信息并解析的:

      //取数据帧前8位数据
      b0 = source.readByte() and 0xff
      //获取数据帧的opcode(数据格式)
      opcode = b0 and B0_MASK_OPCODE(15)
      //是否为最终帧
      isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
      //是否为控制帧(指令)
      isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0  
    
    • 第一句获取头信息,and是按位与计算,and 0xff意思就是按位与11111111,所以头部信息其实就是取了数据帧的前8位数据,一个字节。
    • 第二句获取opcodeand 15也就是按位与00001111,其实也就是取了后四位数据,刚好对应上opcode的位置,第一个字节的后四位。
    • 第三句获取是否为最终帧,刚才数据帧格式中说过,第一位FIN标识了是否为最后一帧数据,1代表结束帧,所以这里and 128也就是按位与10000000,也就是取的第一位数。
    • 第四句获取是否为控制帧,and 8也就是按位与00001000,取得是第五位,也就是opcode的第一位,这是什么意思呢?我们看看刚才的数据帧格式,发现从0x8开始就是所谓的控制帧了。0x8对应的二进制是1000,0x7对应的二进制是0111。发现了吧,如果为控制帧的时候,opcode第一位肯定是为1的,所以这里就判断的第五位。

    后面还有读取第二个字节的代码,大家可以自己沿着这个思路自己看看,包括了读取MASK,读取数据长度的三种长度等。

    所以这个processNextFrame方法主要做了三件事:

    • readHeader方法中,判断了是否为控制帧,是否为结束帧,然后获取了Mask标识,帧长度等参数
    • readControlFrame方法中,主要处理了该帧数据为ping,pong,close三种情况,并且在收到close关闭帧的情况下,回调了onReadClose方法,这个待会要细看下。
    • readMessageFrame方法中,主要是读取了消息后,回调了onReadMessage方法。

    至此可以发现,其实WebSocket传输数据并不是一个简单的事,只是OkHttp都帮我们封装好了,我们只需要直接传输数据即可,感谢这些三方库为我们开发作出的贡献,不知道什么时候我也能做出点贡献呢🤔。

    对了,刚才说回调也很重要,接着看看。onReadCloseonReadMessage回调到哪了呢?还记得上文初始化WebSocketWriter的时候设置了回调接口吗。所以就是回调给RealWebSocket了:

      //RealWebSocket.kt
      override fun onReadClose(code: Int, reason: String) {
        require(code != -1)
    
        var toClose: Streams? = null
        var readerToClose: WebSocketReader? = null
        var writerToClose: WebSocketWriter? = null
        synchronized(this) {
          check(receivedCloseCode == -1) { "already closed" }
          receivedCloseCode = code
          receivedCloseReason = reason 
          //...
        }
    
        try {
          listener.onClosing(this, code, reason)
    
          if (toClose != null) {
            listener.onClosed(this, code, reason)
          }
        } finally {
          toClose?.closeQuietly()
          readerToClose?.closeQuietly()
          writerToClose?.closeQuietly()
        }
      }
      
      @Throws(IOException::class)
      override fun onReadMessage(text: String) {
        listener.onMessage(this, text)
      }
    
      @Throws(IOException::class)
      override fun onReadMessage(bytes: ByteString) {
        listener.onMessage(this, bytes)
      }  
    
    

    onReadClose回调方法里面有个关键的参数,receivedCloseCode。还记得这个参数吗?上文中解析消息的循环条件就是receivedCloseCode == -1,所以当收到关闭帧的时候,receivedCloseCode就不再等于-1(规定大于1000),也就不再去读取解析消息了。这样整个流程就结束了。

    其中还有一些WebSocketListener的回调,比如onClosing,onClosed,onMessage等,就直接回调给用户使用了。至此,接收消息处理消息说完了。

    发消息

    好了。接着说发送,看看send方法:

      @Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
        // ***
        // Enqueue the message frame.
        queueSize += data.size.toLong()
        messageAndCloseQueue.add(Message(formatOpcode, data))
        runWriter()
        return true
      }
    

    首先,把要发送的data封装成Message对象,然后入队列messageAndCloseQueue。最后执行runWriter方法。这都不用猜了,runWriter肯定就要开始发送消息了,继续看:

      //RealWebSocket.kt
      private fun runWriter() {
        this.assertThreadHoldsLock()
    
        val writerTask = writerTask
        if (writerTask != null) {
          taskQueue.schedule(writerTask)
        }
      }
      
      private inner class WriterTask : Task("$name writer") {
        override fun runOnce(): Long {
          try {
            if (writeOneFrame()) return 0L
          } catch (e: IOException) {
            failWebSocket(e, null)
          }
          return -1L
        }
      }  
      
      //以下是schedule方法转到WriterTask的runOnce方法过程
    
      //TaskQueue.kt
      fun schedule(task: Task, delayNanos: Long = 0L) {
        synchronized(taskRunner) {
          if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
            taskRunner.kickCoordinator(this)
          }
        }
      }
      
      internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
        //***
        if (insertAt == -1) insertAt = futureTasks.size
        futureTasks.add(insertAt, task)
    
        // Impact the coordinator if we inserted at the front.
        return insertAt == 0
      }  
    
      //TaskRunner.kt
      internal fun kickCoordinator(taskQueue: TaskQueue) {
        this.assertThreadHoldsLock()
        
        if (taskQueue.activeTask == null) {
          if (taskQueue.futureTasks.isNotEmpty()) {
            readyQueues.addIfAbsent(taskQueue)
          } else {
            readyQueues.remove(taskQueue)
          }
        }    
        
        if (coordinatorWaiting) {
          backend.coordinatorNotify(this@TaskRunner)
        } else {
          backend.execute(runnable)
        }
      }  
      
      private val runnable: Runnable = object : Runnable {
        override fun run() {
          while (true) {
            val task = synchronized(this@TaskRunner) {
              awaitTaskToRun()
            } ?: return
    
            logElapsed(task, task.queue!!) {
              var completedNormally = false
              try {
                runTask(task)
                completedNormally = true
              } finally {
                // If the task is crashing start another thread to service the queues.
                if (!completedNormally) {
                  backend.execute(this)
                }
              }
            }
          }
        }
      }
      
      private fun runTask(task: Task) {
        try {
          delayNanos = task.runOnce()
        } 
      }  
      
    

    代码有点长,这里是从runWriter开始跟的几个方法,拿到writerTask实例后,存到TaskQueuefutureTasks列表里,然后到runnable这里可以看到是一个while死循环,不断的从futureTasks中取出Task并执行runTask方法,直到Task为空,循环停止。

    其中涉及到两个新的类:

    • TaskQueue类主要就是管理消息任务列表,保证按顺序执行
    • TaskRunner类主要就是做一些任务的具体操作,比如线程池里执行任务,记录消息任务的状态(准备发送的任务队列readyQueues,正在执行的任务队列busyQueues等等)

    而每一个Task最后都是执行到了WriterTaskrunOnce方法,也就是writeOneFrame方法:

      internal fun writeOneFrame(): Boolean {
        synchronized(this@RealWebSocket) {
          if (failed) {
            return false // Failed web socket.
          }
          writer = this.writer
          pong = pongQueue.poll()
          if (pong == null) {
            messageOrClose = messageAndCloseQueue.poll()
            if (messageOrClose is Close) {
            } else if (messageOrClose == null) {
                return false // The queue is exhausted.
            }
          }
        }
    
       //发送消息逻辑,包括`pong`消息,普通消息,关闭消息
        try {
          if (pong != null) {
            writer!!.writePong(pong)
          } else if (messageOrClose is Message) {
            val message = messageOrClose as Message
            writer!!.writeMessageFrame(message.formatOpcode, message.data)
            synchronized(this) {
              queueSize -= message.data.size.toLong()
            }
          } else if (messageOrClose is Close) {
            val close = messageOrClose as Close
            writer!!.writeClose(close.code, close.reason)
            // We closed the writer: now both reader and writer are closed.
            if (streamsToClose != null) {
              listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
            }
          } 
          return true
        } finally {
          streamsToClose?.closeQuietly()
          readerToClose?.closeQuietly()
          writerToClose?.closeQuietly()
        }
      }
    
    

    这里就会执行发送消息的逻辑了,主要有三种消息情况处理:

    • pong消息,这个主要是为服务器端准备的,发送给客户端回应心跳包。
    • 普通消息,就会把数据类型Opcode和具体数据发送过去
    • 关闭消息,其实当用户执行close方法关闭WebSocket的时候,也是发送了一条Close控制帧消息给服务器告知这个关闭需求,并带上code状态码reason关闭原因,然后服务器端就会关闭当前连接。

    好了。最后一步了,就是把这些数据组装成WebSocket数据帧并写入流,分成控制帧数据和普通消息数据帧

    
      //写入(发送)控制帧
      private fun writeControlFrame(opcode: Int, payload: ByteString) {
        if (writerClosed) throw IOException("closed")
        
        val length = payload.size
        require(length <= PAYLOAD_BYTE_MAX) {
          "Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
        }
        val b0 = B0_FLAG_FIN or opcode
        sinkBuffer.writeByte(b0)
    
        var b1 = length
        if (isClient) {
          b1 = b1 or B1_FLAG_MASK
          sinkBuffer.writeByte(b1)
          random.nextBytes(maskKey!!)
          sinkBuffer.write(maskKey)
    
          if (length > 0) {
            val payloadStart = sinkBuffer.size
            sinkBuffer.write(payload)
            sinkBuffer.readAndWriteUnsafe(maskCursor!!)
            maskCursor.seek(payloadStart)
            toggleMask(maskCursor, maskKey)
            maskCursor.close()
          }
        } else {
          sinkBuffer.writeByte(b1)
          sinkBuffer.write(payload)
        }
    
        sink.flush()
      }
    
    
      //写入(发送)普通消息数据帧
      @Throws(IOException::class)
      fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
        if (writerClosed) throw IOException("closed")
    
        messageBuffer.write(data)
    
        var b0 = formatOpcode or B0_FLAG_FIN
        val dataSize = messageBuffer.size
        sinkBuffer.writeByte(b0)
    
        var b1 = 0
        if (isClient) {
          b1 = b1 or B1_FLAG_MASK
        }
        when {
          dataSize <= PAYLOAD_BYTE_MAX -> {
            b1 = b1 or dataSize.toInt()
            sinkBuffer.writeByte(b1)
          }
          dataSize <= PAYLOAD_SHORT_MAX -> {
            b1 = b1 or PAYLOAD_SHORT
            sinkBuffer.writeByte(b1)
            sinkBuffer.writeShort(dataSize.toInt())
          }
          else -> {
            b1 = b1 or PAYLOAD_LONG
            sinkBuffer.writeByte(b1)
            sinkBuffer.writeLong(dataSize)
          }
        }
    
        if (isClient) {
          random.nextBytes(maskKey!!)
          sinkBuffer.write(maskKey)
    
          if (dataSize > 0L) {
            messageBuffer.readAndWriteUnsafe(maskCursor!!)
            maskCursor.seek(0L)
            toggleMask(maskCursor, maskKey)
            maskCursor.close()
          }
        }
    
        sinkBuffer.write(messageBuffer, dataSize)
        sink.emit()
      }
    
    
    

    大家应该都能看懂了吧,其实就是组装数据帧,包括Opcode,mask,数据长度等等。两个方法的不同就在于普通数据需要判断数据长度的三种情况,再组装数据帧。最后都会通过sinkBuffer写入到输出数据流。

    终于,基本的流程说的差不多了。其中还有很多细节,同学们可以自己花时间看看琢磨琢磨,比如Okio部分。还是那句话,希望大家有空自己也读一读相关源码,这样理解才能深刻,而且你肯定会发现很多我没说到的细节,欢迎大家讨论。我也会继续努力,最后大家给我加个油点个赞吧,感谢感谢。

    总结

    再来个图总结下吧!🎉


    OkHttp-WebSocket源码.jpg

    参考

    OkHttp源码
    《WebSocket协议翻译》


    你的一个👍,就是我分享的动力❤️。

    相关文章

      网友评论

        本文标题:雨露均沾的OkHttp——WebSocket长连接(源码篇)

        本文链接:https://www.haomeiwen.com/subject/uyajcktx.html