美文网首页
Android OkHttp 源码阅读笔记(一)

Android OkHttp 源码阅读笔记(一)

作者: BlueSocks | 来源:发表于2023-12-04 17:58 被阅读0次

    OkHttp 源码阅读笔记(一)

    OkHttp 的大名不用多说了,本篇文章是对 OkHttp 的源码分析文章的第一篇。我后续分析源码的路径也都是针对 Http 1.1 来分析,略过 Http 2WebSocket 逻辑,基于的 OkHttp 版本是 4.11.0

    OkHttp 的简单使用

    val client: OkHttpClient = OkHttpClient
        .Builder()
        .build()
    
    val request = Request.Builder()
        .get()
        .url("https://www.google.com")
        .build()
    
    val call = client.newCall(request)
    
    /**
     * 异步请求方式
     */
    call.enqueue(responseCallback = object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            
        }
    
        override fun onResponse(call: Call, response: Response) {
            
        }
    })
    
    /**
     * 同步请求方式
     */
    //val response = call.execute()
    
    

    首先通过创建一个 OkHttpClient 实例(通常在一般项目中都是只创建一个 OkHttp 实例),创建一个 Request 实例,Request 实例中包含我们 Http 请求的所有参数,通过 OkHttpClient#newCall() 方法传入创建的 Request 实例,会返回一个 Call 实例,通过 Call 实例就可以发送一次 Http 请求,请求的方式有两种,调用 execute() 方法是同步请求,调用 enqueue() 方法是同步请求。

    同步请求

    在介绍同步请求前,先看看 OkHttpClient#newCall() 创建 Call 的方法:

    /** Prepares the [request] to be executed at some point in the future. */
    override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
    
    

    Call 的实现类是 RealCall,我们看看 RealCall 执行同步调用的方法是 execute() 我们来看看它的实现:

    override fun execute(): Response {
      // 判断是否已经执行过了,如果已经执行过了就报错
      check(executed.compareAndSet(false, true)) { "Already Executed" }
      
      // 超时任务
      timeout.enter()
      callStart()
      try {
        // 将本次同步请求添加到 Dispatcher 中
        client.dispatcher.executed(this)
        // 真正执行请求
        return getResponseWithInterceptorChain()
      } finally {
        // 从 Dispatcher 中移除本次同步请求
        client.dispatcher.finished(this)
      }
    }
    
    
    • 一个 RealCall 最多只能执行一次
    • timeout.enter() 用来计算超时任务,到达最大时间就会 cancel 掉本次请求,然后报错,这个时间用的是 OkHttpClient#callTimeoutMillis 参数,默认值是 0,也就是不计算超时。
    • 请求中的同步任务会被添加到 Dispatcher 中,这个参数也是可以在创建 OkHttpClient 的时候自定义。它主要处理异步调用的任务调度,异步调用时会详细介绍。
    • getResponseWithInterceptorChain() 就是执行真正的网络请求(也就是在当前线程直接执行),后面详细介绍。

    我们看到上面还调用了 callStart() 方法,这里简单看一下:

    private fun callStart() {
      this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
      eventListener.callStart(this)
    }
    
    

    这个 eventListener 可以在创建 OkHttp 实例的时候自定义它,通过它可以监听到请求的每个关键节点,比如这个 callStart() 方法就表示请求开始的回调,后面还有很多的其他关键节点的回调,比如创建 SocketDNS 查询、TLS 握手等等,我们可以在 OkHttp 很多处的源码看见它,后面的源码分析我也都会再说起它。

    这里简单看看 Dispatcher#execute() 方法 和 Dispatcher#finished() 方法,来看看 Dispatcher 如何记录同步请求任务。

    /** Used by [Call.execute] to signal it is in-flight. */
    @Synchronized internal fun executed(call: RealCall) {
      // 添加到同步队列中
      runningSyncCalls.add(call)
    }
    
    /** Used by [Call.execute] to signal completion. */
    internal fun finished(call: RealCall) {
      finished(runningSyncCalls, call)
    }
    
    private fun <T> finished(calls: Deque<T>, call: T) {
      val idleCallback: Runnable?
      synchronized(this) {
        // 请队列中移除
        if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
        idleCallback = this.idleCallback
      }
      
      // 后面分析 `promoteAndExecute()` 方法
      val isRunning = promoteAndExecute()
    
      if (!isRunning && idleCallback != null) {
        // 当没有新的任务时,执行一次 `idleCallback`,表示当前 `OkHttp` 空闲了,和 Android 中的 `IdleHandler` 类似。  
        idleCallback.run()
      }
    }
    
    

    异步请求

    趁热打铁,我们接着看看 RealCall#enqueue() 方法执行的异步调用:

    override fun enqueue(responseCallback: Callback) {
      check(executed.compareAndSet(false, true)) { "Already Executed" }
    
      callStart()
      // 向 `Dispatcher` 中添加异步任务
      client.dispatcher.enqueue(AsyncCall(responseCallback))
    }
    
    

    异步任务的实现类是 AsyncCall,它是 RealCall 中的一个内部类,我们看看 Dispatcher#enqueue 方法:

    internal fun enqueue(call: AsyncCall) {
      synchronized(this) {
        // 添加到等待队列
        readyAsyncCalls.add(call)
    
        // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
        // the same host.
        if (!call.call.forWebSocket) {
          // 如果不是 WebSocket,会更新当前的域名正在请求的数量
          val existingCall = findExistingCallWithHost(call.host)
          if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
        }
      }
      // 检查等待中的任务是否可以执行
      promoteAndExecute()
    }
    
    

    首先把任务添加到等待队列中;然后获取当前域名正在请求的任务的数量(这个数量我后面会分析),把这个数量更新在 RealCall 中;调用 promoteAndExecute() 执行等待队列中的可以执行的任务。
    看看 promoteAndExecute() 方法的源码:

    private fun promoteAndExecute(): Boolean {
      this.assertThreadDoesntHoldLock()
    
      val executableCalls = mutableListOf<AsyncCall>()
      val isRunning: Boolean
      synchronized(this) {
        val i = readyAsyncCalls.iterator()
        // 遍历等待的任务
        while (i.hasNext()) {
          val asyncCall = i.next()
          
          // 如果当前执行中的任务大于了,最大的限制任务,退出遍历
          if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
          // 如果当前任务的单个域名执行执行的个数大于了最大的限制,跳过当前 asyncCall
          if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
    
          i.remove()
          asyncCall.callsPerHost.incrementAndGet()
          executableCalls.add(asyncCall)
          
          // 移动当前任务到正在执行的任务中
          runningAsyncCalls.add(asyncCall)
        }
        isRunning = runningCallsCount() > 0
      }
    
      // Avoid resubmitting if we can't logically progress
      // particularly because RealCall handles a RejectedExecutionException
      // by executing on the same thread.
      if (executorService.isShutdown) {
          // ...
      } else {
        // 在线程池上执行这些任务
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService)
        }
      }
    
      return isRunning
    }
    
    

    Dispatcher 默认限制同时最多执行 64 个任务(可自定义),同一个域名最多有 5 个任务同时执行(可自定义)。如果没有达到限制,就会把等待中的任务移动到正在执行的任务中,最后把这些任务在 executorService 上执行(上面的限制只是对异步任务有效,同步任务不受影响),等待中的任务需要等待其他任务执行完成后调用 promoteAndExecute() 方法来检测等待中的任务是否达到执行的要求(还有其他的逻辑也会调用 promoteAndExecute() 方法去判断),可以执行的话移动到正在执行中的任务。

    我们看看 executorService 的创建:

    @get:Synchronized
    @get:JvmName("executorService") val executorService: ExecutorService
      get() {
        if (executorServiceOrNull == null) {
          executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
              SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
        }
        return executorServiceOrNull!!
      }
    
    

    线程池是一个无限大小的缓存线程池,线程的回收时间是 60s。

    我们再来看看 AsyncCall#executeOn() 方法:

    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()
    
      var success = false
      try {
        // 线程池上执行任务,入口函数为 run()
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        failRejected(e)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }
    
    
      override fun run() {
        threadName("OkHttp ${redactedUrl()}") {
          var signalledCallback = false
          timeout.enter()
          try {
            // 真正执行
            val response = getResponseWithInterceptorChain()
            signalledCallback = true
            // 回调成功
            responseCallback.onResponse(this@RealCall, response)
          } catch (e: IOException) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
            } else {
              // 回调失败
              responseCallback.onFailure(this@RealCall, e)
            }
          } catch (t: Throwable) {
            cancel()
            if (!signalledCallback) {
              val canceledException = IOException("canceled due to $t")
              canceledException.addSuppressed(t)
              // 回调失败
              responseCallback.onFailure(this@RealCall, canceledException)
            }
            throw t
          } finally {
            // 将任务从 Dispatcher 中移除
            client.dispatcher.finished(this)
          }
        }
      }
    }
    
    

    上面的代码也都是比较简单,就不多说了,然后他的真正的 Http 请求也是通过 getResponseWithInterceptorChain() 方法来完成。

    拦截器链的执行

    我们上面讲同步调用和异步调用中都讲到了最终的请求都是通过 getResponseWithInterceptorChain() 方法来完成。

    @Throws(IOException::class)
    internal fun getResponseWithInterceptorChain(): Response {
      // Build a full stack of interceptors.
      // 自定义普通拦截器
      val interceptors = mutableListOf<Interceptor>()
      interceptors += client.interceptors
      // 系统定义的拦截器
      interceptors += RetryAndFollowUpInterceptor(client)
      interceptors += BridgeInterceptor(client.cookieJar)
      interceptors += CacheInterceptor(client.cache)
      interceptors += ConnectInterceptor
      // 不是 WebSocket 的情况下
      if (!forWebSocket) {
        // 自定义网络拦截器
        interceptors += client.networkInterceptors
      }
      // 真正的通过 Socket 和 Server 来交换信息。
      interceptors += CallServerInterceptor(forWebSocket)
    
      // 构建拦截器链
      val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
      )
    
      var calledNoMoreExchanges = false
      try {
        // 开始执行拦截器链,最后一个拦截器执完成后就获取到了结果
        val response = chain.proceed(originalRequest)
        if (isCanceled()) {
          response.closeQuietly()
          throw IOException("Canceled")
        }
        return response
      } catch (e: IOException) {
        calledNoMoreExchanges = true
        throw noMoreExchanges(e) as Throwable
      } finally {
        if (!calledNoMoreExchanges) {
          noMoreExchanges(null)
        }
      }
    }
    
    

    拦截器链排在最前面的是自定义的普通拦截器,然后是 4 个普通的系统拦截器,再然后是自定义的网络拦截器(非 WebSocket,拦截器执行时,可以保证对应的 Socket 已经创建完成,而且在它之中必须调用一次 RealInterceptorChain#procced() 方法,也只能调用一次),最后是系统的 CallServerInterceptor 拦截器。我这里简单介绍一下系统拦截器的功能,具体的实现代码,后面的文章介绍。

    • RetryAndFollowUpInterceptor: 重试和重定向拦截器
    • BridgeInterceptor: 主要处理 RequestHeaderResponseHeader
    • CacheInterceptor: 主要处理 Http 中的缓存
    • ConnectionInterceptor: 它是一个单例,它是获取(创建)一个可用的 Socket 网络连接,也就是在它之后的拦截器执行时,网络链接就已经创建。
    • CallServerInterceptor: 通过 Socket 来和服务器交换数据

    所有的 Interceptor 都会被放入到 RealInterceptorChain,然后通过其 proceed() 方法开始执行,其中 index 就是下一个要执行的 Interceptorindex。 这里还有一个比较重要的参数就是 Exchange,如果它为空就表示还没有创建网络连接,反之就是已经创建了网络连接,我们上面也说到 ConnectionInterceptor,在 ConnnectionInterceport 执行后,Exchange 对象就不为空了,我们来看看 RealInterceptorChain#proceed() 方法的源码:

    @Throws(IOException::class)
    override fun proceed(request: Request): Response {
      check(index < interceptors.size)
      // 调用次数记录
      calls++
    
      // exchange 不为空表示为网络拦截器
      if (exchange != null) {
        // 网络拦截器不能修改请求的域名
        check(exchange.finder.routePlanner.sameHostAndPort(request.url)) {
          "network interceptor ${interceptors[index - 1]} must retain the same host and port"
        }
        // 网络拦截器必须请求一次 proceed() 方法,也只能请求一次。
        check(calls == 1) {
          "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
        }
      }
    
      // Call the next interceptor in the chain.
      // 复制下一次要请求需要的 RealInterceptorChain 对象,这里的 index + 1 了,也就表示下一次的 Chain 执行的 Interceptor 为当前 `Interceptor` 的下一个。
      val next = copy(index = index + 1, request = request)
      val interceptor = interceptors[index]
    
      @Suppress("USELESS_ELVIS")
      // 执行拦截器的拦截方法,得到最后的结果然后返回。
      val response = interceptor.intercept(next) ?: throw NullPointerException(
          "interceptor $interceptor returned null")
    
      if (exchange != null) {
        check(index + 1 >= interceptors.size || next.calls == 1) {
          "network interceptor $interceptor must call proceed() exactly once"
        }
      }
    
      return response
    }
    
    

    这里要简单说明一下在网络拦截器中不能修改域名,这很好理解,因为这个时候网络连接已经建立,也就是域名已经解析完成,如果你修改了网络域名,也就是说上面的网络连接对应的域名和你修改后的不一样,当然上面的网络连接是不可用的,所以 OkHttp 禁止在网络拦截器中修改域名;这里还限制了网络拦截器必须调用一次 proceed() 方法。

    这里会构建一个新的 RealInterceptorChain 对象,和原来的对象相比,index 值加 1,reqeust 使用外部传入的 reqeust。然后调用当前需要执行的 Interceptor,然后调用它的 intercept() 方法,把上面新创建的 RealInterceptorChain 对象传入进去。

    最后

    本篇文章介绍了 OkHttp 同步网络请求和异步网络请求,还介绍了异步网络请求时 Dispatcher 如何调度请求任务,还介绍了 RealInterceptorChain 拦截器链如何工作的。后面的文章还会介绍网络链接如何创建,默认的拦截器具体的工作原理。

    相关文章

      网友评论

          本文标题:Android OkHttp 源码阅读笔记(一)

          本文链接:https://www.haomeiwen.com/subject/lqlrgdtx.html