美文网首页
OkHttp 4.2.2 源码分析

OkHttp 4.2.2 源码分析

作者: DthFish | 来源:发表于2019-11-18 10:59 被阅读0次

    OkHttp 4.2.2 源码分析

    原文链接
    注意:OkHttp 3.12.0 以上版本需要 Android 21+,否则会奔溃,这个问题之前遇见过,但是具体什么错误因为当时没有记录所以遗忘了,但是这里分析的代码还是采用当前最新的版本 4.2.2,并且这个版本的代码已经采用了 Kotlin 了。

    基本使用

            val okHttpClient = OkHttpClient.Builder().build()
            val request = Request.Builder().get().url("https://www.baidu.com").build()
            val call = okHttpClient.newCall(request)
            call.enqueue(object : Callback {
                override fun onFailure(call: Call, e: IOException) {
                    Log.d("OkHttpClient", e.toString())
                }
    
                override fun onResponse(call: Call, response: Response) {
                    Log.d("OkHttpClient", response.toString())
                }
    
            })
    

    以上就是 OkHttp 的基本使用,里面涉及到了 OkHttpClient、Request、Call、Callback 对象,为了最快的理解 OkHttp 是如何工作的我们可以从 call.enqueue() 入手,但是 Call 仅仅是一个接口,所以我们要看看它的实现类和这个实现类是怎么来的。

    一、 OkHttpClient.newCall

        // OkHttpClient.kt  
        override fun newCall(request: Request): Call {
        return RealCall.newRealCall(this, request, forWebSocket = false)
      }
        // RealCall.kt
        fun newRealCall(
          client: OkHttpClient,
          originalRequest: Request,
          forWebSocket: Boolean
        ): RealCall {
          // Safely publish the Call instance to the EventListener.
          return RealCall(client, originalRequest, forWebSocket).apply {
            transmitter = Transmitter(client, this)
          }
        }
    

    仅仅是调用了 RealCall 的方法,把 OkHttpClient 和 Request 对象当作参数,新建了一个 RealCall,然后初始化 Transmitter(发射器)对象。

    二、RealCall.execute

    我们发起请求的方式有同步和异步两种,execute 就是同步请求,而 enqueue 为异步请求。

      override fun execute(): Response {
        //1、
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        }
        //2、
        transmitter.timeoutEnter()
        //3、
        transmitter.callStart()
        try {
          //4、
          client.dispatcher.executed(this)
          //5、
          return getResponseWithInterceptorChain()
        } finally {
          //6、
          client.dispatcher.finished(this)
        }
      }
    
    1. 首先通过 executed 判断是否已经调用过,如果多次调用则抛出异常。

    2. 请求超时的判断处理。

    3. 在初始化 OkHttpClient 的时候我们可以创建 EventListener.Factory 对象,为每个 RealCall 对象创建监听对象,在 transmitter.callStart 中会调用 EventListener.callStart。

    4. 调用 Dispatcher 对象的 executed 方法,把 RealCall 传入,事实上仅仅是加入到一个列表当中。

        @Synchronized internal fun executed(call: RealCall) {
          runningSyncCalls.add(call)
        }
      
    5. getResponseWithInterceptorChain 是真正开始处理请求的地方,这里我们后面单独讲。

    6. 请求结束后从列表中移除,如果发现 Dispatcher 空闲还会调用空闲的回调,

    三、RealCall.enqueue

    enqueue 为异步请求。

        // RealCall.kt
        override fun enqueue(responseCallback: Callback) {
        //1、
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        }
        //2、
        transmitter.callStart()
        //3、
        client.dispatcher.enqueue(AsyncCall(responseCallback))
      }
    
    1. 防止重复调用。
    2. 事件开始分发。
    3. 与同步调用不同的地方,这里把我们使用的时候传入的 Callback 对象,进一步用 AsyncCall 封装,然后调用 Dispatcher.enqueue 方法。
        //Dispatcher.kt
      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
                //4、
          readyAsyncCalls.add(call)
          if (!call.get().forWebSocket) {
            //5、
            val existingCall = findExistingCallWithHost(call.host())
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        //6、
        promoteAndExecute()
      }
    
    1. 把 AsycCall 对象添加到等待队列。
    2. 如果不是 WebScoket 的请求(一般都不是),会进一步从正在执行的等待中的异步请求列表中查找 Host 相同的请求,把它的 callsPerHost 对象的引用拷贝过来,用来统计相同 Host 的请求数。reuseCallsPerHostFrom 方法实现见下。
    // RealCall.kt 中内部类 AsyncCall
        @Volatile private var callsPerHost = AtomicInteger(0)
    
        fun callsPerHost(): AtomicInteger = callsPerHost
    
        fun reuseCallsPerHostFrom(other: AsyncCall) {
          this.callsPerHost = other.callsPerHost
        }
    
    1. promoteAndExecute 方法见下。
      private fun promoteAndExecute(): Boolean {
        assert(!Thread.holdsLock(this))
    
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          // 7、
          while (i.hasNext()) {
            val asyncCall = i.next()
                    //到达最大请求数,默认为 64
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            //到达单个 host 最大请求数,默认为 5
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
    
            i.remove()
            asyncCall.callsPerHost().incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          isRunning = runningCallsCount() > 0
        }
            // 8、
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService)
        }
    
        return isRunning
      }
    
    1. 遍历等待执行的异步请求列表 readyAsyncCalls,将符合要求的请求收集到 executableCalls 列表中,同时在 readyAsyncCalls 中移除。这里有两种情况为暂时不处理的请求:一个是到达单个 host 最大请求数,另一个是总体请求到达最大请求数。
    2. 遍历收集到的请求,调用 AsyncCall.executeOn,参数为线程池。

    四、AsyncCall.executeOn

    AsyncCall 实现了 Runnable 接口,这里又把 executorService 传入,不难想到后面的逻辑就是用线程池执行 Runnable 的 run 方法,下面我们来看下。

        fun executeOn(executorService: ExecutorService) {
          assert(!Thread.holdsLock(client.dispatcher))
          var success = false
          try {
            //1、
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
            //2、
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            transmitter.noMoreExchanges(ioException)
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            //3、
            if (!success) {
              client.dispatcher.finished(this) // This call is no longer running!
            }
          }
        }
    
    1. 用线程池执行,后面马上就会看 run 的实现。
    2. 异常处理,并分发。
    3. 异常结束的结束处理。
        override fun run() {
          // 4、
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            // 5、
            transmitter.timeoutEnter()
            try {
              // 6、
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              // 7、
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log(INFO, "Callback failure for ${toLoggableString()}", e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
              }
            } finally {
              // 8、
              client.dispatcher.finished(this)
            }
          }
        }
    
    1. 修改线程名,在执行完内部代码之后修改回来,赞美 kotlin 的便捷性。
    2. 请求超时的判断处理。
    3. 殊途同归的 getResponseWithInterceptorChain,后面分析,紧跟着调用回调。
    4. 异常处理,以及回调。
    5. 处理结束把 AsyncCall 从 Dispatcher 的列表中移除,相同 Host 的请求计数减去一。

    五、getResponseWithInterceptorChain

    OkHttp 以责任链模式通过拦截器把需要进行的网络请求进行了责任的隔离,各个拦截器承担着各自的职责。

      @Throws(IOException::class)
      fun getResponseWithInterceptorChain(): Response {
        // Build a full stack of interceptors.
        // 1、
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        }
        interceptors += CallServerInterceptor(forWebSocket)
            // 2、
        val chain = RealInterceptorChain(interceptors, transmitter, null, 0, originalRequest, this,
            client.connectTimeoutMillis, client.readTimeoutMillis, client.writeTimeoutMillis)
    
        var calledNoMoreExchanges = false
        try {
          // 3、
          val response = chain.proceed(originalRequest)
          if (transmitter.isCanceled) {
            response.closeQuietly()
            throw IOException("Canceled")
          }
          return response
        } catch (e: IOException) {
          calledNoMoreExchanges = true
          throw transmitter.noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null)
          }
        }
      }
    
    1. 把各个拦截器添加到列表,包括:自定义拦截器,重试重定向拦截器,桥接拦截器,缓存拦截器,连接拦截器,自定义网络拦截器,网络请求拦截器。
    2. 把前面的拦截器列表封装成 RealInterceptorChain 对象。
    3. 调用 RealInterceptorChain 对象的 proceed 方法,获取 Response 对象,最后进行一些收尾的工作。
      // RealInterceptorChain.kt
        @Throws(IOException::class)
      fun proceed(request: Request, transmitter: Transmitter, exchange: Exchange?): Response {
        if (index >= interceptors.size) throw AssertionError()
            // 省略一些检查代码
        calls++
        // Call the next interceptor in the chain.
        //4、
        val next = RealInterceptorChain(interceptors, transmitter, exchange,
            index + 1, request, call, connectTimeout, readTimeout, writeTimeout)
        //5、
        val interceptor = interceptors[index]
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next) ?: throw NullPointerException(
            "interceptor $interceptor returned null")
    
        return response
      }
    
    1. 新建 RealInterceptorChain 对象,把当前的对象的参数传入,index + 1。

    2. 获取当前 index 的拦截器,调用 intercept 方法,传入步骤 4 新建的 RealInterceptorChain 对象,返回结果 Response。

    那么这样能清楚拦截器的调用过程了,每个拦截器的 intercept 会被 RealInterceptorChain.proceed 触发,然后接收到一个新的 RealInterceptorChain 对象,只要在自身的 intercept 方法中调用接收到的对象的 proceed 方法,就能触发下一个动作,拿到一个 Response 对象。这样层层调用,就能走完我们一开始收集的拦截器列表中所有的拦截方法。这个就是核心思想。

    六、拦截器

    1、重试重定向拦截器 RetryAndFollowUpInterceptor

    重试重定向拦截器,主要是请求失败或者请求需要重定向的时候进行新的请求。

        // RetryAndFollowUpInterceptor.kt
        @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        var request = chain.request()
        val realChain = chain as RealInterceptorChain
        val transmitter = realChain.transmitter()
        var followUpCount = 0
        var priorResponse: Response? = null
        // 循环操作
        while (true) {
          //1、
          transmitter.prepareToConnect(request)
    
          if (transmitter.isCanceled) {
            throw IOException("Canceled")
          }
    
          var response: Response
          var success = false
          try {
            //2、
            response = realChain.proceed(request, transmitter, null)
            success = true
          } catch (e: RouteException) {
            // The attempt to connect via a route failed. The request will not have been sent.
            // 路由错误,如果不可以恢复则直接抛出异常
            if (!recover(e.lastConnectException, transmitter, false, request)) {
              throw e.firstConnectException
            }
            continue
          } catch (e: IOException) {
            // An attempt to communicate with a server failed. The request may have been sent.
            // 
            val requestSendStarted = e !is ConnectionShutdownException
            // IO 错误,如果不可以恢复则直接抛出异常
            if (!recover(e, transmitter, requestSendStarted, request)) throw e
            continue
          } finally {
            // The network call threw an exception. Release any resources.
            // 3、
            if (!success) {
              transmitter.exchangeDoneDueToException()
            }
          }
    
          // Attach the prior response if it exists. Such responses never have a body.
          // 4、第一次 priorResponse 肯定是空的,如果经过重试的话则会把之前的 Response 赋值给新的 Response
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
                .build()
          }
    
          val exchange = response.exchange
          val route = exchange?.connection()?.route()
          // 5、
          val followUp = followUpRequest(response, route)
                // 6、
          if (followUp == null) {
            if (exchange != null && exchange.isDuplex) {
              transmitter.timeoutEarlyExit()
            }
            return response
          }
    
          val followUpBody = followUp.body
          // 7、一次性请求的话直接返回
          if (followUpBody != null && followUpBody.isOneShot()) {
            return response
          }
    
          response.body?.closeQuietly()
          // 8、
          if (transmitter.hasExchange()) {
            exchange?.detachWithViolence()
          }
                // 9、次数判断,最大为20
          if (++followUpCount > MAX_FOLLOW_UPS) {
            throw ProtocolException("Too many follow-up requests: $followUpCount")
          }
                // 10、
          request = followUp
          priorResponse = response
        }
      }
    
    1. transmitter 对象是在创建 RealCall 的时候初始化的,判断是否有可以重用的连接或者对没用的连接的回收,如果没有则创建新的 ExchangeFinder 对象。

    2. realChain.proceed 调用到下一个拦截器,该行代码之后都是请求回来后的处理。

    3. catch 和 finally,catch 中主要是出现异常后,判断是否可以重试,如果可以就 continue 否则抛出异常并进入 finally 做回收操作。稍微看下 recover 方法。

        private fun recover(
          e: IOException,
          transmitter: Transmitter,
          requestSendStarted: Boolean,
          userRequest: Request
        ): Boolean {
          // 应用层面不允许重试,我们可以在 OkHttpClient 初始化时候设置
          if (!client.retryOnConnectionFailure) return false
      
          // 已经发起过请求,且只允许发送一次
          if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
      
          // 一些致命问题,比如证书认证错误等
          if (!isRecoverable(e, requestSendStarted)) return false
      
          // 没有更多的路由去尝试
          if (!transmitter.canRetry()) return false
          // For failure recovery, use the same route selector with a new connection.
          return true
        }
      
    4. 第一次 priorResponse 肯定是空的,如果经过重试的话则会把之前的 Response 赋值给新的 Response 的 priorResponse 成员变量,并且显示的将 priorResponse 的 body 置空。这个对象的赋值意义是什么?其实作用是在步骤 5 中,作为停止重试的判断使用:如果先后两次 Http 请求结果获取的状态码都是 408(请求超时)或者都是 503(服务不可用)则直接停止重试。

    5. followUpRequest 根据请求回来的 Response 的响应码构建新的 Request,其中就包含了 301,302 等重定向的处理,步骤 4 提到的处理,以及我们一开始初始化 OkHttpClient 对象的是否允许重试的处理等。如果需要重新请求则 followUp 对象不为空,否则为 null 停止继续请求。

    6. 如果 followUp 为 null 则可以直接返回请求结果,这里还涉及到一个 Exchange 对象,这个我们后面再讲。

    7. 一次性请求的话直接返回。

    8. 走到这里表示,我们需要跟进一个新的请求,但是之前请求的一些操作还没有结束,则需要在这里停止。

    9. 重试次数判断,最大为20。

    10. 更新 Request 对象为新生成的对象,priorResponse 赋值(作用在步骤 4 中已经说明),进入下一次循环。

    2、桥接拦截器 BridgeInterceptor

    在发起请求之前,用应用层发起的请求将 Http 请求需要的请求头填充完整,在请求之后,将 Http 响应解析为应用层的响应。

      // BridgeInterceptor.kt
        @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        val userRequest = chain.request()
        val requestBuilder = userRequest.newBuilder()
    
        val body = userRequest.body
        if (body != null) {
          // 1、
          val contentType = body.contentType()
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString())
          }
    
          val contentLength = body.contentLength()
          if (contentLength != -1L) {
            requestBuilder.header("Content-Length", contentLength.toString())
            requestBuilder.removeHeader("Transfer-Encoding")
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked")
            requestBuilder.removeHeader("Content-Length")
          }
        }
            // 2、
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", userRequest.url.toHostHeader())
        }
    
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive")
        }
    
        var transparentGzip = false
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true
          requestBuilder.header("Accept-Encoding", "gzip")
        }
            // 3、
        val cookies = cookieJar.loadForRequest(userRequest.url)
        if (cookies.isNotEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies))
        }
    
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", userAgent)
        }
    
        val networkResponse = chain.proceed(requestBuilder.build())
            // 4、
        cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)
    
        val responseBuilder = networkResponse.newBuilder()
            .request(userRequest)
            // 5、
        if (transparentGzip &&
            "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
            networkResponse.promisesBody()) {
          val responseBody = networkResponse.body
          if (responseBody != null) {
            val gzipSource = GzipSource(responseBody.source())
            val strippedHeaders = networkResponse.headers.newBuilder()
                .removeAll("Content-Encoding")
                .removeAll("Content-Length")
                .build()
            responseBuilder.headers(strippedHeaders)
            val contentType = networkResponse.header("Content-Type")
            responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
          }
        }
    
        return responseBuilder.build()
      }
    
    1. 如果 Request.body 不为空,则添加 Content-Type、Content-Length 或者 Transfer-Encoding 请求头。
    2. 添加 Host、Connection 请求头,如果支持压缩还需要添加 Accept-Encoding:gzip,相应的需要在请求回来之后进行解压的操作。
    3. CookJar 是对 cookie 的支持,但是它默认是一个空实现(具体可以看 OkHttpClient.Builder 中的代码),如果我们需要则可以自己实现一个 CookJar 来保存和读取 cookie。
    4. 这一步,以及之后的逻辑都是响应后的处理了,这里首先是 cookie 的保持处理,默认是没有的。
    5. 如果之前请求的时候在请求头中添加了 Accept-Encoding:gzip,而且响应头中也的确说明了是 gzip 的,则进行解压。

    3、缓存拦截器 CacheInterceptor

    主要是用于从缓存读取和写入请求所对应的响应。这里缓存的策略事实上和 Http 的缓存机制有很大的关联。

      // CacheInterceptor.kt
        @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        // 1、
        val cacheCandidate = cache?.get(chain.request())
    
        val now = System.currentTimeMillis()
            // 2、
        val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
        val networkRequest = strategy.networkRequest
        val cacheResponse = strategy.cacheResponse
    
        cache?.trackResponse(strategy)
    
        if (cacheCandidate != null && cacheResponse == null) {
          // The cache candidate wasn't applicable. Close it.
          cacheCandidate.body?.closeQuietly()
        }
    
        // If we're forbidden from using the network and the cache is insufficient, fail.
        // 3、
        if (networkRequest == null && cacheResponse == null) {
          return Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(HTTP_GATEWAY_TIMEOUT)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build()
        }
    
        // If we don't need the network, we're done.
        // 4、
        if (networkRequest == null) {
          return cacheResponse!!.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build()
        }
            // 5、
        var networkResponse: Response? = null
        try {
          networkResponse = chain.proceed(networkRequest)
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            cacheCandidate.body?.closeQuietly()
          }
        }
    
        // If we have a cache response too, then we're doing a conditional get.
        // 6、
        if (cacheResponse != null) {
          if (networkResponse?.code == HTTP_NOT_MODIFIED) {
            val response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers, networkResponse.headers))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build()
    
            networkResponse.body!!.close()
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache!!.trackConditionalCacheHit()
            cache.update(cacheResponse, response)
            return response
          } else {
            cacheResponse.body?.closeQuietly()
          }
        }
            // 7、
        val response = networkResponse!!.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()
    
        if (cache != null) {
          if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            val cacheRequest = cache.put(response)
            return cacheWritingResponse(cacheRequest, response)
          }
    
          if (HttpMethod.invalidatesCache(networkRequest.method)) {
            try {
              cache.remove(networkRequest)
            } catch (_: IOException) {
              // The cache cannot be written.
            }
          }
        }
    
        return response
      }
    
    1. Cache 对象可以在 OkHttpClient 初始化的时候构建,里面是通过 DiskLruCache 进行缓存的,以 url 生成 key 在缓存中查找到目标之后重新构造成 Response,然后进一步对比 Request 和从缓存恢复 Response,如果没命中则会返回 null,否则返回 Response。
    2. 获取缓存策略,根据策略的 networkRequest 和 cacheResponse 是否为 null 会有不同的处理。具体缓存策略的逻辑稍后再讲。
    3. 如果 networkRequest 和 cacheResponse 都为 null,则不再进行网络请求,直接构造失败的 Response, 结束。
    4. 如果只有 networkRequest 为 null,则表示缓存可用,结束。
    5. 如果 networkRequest 不为 null,则进行网络请求。
    6. 从这里开始是网络请求回来后的逻辑,当 cacheResponse 不为 null 且 networkResponse 的相应码为 304(客户端有缓存。http 请求时候发现自己缓存的文件有 Last Modified ,那么在请求中会包含 If Modified Since,服务器通过对比,如果没有改变,就会返回 304,响应体就不需要继续发送了以此减少带宽的消化。),则合并两个响应头,更新缓存,结束。
    7. 如果 cache 不为 null,验证 networkResponse 是否需要缓存,并缓存,结束;否则返回结果,结束。
    CacheStrategy

    前面讲到了缓存策略,事实证明返回的 CacheStrategy 的 networkRequest 和 cacheResponse 直接影响到了后续的调用逻辑。缓存策略是怎么产生的,主要还是要看 CacheStrategy.compute 方法:

        //CacheStrategy.kt
            fun compute(): CacheStrategy {
          val candidate = computeCandidate()
    
          // We're forbidden from using the network and the cache is insufficient.
          // 如果获得的 networkRequest 为 null,且只支持从缓存读取,那么就可以直接构造失败的响应了
          // 对应前面步骤 3
          if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
            return CacheStrategy(null, null)
          }
    
          return candidate
        }
    
        private fun computeCandidate(): CacheStrategy {
          // 缓存没有命中,发起网络请求,对应前面步骤 5
          if (cacheResponse == null) {
            return CacheStrategy(request, null)
          }
    
          // 请求为 https,缓存没有握手信息,发起网络请求,对应前面步骤 5
          if (request.isHttps && cacheResponse.handshake == null) {
            return CacheStrategy(request, null)
          }
    
            // isCacheable 中对 cacheResponse 的响应码以及 request 和 cacheResponse 
          // Cache-Control 头做了判断,如果不满足,则对应前面步骤 5
          if (!isCacheable(cacheResponse, request)) {
            return CacheStrategy(request, null)
          }
          
          val requestCaching = request.cacheControl
          // hasConditions 中对请求头中是否存在 If-Modified-Since 和 If-None-Match 做了判断,
          // 事实上如果有这两个字段,也可以发起请求,如果服务端验证没有过期,则只发送响应头回来
          // 对应步骤 5 以及后续验证的步骤 6
          if (requestCaching.noCache || hasConditions(request)) {
            return CacheStrategy(request, null)
          }
    
          val responseCaching = cacheResponse.cacheControl
    
          val ageMillis = cacheResponseAge()
          var freshMillis = computeFreshnessLifetime()
    
          if (requestCaching.maxAgeSeconds != -1) {
            freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
          }
    
          var minFreshMillis: Long = 0
          if (requestCaching.minFreshSeconds != -1) {
            minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
          }
    
          var maxStaleMillis: Long = 0
          if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
            maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
          }
                // 有缓存,不新鲜则进行标记,但是没必要发起网络请求,对应步骤 4
          if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            val builder = cacheResponse.newBuilder()
            if (ageMillis + minFreshMillis >= freshMillis) {
              builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
            }
            val oneDayMillis = 24 * 60 * 60 * 1000L
            if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
              builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
            }
            return CacheStrategy(null, builder.build())
          }
    
          val conditionName: String
          val conditionValue: String?
          when {
            etag != null -> {
              // 对应 cacheResponse 响应头中的 ETag 字段,
              // 如果不为空则给新的请求头添加 If-None-Match 
              conditionName = "If-None-Match"
              conditionValue = etag
            }
    
            lastModified != null -> {
                // 对应 cacheResponse 响应头中的 Last-Modified 字段,
              // 如果不为空则给新的请求头添加 If-Modified-Since
              conditionName = "If-Modified-Since"
              conditionValue = lastModifiedString
            }
    
            servedDate != null -> {
                // 对应 cacheResponse 响应头中的 Date 字段,
              // 如果不为空则给新的请求头添加 If-Modified-Since
              conditionName = "If-Modified-Since"
              conditionValue = servedDateString
            }
                    // 如果不满足以上情况,则直接发起请求,对应步骤 5
            else -> return CacheStrategy(request, null) // No condition! Make a regular request.
          }
                // 如果有满足上面 when 的条件,则添加进相应的请求头中,对应步骤 5以及后续验证的步骤 6
          val conditionalRequestHeaders = request.headers.newBuilder()
          conditionalRequestHeaders.addLenient(conditionName, conditionValue!!)
    
          val conditionalRequest = request.newBuilder()
              .headers(conditionalRequestHeaders.build())
              .build()
          return CacheStrategy(conditionalRequest, cacheResponse)
        }
    

    那么分析到这里,了解过 Http 缓存机制的同学应该也发现了,这里的代码和 Http 缓存机制完全一致。而且缓存策略中的处理,也和缓存拦截器里面的操作都能相互对应上。

    4、连接拦截器 ConnectInterceptor

    用于开启一个连接。

      // ConnectInterceptor.kt
        @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val request = realChain.request()
        val transmitter = realChain.transmitter()
    
        // 1、
        val doExtensiveHealthChecks = request.method != "GET"
        val exchange = transmitter.newExchange(chain, doExtensiveHealthChecks)
    
        return realChain.proceed(request, transmitter, exchange)
      }
    
    1. 当请求方式不是 GET 的时候需要更全面的检查,获取 Exchange 对象(就叫他转换器),transmitter 之前提到过在创建 RealCall 的时候创建的。
      //Transmitter.kt
        internal fun newExchange(chain: Interceptor.Chain, doExtensiveHealthChecks: Boolean): Exchange {
        synchronized(connectionPool) {
          // 2、
          check(!noMoreExchanges) { "released" }
          check(exchange == null) {
            "cannot make a new request because the previous response is still open: " +
                "please call response.close()"
          }
        }
            // 3、
        val codec = exchangeFinder!!.find(client, chain, doExtensiveHealthChecks)
        // 4、
        val result = Exchange(this, call, eventListener, exchangeFinder!!, codec)
    
        synchronized(connectionPool) {
          this.exchange = result
          this.exchangeRequestDone = false
          this.exchangeResponseDone = false
          return result
        }
      }
    
    1. noMoreExchanges 如果为 true 表示 Transmitter 已经要释放不再接受新的处理,exchange 不为 null 表示之前的响应还没有关闭。
    2. 通过重试拦截器里面初始化的 ExchangeFinder 对象,查找或者新建一个健康的连接并封装到一个 ExchangeCodec(编码器)返回,根据 http 的不同版本,编码器也不同。
    3. 将 ExchangeCodec 编码器等参数封装到 Exchange 对象返回,通过最终通过 RealInterceptorChain.proceed 方法传递到下一个拦截器(如果我们有自定义的网络拦截器那么就会被调用到,如果没有则到了最后一个拦截器:网络请求拦截器)。
    如何获取一个健康的连接?

    连接拦截器自身的代码看起来很简洁,但核心点是怎么查找连接。所以我们还需要深入看下 ExchangeFinder.find 方法。

        // ExchangeFinder.kt
        fun find(
        client: OkHttpClient,
        chain: Interceptor.Chain,
        doExtensiveHealthChecks: Boolean
      ): ExchangeCodec {
            // 省略一些代码
        try {
          val resultConnection = findHealthyConnection(
              connectTimeout = connectTimeout,
              readTimeout = readTimeout,
              writeTimeout = writeTimeout,
              pingIntervalMillis = pingIntervalMillis,
              connectionRetryEnabled = connectionRetryEnabled,
              doExtensiveHealthChecks = doExtensiveHealthChecks
          )
          return resultConnection.newCodec(client, chain)
        }
        // 省略一些代码
      }
    
      @Throws(IOException::class)
      private fun findHealthyConnection(
        connectTimeout: Int,
        readTimeout: Int,
        writeTimeout: Int,
        pingIntervalMillis: Int,
        connectionRetryEnabled: Boolean,
        doExtensiveHealthChecks: Boolean
      ): RealConnection {
        // 循环查找可用的连接
        while (true) {
          val candidate = findConnection(
              connectTimeout = connectTimeout,
              readTimeout = readTimeout,
              writeTimeout = writeTimeout,
              pingIntervalMillis = pingIntervalMillis,
              connectionRetryEnabled = connectionRetryEnabled
          )
    
          // 如果成功的次数是0,表示找到的连接为新的,就不需要做全面的检查直接返回
          synchronized(connectionPool) {
            if (candidate.successCount == 0) {
              return candidate
            }
          }
    
            // 检查连接池里面的连接是否可用,如果可用则直接返回,否则从连接池中移除,进入下一次循环查找
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            candidate.noNewExchanges()
            continue
          }
    
          return candidate
        }
      }
    
      // ExchangeFinder.kt
        @Throws(IOException::class)
      private fun findConnection(
        connectTimeout: Int,
        readTimeout: Int,
        writeTimeout: Int,
        pingIntervalMillis: Int,
        connectionRetryEnabled: Boolean
      ): RealConnection {
        var foundPooledConnection = false
        var result: RealConnection? = null
        var selectedRoute: Route? = null
        var releasedConnection: RealConnection?
        val toClose: Socket?
        synchronized(connectionPool) {
          // 如果已经通过发射器标记了取消,则直接抛异常退出
          if (transmitter.isCanceled) throw IOException("Canceled")
          // 新尝试的标记
          hasStreamFailure = false // This is a fresh attempt.
                // 第一次进来应该为 null,因为在循环中所以可以先看下边
          releasedConnection = transmitter.connection
          // 如果不为空,且持有的连接被标记不能做更多的处理,则释放资源(怎么释放呢?不着急,我们先看怎么获取)
          toClose = if (transmitter.connection != null && transmitter.connection!!.noNewExchanges) {
            transmitter.releaseConnectionNoEvents()
          } else {
            null
          }
    
          if (transmitter.connection != null) {
            // 如果能走到这里表示,是复用的连接,而且没有被上面的操作回收掉,即 noNewExchanges 为 false
            result = transmitter.connection
            releasedConnection = null
          }
    
          if (result == null) {
            // result 为 null 需要尝试从连接池里面获取一个
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
              // 走到这里表示已经从连接池里面获取到
              foundPooledConnection = true
              result = transmitter.connection
            } else if (nextRouteToTry != null) {
              // 标记1、第一次应该不会到这里,目前看不明白,我们一会回来看就清楚了
              selectedRoute = nextRouteToTry
              nextRouteToTry = null
            } else if (retryCurrentRoute()) {
              // 第一次应该不会到这里
              selectedRoute = transmitter.connection!!.route()
            }
          }
        }
        // 关闭上面需要释放的 socket
        toClose?.closeQuietly()
    
        if (releasedConnection != null) {
          eventListener.connectionReleased(call, releasedConnection!!)
        }
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result!!)
        }
        if (result != null) {
          // 复用的或者是刚刚从连接池里面查找到的
          return result!!
        }
    
        // 创建一个路由选择器,第一次进来的话会从这里获取
        var newRouteSelection = false
        if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
          newRouteSelection = true
          // 如果我们没有自己给 OkHttpClient 设置额外的 Proxy 或者 ProxySelector 的话,
          // routeSelector 创建的时候,会走到 sun.net.spi.DefaultProxySelector 获取 Routes
          // next 里面会把 Routes 依次通过 dns 解析地址之后重新封装成 Route,
          // 再经过之前是否存在失败的情况筛选,
          // 最终保存在 Selection 对象里,返回。
          routeSelection = routeSelector.next()
        }
    
        var routes: List<Route>? = null
        synchronized(connectionPool) {
          if (transmitter.isCanceled) throw IOException("Canceled")
    
          if (newRouteSelection) {
            
            // 通过上一步创建 routeSelection 的操作,我们获取了一个 ip 地址的集合
            // 再次尝试从连接池中匹配是否有合适的连接
            // 与上一次不同的是 routes 对象不为空
            routes = routeSelection!!.routes
            if (connectionPool.transmitterAcquirePooledConnection(
                    address, transmitter, routes, false)) {
              foundPooledConnection = true
              result = transmitter.connection
            }
          }
    
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              // 连接池没有找到,那么我们从路由选择中选中一个路由
              selectedRoute = routeSelection!!.next()
            }
            
            // 创建一个全新的连接,
            result = RealConnection(connectionPool, selectedRoute!!)
            connectingConnection = result
          }
        }
    
        // 如果是复用的那么到这里就结束了,如果是新建的还需要进行其他的操作
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result!!)
          return result!!
        }
        // 新建的连接,建立 TCP 连接,如果是 Https 那么还需要进行密钥的协商
        result!!.connect(
            connectTimeout,
            readTimeout,
            writeTimeout,
            pingIntervalMillis,
            connectionRetryEnabled,
            call,
            eventListener
        )
        // 从失败的路由集合中移除成功的路由
        connectionPool.routeDatabase.connected(result!!.route())
    
        var socket: Socket? = null
        synchronized(connectionPool) {
          connectingConnection = null
          
          // 最终再次从连接池中获取可以多路复用的连接,注意和之前两次不同的是最后一个参数为 true
          // 如果有多个并发的连接到同一个 host 这时候后来的就能找到前面的
          if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            // 走到这里,就表示可以从连接池中获取的现成的,那么刚刚新建的就可以回收掉了
            result!!.noNewExchanges = true
            socket = result!!.socket()
            result = transmitter.connection
            // 有可能刚获取到的之后马上这个连接就不可用了,那么我们把之前能成功连接的路由暂时保存一下
            // 见前面注释中标记 1 的位置
            nextRouteToTry = selectedRoute
          } else {
            // 没有在连接池中找到,那么我们就把新建的添加进去
            connectionPool.put(result!!)
            transmitter.acquireConnectionNoEvents(result!!)
          }
        }
        socket?.closeQuietly()
    
        eventListener.connectionAcquired(call, result!!)
        return result!!
      }
    

    上面的步骤比较多,但是已经详细注释了,有些地方可以第一次不能理解,但是继续看下去就能发现什么情况才能调用到了,毕竟整个过程在一个循环中。

    下面单独提一下 connectionPool.transmitterAcquirePooledConnection、transmitter.acquireConnectionNoEvents 以及 transmitter.releaseConnectionNoEvents 三个方法,分别是 transmitter 从连接池获取连接,取得后的绑定,和释放的过程。

      // RealConnectionPool.kt
        fun transmitterAcquirePooledConnection(
        address: Address,
        transmitter: Transmitter,
        routes: List<Route>?,
        requireMultiplexed: Boolean
      ): Boolean {
        assert(Thread.holdsLock(this))
        // 遍历所有连接
        for (connection in connections) {
          // 如果是多路复用,但是连接不支持则跳过
          if (requireMultiplexed && !connection.isMultiplexed) continue
          // 检查是否合适,不合适则跳过
          if (!connection.isEligible(address, routes)) continue
          // 找到了,绑定一下
          transmitter.acquireConnectionNoEvents(connection)
          return true
        }
        return false
      }
    
      // Transmitter.kt
        fun acquireConnectionNoEvents(connection: RealConnection) {
        assert(Thread.holdsLock(connectionPool))
        check(this.connection == null)
        this.connection = connection
        // 处理上很简单,把 Transmitter 自身用弱引用添加进 RealConnection 的集合中
        connection.transmitters.add(TransmitterReference(this, callStackTrace))
      }
    
      fun releaseConnectionNoEvents(): Socket? {
        assert(Thread.holdsLock(connectionPool))
            // 从 RealConnection 的请求集合中查找的自己的索引,然后移除并置空 this.connection 的引用
        val index = connection!!.transmitters.indexOfFirst { it.get() == this@Transmitter }
        check(index != -1)
    
        val released = this.connection
        released!!.transmitters.removeAt(index)
        this.connection = null
            // 进一步判断 RealConnection.transmitters 已经空了,则让连接池把这个连接移除
        // 最终返回 socket 等待回收资源
        if (released.transmitters.isEmpty()) {
          released.idleAtNanos = System.nanoTime()
          if (connectionPool.connectionBecameIdle(released)) {
            return released.socket()
          }
        }
        return null
      }
    

    通过前面三个方法,发射器对象通过持有连接的引用,然后持有的请求就会在这个连接处理;

    而连接很可能是处理多个请求的,所以用集合保存了发射器对象的弱引用;

    而每个请求完成的时候那么就需要从这个弱引用集合中移除,当集合中所有的发射器对象都请求完毕之后,那么就可以考虑从连接池中移除这个连接释放资源了。

    5、网络请求拦截器 CallServerInterceptor

    开始真正的向服务器发送请求,读取响应,数据交换。

      @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val exchange = realChain.exchange()
        val request = realChain.request()
        val requestBody = request.body
        val sentRequestMillis = System.currentTimeMillis()
            // 1、
        exchange.writeRequestHeaders(request)
    
        var responseHeadersStarted = false
        var responseBuilder: Response.Builder? = null
        if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
          
          // 2、
          if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
            exchange.flushRequest()
            responseHeadersStarted = true
            exchange.responseHeadersStart()
            responseBuilder = exchange.readResponseHeaders(true)
          }
          // 3、
          if (responseBuilder == null) {
            if (requestBody.isDuplex()) {
    
              exchange.flushRequest()
              val bufferedRequestBody = exchange.createRequestBody(request, true).buffer()
              requestBody.writeTo(bufferedRequestBody)
            } else {
              // 4、
              val bufferedRequestBody = exchange.createRequestBody(request, false).buffer()
              requestBody.writeTo(bufferedRequestBody)
              bufferedRequestBody.close()
            }
          } else {
            // 5、
            exchange.noRequestBody()
            if (!exchange.connection()!!.isMultiplexed) {
              exchange.noNewExchangesOnConnection()
            }
          }
        } else {
          exchange.noRequestBody()
        }
    
        if (requestBody == null || !requestBody.isDuplex()) {
          exchange.finishRequest()
        }
        if (!responseHeadersStarted) {
          exchange.responseHeadersStart()
        }
        if (responseBuilder == null) {
          // 6、
          responseBuilder = exchange.readResponseHeaders(false)!!
        }
        var response = responseBuilder
            .request(request)
            .handshake(exchange.connection()!!.handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build()
        var code = response.code
        if (code == 100) {
          // 7、
          response = exchange.readResponseHeaders(false)!!
              .request(request)
              .handshake(exchange.connection()!!.handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build()
          code = response.code
        }
    
        exchange.responseHeadersEnd(response)
    
        response = if (forWebSocket && code == 101) {
          // 8、
          response.newBuilder()
              .body(EMPTY_RESPONSE)
              .build()
        } else {
          // 9、
          response.newBuilder()
              .body(exchange.openResponseBody(response))
              .build()
        }
        // 10、
        if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
            "close".equals(response.header("Connection"), ignoreCase = true)) {
          exchange.noNewExchangesOnConnection()
        }
        if ((code == 204 || code == 205) && response.body?.contentLength() ?: -1L > 0L) {
          throw ProtocolException(
              "HTTP $code had non-zero Content-Length: ${response.body?.contentLength()}")
        }
        return response
      }
    
    1. 写入请求头。
    2. 如果不为 GET 或者 HEAD 请求,而且请求体不为空,检查到 Expect: 100-continue 包含在请求头中,则先发送请求头,暂时不发送请求体。等待读取响应头,如果这里得到服务器的响应码为 100,则获得的 responseBuilder 为 null,否则不为 null。
    3. responseBuilder 为 null 表示服务器响应 100,那么我就可以继续发送请求体(先发响应头的操作就是为了减少带宽消耗),ps:暂不讨论请求体支持双工的情况,因为没有看到支持双工的子类。
    4. 根据步骤 2、3 那么现在就可以开始发送请求体了。
    5. 走到这一步,表示在步骤 2 中 Expect: 100-continue 的请求没有被服务器同意,那么就不发送请求体,并标记请求完成,针对不可以多路复用的连接则直接标记使用完成。
    6. 没有响应头的,就再次读取响应头,经历过步骤 5 的不会走到这里。
    7. 如果步骤 6 中读取的响应码是 100,就直接尝试读取真正的响应。
    8. 如果是 WebSocket 且响应码为 101(升级协议),则给一个空的响应体,准备升级协议。
    9. 解析响应体的类型、长度以及准备字节流。
    10. 如果请求或者响应头里面包含 Connection:close,则标记连接使用完毕,防止被重用。
    11. 针对响应码为 204(没有新文档)205(没有新内容),但是内容长度又大于 0 的响应,直接抛出异常。

    小结

    没有小结,毕竟是源码分析,看完了自己能理清流程才是真的收获,不能建立整体的概念,太注重别人的总结的话最终会忽略很多细节。

    结语

    简单分析了 OkHttp 的调用流程,以及各个拦截器的实现,还有很多细节没有提到,如果有兴趣可以自己再钻研一下,复杂的东西拆解了就不会那么困难。很多时候阅读源码第一次阅读可能会的复杂,先粗略的了解建立整体的轮廓,再各个击破才是阅读源码的方法。

    另外,喜欢的同学,觉得对自己有帮助的同学,务必请花一点点时间帮我点个赞!点赞之交淡如水,但这个很重要!

    相关文章

      网友评论

          本文标题:OkHttp 4.2.2 源码分析

          本文链接:https://www.haomeiwen.com/subject/ktpvictx.html