美文网首页
OkHttp源码-流程-拦截器分析

OkHttp源码-流程-拦截器分析

作者: 一碗清汤面_ | 来源:发表于2020-04-22 14:07 被阅读0次

    前言

    网络模块在开发中很常见了吧,也可以说是很重要了吧,所以梳理梳理,做做总结 、产出以及记录。
    总的来说,本文基于OkHttp4.5.0,Kotlin版本,做了对OkHttp的基本请求、整体流程、同/异步请求、责任链调度以及CacheInterceptor的分析。

    Http

    网络的底层,万变不离其底层,这部分还是很重要的,以下两篇都写得非常nice
    面试带你飞:这是一份全面的 计算机网络基础 总结攻略
    TCP协议灵魂之问,巩固你的网路底层基础

    OkHttp

    How 基本请求

    fun load() {
          //1.创建请求(包含url,method,headers,body)
          val request = Request
                  .Builder()
                  .url("")
                  .build()
           //2.创建OkHttpClient (包含分发器、拦截器、DNS等)
           val okHttpClient = OkHttpClient.Builder().build()
           //3.创建Call(用于调用请求)
           val newCall = okHttpClient.newCall(request)
           //4.通过异步请求数据
           newCall.enqueue(object :Callback{
               override fun onFailure(call: Call, e: IOException) {}
               override fun onResponse(call: Call, response: Response) {}
           })
           //4.通过同步请求数据
           val response =  newCall.execute()
    }
    
    • 流程图
    okhttp流程图.png

    前面就是基于构造者的创建(主要是量太大,不想占篇幅所以就不展示出来了)
    关键(并不)处在于RealCall的请求

    • 1.异步请求 RealCall#enqueue(responseCallback: Callback)
    //RealCall#enqueue(responseCallback: Callback)  
      override fun enqueue(responseCallback: Callback) {
        synchronized(this) {
          //检查这个call是否已经执行过了,每 call只能被执行一次
          check(!executed) { "Already Executed" }
          executed = true
        }
        //Call被调用时调用,调用了EventListener#callStart()、扩展此类可以监视应用程序的HTTP调用的数量,大小和持续时间
        callStart()
        //1.这里创建了AsyncCall实际是个Runnable后面再提(标记为3)
        client.dispatcher.enqueue(AsyncCall(responseCallback))
      }
    
    //1.Dispatcher#enqueue(call: AsyncCall)
      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          //添加到异步调用的队列
          readyAsyncCalls.add(call)
    
          if (!call.call.forWebSocket) {
            val existingCall = findExistingCallWithHost(call.host)
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        //2
        promoteAndExecute()
      }
    
    //2.Dispatcher#promoteAndExecute()   
    //主要是将[readyAsyncCalls]过渡到[runningAsyncCalls]
     private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()
    
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          while (i.hasNext()) {
            val asyncCall = i.next()
    
            if (runningAsyncCalls.size >= this.maxRequests) break // 最大容量. 64
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // 最大主机容量. 5
    
            i.remove()
            asyncCall.callsPerHost.incrementAndGet()
            //异步请求各自添加到异步调用队列和集合中
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          //同步队列和异步队列中超过一个 代表正在运行
          isRunning = runningCallsCount() > 0
        }
    
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          //3.
          asyncCall.executeOn(executorService)
        }
    
        return isRunning
      }
    
    //3.RealCall.kt中的内部类
      internal inner class AsyncCall(
        private val responseCallback: Callback
      ) : Runnable {
        fun executeOn(executorService: ExecutorService) {
          ...
            //有点长 直接说重点了
            //放到线程池中 执行这个Runnable
            executorService.execute(this)
          ...
      
        override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            ...
            try {
              //兜兜转转 终于调用这个关键方法了
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              //接口回调数据
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
              }
            } catch (t: Throwable) {
              cancel()
              if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
              }
              throw t
            } finally {
              //移除队列
              client.dispatcher.finished(this)
            }
          }
        }
      }
    
    • 2.同步请求 RealCall#execute()
      override fun execute(): Response {
        //同样判断是否执行过
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        }
        timeout.enter()
        //同样监听
        callStart()
        try {
          //同样执行
          client.dispatcher.executed(this)
          return getResponseWithInterceptorChain()
        } finally {
          //同样移除
          client.dispatcher.finished(this)
        }
      }
    

    (Real)关键处的关键在于getResponseWithInterceptorChain()方法

      @Throws(IOException::class)
      internal fun getResponseWithInterceptorChain(): Response {
        // 构建完整的拦截器堆栈
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors                    //用户自己拦截器
        interceptors += RetryAndFollowUpInterceptor(client)    //失败后的重试和重定向
        interceptors += BridgeInterceptor(client.cookieJar)    //桥接用户的信息和服务器的信息
        interceptors += CacheInterceptor(client.cache)         //处理缓存相关
        interceptors += ConnectInterceptor                     //负责与服务器连接
        if (!forWebSocket) {
          interceptors += client.networkInterceptors           //配置 OkHttpClient 时设置的 
        }
        interceptors += CallServerInterceptor(forWebSocket)    //负责向服务器发送请求数据、从服务器读取响应数据
        //创建拦截链
        val chain = RealInterceptorChain(
            call = this,
            interceptors = interceptors,
            index = 0,
            exchange = null,
            request = originalRequest,
            connectTimeoutMillis = client.connectTimeoutMillis,
            readTimeoutMillis = client.readTimeoutMillis,
            writeTimeoutMillis = client.writeTimeoutMillis
        )
    
        var calledNoMoreExchanges = false
        try {
          //拦截链的执行  1.
          val response = chain.proceed(originalRequest)
          if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
          }
          return response
        } catch (e: IOException) {
          calledNoMoreExchanges = true
          throw noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
          }
        }
      }
    
    //1.RealInterceptorChain#proceed(request: Request)
    @Throws(IOException::class)
      override fun proceed(request: Request): Response {
        ...
        // copy出新的拦截链,链中的拦截器集合index+1
        val next = copy(index = index + 1, request = request)
        val interceptor = interceptors[index]
    
        //调用拦截器的intercept(chain: Chain): Response 返回处理后的数据 交由下一个拦截器处理
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next) ?: throw NullPointerException(
            "interceptor $interceptor returned null")
        ...
        //返回最终的响应体
        return response
      }
    

    这里采用责任链的模式来使每个功能分开,每个Interceptor自行完成自己的任务,并且将不属于自己的任务交给下一个,简化了各自的责任和逻辑,和View中的事件分发机制类似.

    • 再来一张时序图
    okhttp 时序图.jpg

    OkHttp关键中的核心就是这每一个拦截器 具体怎么实现还需要look look,这里就看一个CacheInterceptor吧

    • CacheInterceptor.kt
      @Throws(IOException::class)
      override fun intercept(chain: Interceptor.Chain): Response {
        //在Cache(DiskLruCache)类中 通过request.url匹配response
        val cacheCandidate = cache?.get(chain.request())
        //记录当前时间点
        val now = System.currentTimeMillis()
        //缓存策略 有两种类型 
        //networkRequest 网络请求
        //cacheResponse 缓存的响应
        val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
        val networkRequest = strategy.networkRequest
        val cacheResponse = strategy.cacheResponse
        //计算请求次数和缓存次数
        cache?.trackResponse(strategy)
        ...
        // 如果 禁止使用网络 并且 缓存不足,返回504和空body的Response
        if (networkRequest == null && cacheResponse == null) {
          return Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(HTTP_GATEWAY_TIMEOUT)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build()
        }
    
        // 如果策略中不能使用网络,就把缓存中的response封装返回
        if (networkRequest == null) {
          return cacheResponse!!.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build()
        }
        //调用拦截器process从网络获取数据
        var networkResponse: Response? = null
        try {
          networkResponse = chain.proceed(networkRequest)
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            cacheCandidate.body?.closeQuietly()
          }
        }
      
        //如果有缓存的Response
        if (cacheResponse != null) {
          //如果网络请求返回code为304 即说明资源未修改
          if (networkResponse?.code == HTTP_NOT_MODIFIED) {
            //直接封装封装缓存的Response返回即可
            val response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers, networkResponse.headers))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build()
    
            networkResponse.body!!.close()
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache!!.trackConditionalCacheHit()
            cache.update(cacheResponse, response)
            return response
          } else {
            cacheResponse.body?.closeQuietly()
          }
        }
    
        val response = networkResponse!!.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()
    
        if (cache != null) {
          //判断是否具有主体 并且 是否可以缓存供后续使用
          if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
            // 加入缓存中
            val cacheRequest = cache.put(response)
            return cacheWritingResponse(cacheRequest, response)
          }
          //如果请求方法无效 就从缓存中remove掉
          if (HttpMethod.invalidatesCache(networkRequest.method)) {
            try {
              cache.remove(networkRequest)
            } catch (_: IOException) {
              // The cache cannot be written.
            }
          }
        }
        
        return response
      }
    

    总结

    • OkHttpClient为Request创建RealCall
    • RealCall的enqueue(responseCallback: Callback)异步请求通过Dispatcher实现,和execute()同步请求一样最终调用getResponseWithInterceptorChain()
    • getResponseWithInterceptorChain()通过责任链模式用每个不同职责Interceptor处理Response返回数据

    最后

    相关文章

      网友评论

          本文标题:OkHttp源码-流程-拦截器分析

          本文链接:https://www.haomeiwen.com/subject/yzwrvhtx.html