OkHttp源码解读第二篇——请求过程

作者: A_si | 来源:发表于2021-03-13 12:45 被阅读0次

    OkHttp的请求过程

    上篇文章说到 OkHttp 的请求过程是在getResponseWithInterceptorChain()里,下面分析下请求和响应过程,先看下这个方法实现:

     internal fun getResponseWithInterceptorChain(): Response {
        // Build a full stack of interceptors.
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        }
        interceptors += CallServerInterceptor(forWebSocket)
    
        val chain = RealInterceptorChain(
            call = this,
            interceptors = interceptors,
            index = 0,
            exchange = null,
            request = originalRequest,
            connectTimeoutMillis = client.connectTimeoutMillis,
            readTimeoutMillis = client.readTimeoutMillis,
            writeTimeoutMillis = client.writeTimeoutMillis
        )
    
        var calledNoMoreExchanges = false
        try {
          val response = chain.proceed(originalRequest)
          if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
          }
          return response
        } catch (e: IOException) {
          calledNoMoreExchanges = true
          throw noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
          }
        }
      }
    

    这个核心方法一共分为三部分,组成了 OkHttp 的核心代码。

    getResponseWithInterceptorChain分析

    添加拦截器

        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        }
        interceptors += CallServerInterceptor(forWebSocket)
    
    

    上面代码很简单,就是拼装所有拦截器,第一个拦截器client.interceptors是开发者自定义的拦截器。

    组装

        val chain = RealInterceptorChain(
            call = this,
            interceptors = interceptors,
            index = 0,
            exchange = null,
            request = originalRequest,
            connectTimeoutMillis = client.connectTimeoutMillis,
            readTimeoutMillis = client.readTimeoutMillis,
            writeTimeoutMillis = client.writeTimeoutMillis
        )
    

    在这一步,把配置信息和拦截器组装成一个RealInterceptorChain

    处理

    下面这行代码,是真正处理的地方:

          val response = chain.proceed(originalRequest)
    
    

    继续点进去,看方法实现:

     override fun proceed(request: Request): Response {
        check(index < interceptors.size)
    
        calls++
    
        if (exchange != null) {
          check(exchange.finder.sameHostAndPort(request.url)) {
            "network interceptor ${interceptors[index - 1]} must retain the same host and port"
          }
          check(calls == 1) {
            "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
          }
        }
    
        // Call the next interceptor in the chain.
        val next = copy(index = index + 1, request = request)
        val interceptor = interceptors[index]
    
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next) ?: throw NullPointerException(
            "interceptor $interceptor returned null")
    
        if (exchange != null) {
          check(index + 1 >= interceptors.size || next.calls == 1) {
            "network interceptor $interceptor must call proceed() exactly once"
          }
        }
    
        check(response.body != null) { "interceptor $interceptor returned a response with no body" }
    
        return response
      }
    

    RealInterceptorChainproceed方法主要做两件事情,取出对应index的拦截器,index+1组成另一个RealInterceptorChain,把新组成的RealInterceptorChain传给拦截器执行拦截代码。

    val interceptor = interceptors[index]这里是传入的index是 0 ,所以当前RealInterceptorChain对应的proceed方法中的拦截器第一个拦截器,val next = copy(index = index + 1, request = request)是一个配置信息一样但是index是 1 的新RealInterceptorChain。注意这里不是拦截器,是一个chain

    val response = interceptor.intercept(next)这里是当前的拦截器开始工作,这是一个接口,它的实现里有一句代码是上面见过的,随便点进一个拦截器查看,这里点的是RetryAndFollowUpInterceptor

              response = realChain.proceed(request)
    
    

    而这个realChain就是上面interceptor.intercept(next)传入的next,根据index取出的是第二个拦截器。是不是恍然大悟,这是俄罗斯套娃,一个接一个,并不是一个方法到底的。在第一个拦截器的拦截实现方法里,代码执行到中间,会去执行第二个拦截器,然后去执行第三个、第四个。等返回后再一个个返回。

    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        var request = chain.request
        val call = realChain.call
        var followUpCount = 0
        var priorResponse: Response? = null
        var newExchangeFinder = true
        var recoveredFailures = listOf<IOException>()
        while (true) {
          call.enterNetworkInterceptorExchange(request, newExchangeFinder)
    
          var response: Response
          var closeActiveExchange = true
          try {
            if (call.isCanceled()) {
              throw IOException("Canceled")
            }
    
            try {
              response = realChain.proceed(request)
              newExchangeFinder = true
            } catch (e: RouteException) {
              // The attempt to connect via a route failed. The request will not have been sent.
              if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
                throw e.firstConnectException.withSuppressed(recoveredFailures)
              } else {
                recoveredFailures += e.firstConnectException
              }
              newExchangeFinder = false
              continue
            } catch (e: IOException) {
              // An attempt to communicate with a server failed. The request may have been sent.
              if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
                throw e.withSuppressed(recoveredFailures)
              } else {
                recoveredFailures += e
              }
              newExchangeFinder = false
              continue
            }
    
            // Attach the prior response if it exists. Such responses never have a body.
            if (priorResponse != null) {
              response = response.newBuilder()
                  .priorResponse(priorResponse.newBuilder()
                      .body(null)
                      .build())
                  .build()
            }
    
            val exchange = call.interceptorScopedExchange
            val followUp = followUpRequest(response, exchange)
    
            if (followUp == null) {
              if (exchange != null && exchange.isDuplex) {
                call.timeoutEarlyExit()
              }
              closeActiveExchange = false
              return response
            }
    
            val followUpBody = followUp.body
            if (followUpBody != null && followUpBody.isOneShot()) {
              closeActiveExchange = false
              return response
            }
    
            response.body?.closeQuietly()
    
            if (++followUpCount > MAX_FOLLOW_UPS) {
              throw ProtocolException("Too many follow-up requests: $followUpCount")
            }
    
            request = followUp
            priorResponse = response
          } finally {
            call.exitNetworkInterceptorExchange(closeActiveExchange)
          }
        }
      }
    

    这里也分为三部分:

    1. 配置信息,执行自己的拦截操作。
    2. 调用下一个拦截器执行
    3. 下一个拦截器返回后的操作
    流水线

    整个流程如上图,像一个流水线一样,从左向右执行,然后在返回执行。

    拦截器分析

    RetryAndFollowUpInterceptor

    请求失败重试和跟从重定向拦截器。

     while (true) {}
    

    循环里的第一行代码:

          call.enterNetworkInterceptorExchange(request, newExchangeFinder)
    

    这是为接下来的连接做准备,为当前call创建一个能寻找可用连接的对象ExchangeFinder

      this.exchangeFinder = ExchangeFinder(
              connectionPool,
              createAddress(request.url),
              this,
              eventListener
          )
    

    这个循环里面如果出现 RouteException 连接异常,或者 IOException 异常,如果可以重试,就会continue重试。能否重试的判断,在recover里:

      if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
                throw e.firstConnectException.withSuppressed(recoveredFailures)
              } else {
                recoveredFailures += e.firstConnectException
              }
    

    下面是判断是否要执行重定向:

     val exchange = call.interceptorScopedExchange
     val followUp = followUpRequest(response, exchange)
    

    会根据返回的状态码判断是否是重定向,并执行重定向:

       HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT, HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER -> {
            return buildRedirectRequest(userResponse, method)
          }
    

    直到没有重试和重定向,return response退出循环。

    分析完这个拦截器,总结下它的作用:

    1. 准备连接
    2. 调用下一个拦截器
    3. 重试和重定向

    BridgeInterceptor

    桥接拦截器,主要功能是为当前请求添加header。默认添加requestBuilder.header("Accept-Encoding", "gzip")压缩,同时响应数据自动解压。

    分析完这个拦截器,总结下它的作用:

    1. 添加header
    2. 调用下一个拦截器
    3. 解压

    CacheInterceptor

    缓存拦截器。如果有缓存直接返回:

       if (networkRequest == null && cacheResponse == null) {
          return Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(HTTP_GATEWAY_TIMEOUT)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build().also {
                listener.satisfactionFailure(call, it)
              }
        }
    

    这里利用缓存构建一个Response

    如果没有去执行下一个拦截器做网络请求,这行代码已经出现很多次了:

          networkResponse = chain.proceed(networkRequest)
    
    

    然后缓存结果:

    if (cacheResponse != null) {
          if (networkResponse?.code == HTTP_NOT_MODIFIED) {
            val response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers, networkResponse.headers))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build()
    
            networkResponse.body!!.close()
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache!!.trackConditionalCacheHit()
            cache.update(cacheResponse, response)
            return response.also {
              listener.cacheHit(call, it)
            }
          } else {
            cacheResponse.body?.closeQuietly()
          }
        }
    

    如果想看缓存策略,可以点这个类进去查看:

        val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
    
    

    ConnectInterceptor

    最核心的拦截器,请求的建立就是这里发生的。

    object ConnectInterceptor : Interceptor {
      @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val exchange = realChain.call.initExchange(chain)
        val connectedChain = realChain.copy(exchange = exchange)
        return connectedChain.proceed(realChain.request)
      }
    }
    

    这行代码和上面的不太一样:

        return connectedChain.proceed(realChain.request)
    
    

    这里直接return,而其他的拦截器都是再做一些后置工作,比如缓存拦截器会解压缩。这是因为这里是真正建立连接的地方,至此请求建立完成,并没有需要的后续操作。

    下面分析这个拦截器的拦截操作:

     /** Finds a new or pooled connection to carry a forthcoming request and response. */
      internal fun initExchange(chain: RealInterceptorChain): Exchange {
        synchronized(this) {
          check(expectMoreExchanges) { "released" }
          check(!responseBodyOpen)
          check(!requestBodyOpen)
        }
    
        val exchangeFinder = this.exchangeFinder!!
        val codec = exchangeFinder.find(client, chain) 
        val result = Exchange(this, eventListener, exchangeFinder, codec) 
        this.interceptorScopedExchange = result
        this.exchange = result
        synchronized(this) {
          this.requestBodyOpen = true
          this.responseBodyOpen = true
        }
    
        if (canceled) throw IOException("Canceled")
        return result
      }
    
    
    `val codec = exchangeFinder.find(client, chain)` 这一行找到可以连接:
    
    private fun findHealthyConnection(
        connectTimeout: Int,
        readTimeout: Int,
        writeTimeout: Int,
        pingIntervalMillis: Int,
        connectionRetryEnabled: Boolean,
        doExtensiveHealthChecks: Boolean
      ): RealConnection {
        while (true) {
          val candidate = findConnection(
              connectTimeout = connectTimeout,
              readTimeout = readTimeout,
              writeTimeout = writeTimeout,
              pingIntervalMillis = pingIntervalMillis,
              connectionRetryEnabled = connectionRetryEnabled
          )
    
          // Confirm that the connection is good.
          if (candidate.isHealthy(doExtensiveHealthChecks)) {
            return candidate
          }
    
          // If it isn't, take it out of the pool.
          candidate.noNewExchanges()
    
          // Make sure we have some routes left to try. One example where we may exhaust all the routes
          // would happen if we made a new connection and it immediately is detected as unhealthy.
          if (nextRouteToTry != null) continue
    
          val routesLeft = routeSelection?.hasNext() ?: true
          if (routesLeft) continue
    
          val routesSelectionLeft = routeSelector?.hasNext() ?: true
          if (routesSelectionLeft) continue
    
          throw IOException("exhausted all routes")
        }
      }
    

    在循环里通过findConnection拿到可用连接,验证是否健康(连接正常,心跳正常等),健康就返回,一共有五种方式,分为初始状态获取和再次进入获取。

    初始状态三次获取连接方式

    1. 初始状态去连接池寻找连接:

        if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
            val result = call.connection!!
            eventListener.connectionAcquired(call, result)
            return result
          }
      

      这里会判断连接数量是否超限。

    2. 再次去连接池寻找链接,参数不同,传入了路由:

        if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
              val result = call.connection!!
              eventListener.connectionAcquired(call, result)
              return result
            }
      

      获取路由配置,所谓路由其实就是代理、ip地址等参数的一个组合。拿到路由后尝试重新从连接池中获取连接,这里主要针对http2的多路复用。

    1. 没有可用连接,新建连接:
          newConnection.connect(
              connectTimeout,
              readTimeout,
              writeTimeout,
              pingIntervalMillis,
              connectionRetryEnabled,
              call,
              eventListener
          )
          。。。
          connectionPool.put(newConnection)
    
    

    有了连接并没有结束,而是再去连接池取一次,目的是如果同时2个请求建立连接,符合多路复用,就会丢掉一个,节省资源,最后一个参数是说只拿多路复用的连接:

        if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
          val result = call.connection!!
          nextRouteToTry = route
          newConnection.socket().closeQuietly()
          eventListener.connectionAcquired(call, result)
          return result
        }
    

    然后把连接放入连接池:

          connectionPool.put(newConnection)
    
    

    总结:

    • 获取无多路复用的连接
    • 获取所有连接
    • 创建连接,只获取多路复用的连接

    再次进入获取连接

    上面是初次获取的时候,如果重定向等第二次获取连接的时候,也有两种方式。

    1. 连接不可用,关闭连接进入初始状态,但是复用路由信息,不需要再次寻找路由:
      if (callConnection != null) {
          var toClose: Socket? = null
          synchronized(callConnection) {
            if (callConnection.noNewExchanges || !sameHostAndPort(callConnection.route().address.url)) {
              toClose = call.releaseConnectionNoEvents()
            }
          }
    

    如果这个连接不接受新的连接,不能复用,或者 Http 重定向到 Https,就会释放掉这个连接。

    1. 如果这个连接可用,会直接重用这个连接:
       if (call.connection != null) {
            check(toClose == null)
            return callConnection
          }
    

    建立连接

    点进newConnection.connect,查看如何建立连接。

        while (true) {
          try {
            if (route.requiresTunnel()) {
              connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener)
              if (rawSocket == null) {
                // We were unable to connect the tunnel but properly closed down our resources.
                break
              }
            } else {
              connectSocket(connectTimeout, readTimeout, call, eventListener)
            }
            establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
            eventListener.connectEnd(call, route.socketAddress, route.proxy, protocol)
            break
          } catch (e: IOException) {
            socket?.closeQuietly()
            rawSocket?.closeQuietly()
            socket = null
            rawSocket = null
            source = null
            sink = null
            handshake = null
            protocol = null
            http2Connection = null
            allocationLimit = 1
    
            eventListener.connectFailed(call, route.socketAddress, route.proxy, null, e)
    
            if (routeException == null) {
              routeException = RouteException(e)
            } else {
              routeException.addConnectException(e)
            }
    
            if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
              throw routeException
            }
          }
        }
    
    

    route.requiresTunnel(),如果代理是 Http ,目标是 Https ,那么创建socket之后会创建一个createTunnelRequest

    connectSocket(connectTimeout, readTimeout, call, eventListener)
    tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url)
        ?: break // T
    

    如果是正常的 Http 请求,那么久创建一个socket

    connectSocket(connectTimeout, readTimeout, call, eventListener)
    

    其实现如下:

    val rawSocket = when (proxy.type()) {
      Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
      else -> Socket(proxy)
    }
    

    socket建立之后,就去创建 Http 连接:

    establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener)
    

    这里会根据支持创建不同版本的 Http 连接和加密连接。

    CallServerInterceptor

    这个拦截是向服务器发送数据并接受数据的拦截器,代码比较简单。

    总结:

    流程

    相关文章

      网友评论

        本文标题:OkHttp源码解读第二篇——请求过程

        本文链接:https://www.haomeiwen.com/subject/xgrxcltx.html