美文网首页Android开发经验谈Android开发
Android okhttp4 kotlin版源码浅析

Android okhttp4 kotlin版源码浅析

作者: 折剑游侠 | 来源:发表于2020-07-08 16:17 被阅读0次
本文基于源码版本---4.7.2

实际上kotlin版相较于java版实现并无不同,调用流程基本一致,仅语法差异。

日常使用okhttp发送异步请求

    // A Request must carry a URL: Request.Builder().build() throws
    // IllegalStateException("url == null") when none has been set.
    val request = Request.Builder()
        .url("https://example.com/")
        .build()

    // enqueue() schedules the call asynchronously; the outcome is delivered to the
    // callback from the dispatcher's executor thread.
    OkHttpClient().newCall(request).enqueue(object : Callback {
        override fun onFailure(call: Call, e: IOException) {
            // Handle the failed request here.
        }

        override fun onResponse(call: Call, response: Response) {
            // Consume the response here.
        }
    })

OkHttpClient().newCall(request)返回RealCall

/**
 * The call object examined in this walkthrough (class body omitted in this excerpt).
 *
 * @property client the client that created this call; its dispatcher and interceptor lists
 *   are read when the call is enqueued and executed.
 * @property forWebSocket when true, the dispatcher does not share a per-host counter for
 *   this call and the client's network interceptors are left out of the chain.
 */
class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) 

RealCall.enqueue()

  /**
   * Schedules this call for asynchronous execution and reports the outcome to
   * [responseCallback].
   *
   * @throws IllegalStateException if this call has already been executed.
   */
  override fun enqueue(responseCallback: Callback) {
    // Test and set the one-shot flag under the call's own monitor so that two racing
    // callers cannot both get past it.
    synchronized(this) {
      if (executed) throw IllegalStateException("Already Executed")
      executed = true
    }

    callStart()

    // Wrap the callback in a Runnable and hand it to the client's dispatcher.
    val asyncCall = AsyncCall(responseCallback)
    client.dispatcher.enqueue(asyncCall)
  }

调用到Dispatcher.enqueue();传参AsyncCall()

AsyncCall

  /**
   * Runnable that pairs the enclosing [RealCall] with the callback to notify. This is the
   * object the dispatcher queues and later passes to its executor.
   */
  internal inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    // Counter the dispatcher compares against maxRequestsPerHost. The reference itself can
    // be replaced through reuseCallsPerHostFrom(), hence @Volatile in addition to the
    // atomic value.
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    /** Adopts [other]'s counter so that both calls update the same AtomicInteger. */
    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    /** Host of the original request's URL. */
    val host: String
      get() = originalRequest.url.host

    /** The original request of the enclosing call. */
    val request: Request
        get() = originalRequest

    /** The enclosing [RealCall]. */
    val call: RealCall
        get() = this@RealCall

    /** Submits this runnable to [executorService]; body elided in this excerpt. */
    fun executeOn(executorService: ExecutorService) {
     ...
    }

    /** Executes the call on the executor's thread; body elided in this excerpt. */
    override fun run() {
      ...
    }
  }

AsyncCall是RealCall的内部类(inner class),可以直接访问外部RealCall的成员;它实现了Runnable接口并重写了run()方法。

Dispatcher.enqueue(AsyncCall)

  /**
   * Appends [call] to the ready queue, then triggers a promotion pass so that it starts as
   * soon as the limits allow.
   */
  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Except for web socket calls, mutate the AsyncCall so that it shares the
      // AtomicInteger of an existing running call to the same host.
      val sharesHostCounter = !call.call.forWebSocket
      if (sharesHostCounter) {
        findExistingCallWithHost(call.host)?.let { existing ->
          call.reuseCallsPerHostFrom(existing)
        }
      }
    }

    // Runs after the synchronized block above has been left.
    promoteAndExecute()
  }

先说说调度器Dispatcher,这里只贴重点。

/**
 * Excerpt of the dispatcher showing only the essentials: the two concurrency limits, the
 * executor, and the three call queues.
 */
class Dispatcher constructor() {
  // Maximum number of async calls that may run at the same time (64 by default).
  // Assigning a new value validates it and then re-runs promoteAndExecute().
  @get:Synchronized var maxRequests = 64
    set(maxRequests) {
      require(maxRequests >= 1) { "max < 1: $maxRequests" }
      synchronized(this) {
        field = maxRequests
      }
      promoteAndExecute()
    }

  // Maximum number of async calls that may run at the same time against a single host
  // (5 by default). This is a per-host concurrency limit, not a limit on the number of hosts.
  @get:Synchronized var maxRequestsPerHost = 5
    set(maxRequestsPerHost) {
      require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
      synchronized(this) {
        field = maxRequestsPerHost
      }
      promoteAndExecute()
    }

  // Thread pool that runs the calls, created on first access. It has no core threads, an
  // unbounded maximum, a 60-second keep-alive and a SynchronousQueue, i.e. the same
  // parameters as Executors.newCachedThreadPool().
  @get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
    get() {
      if (executorServiceOrNull == null) {
        executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
            SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
      }
      return executorServiceOrNull!!
    }

  // Async calls waiting to be started.
  private val readyAsyncCalls = ArrayDeque<AsyncCall>()

  // Async calls that are currently running.
  private val runningAsyncCalls = ArrayDeque<AsyncCall>()

  // Synchronous calls that are currently running.
  private val runningSyncCalls = ArrayDeque<RealCall>()
}

回到Dispatcher.enqueue()中,readyAsyncCalls.add(call)将AsyncCall加入等待队列。

然后调用Dispatcher.promoteAndExecute()

  /**
   * Moves eligible calls from [readyAsyncCalls] to [runningAsyncCalls] and submits them to
   * the executor.
   *
   * Asserts up front that the calling thread does not hold the dispatcher's lock. The
   * queues are mutated inside a synchronized block, and the promoted calls are submitted
   * only after that block has ended.
   *
   * @return true if runningCallsCount() is greater than zero after the promotion pass.
   */
  private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        // Global limit reached: nothing further can start, so stop scanning.
        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        // Only this call's host counter is at its limit: skip it and keep scanning.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        // Promote: ready queue -> running queue, and count the call against its host.
        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      // Key step: hand the promoted call to the executor.
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

这里做两层判断:正在执行的异步请求总数达到最大并发数maxRequests(默认64)时直接break,停止遍历;同一host上正在执行的请求数达到maxRequestsPerHost(默认5)时continue,跳过该请求,继续检查后面的请求。

符合要求将AsyncCall从等待队列移除,add进异步任务队列runningAsyncCalls。

然后调用AsyncCall.executeOn(executorService)

executorService即上文Dispatcher中初始化的线程池。

    /**
     * Submits this call to [executorService].
     *
     * If the executor rejects the task, the rejection is wrapped in an
     * InterruptedIOException and reported through the callback's onFailure; in every case
     * where submission did not succeed the dispatcher is told that this call has finished.
     */
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      // Stays false unless execute() returns normally.
      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        // On success, run() itself notifies the dispatcher when the call completes.
        if (!success) {
          client.dispatcher.finished(this)
        }
      }
    }

executorService.execute(this)

此处this即为AsyncCall,前面说过AsyncCall实现了Runnable接口。

AsyncCall.run()

    /**
     * Executes the call: obtains the response from the interceptor chain and reports either
     * the response or the failure to the callback. The dispatcher is always notified at the
     * end, whatever the outcome.
     */
    override fun run() {
      // NOTE(review): threadName presumably renames the current thread while the block
      // runs - confirm against the helper's definition.
      threadName("OkHttp ${redactedUrl()}") {
        // Set just before onResponse is invoked, so that an exception thrown by the
        // callback itself is not reported back through onFailure.
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          // Non-IO failure: cancel the call, report it as an IOException if the callback
          // has not been signalled yet, then rethrow the original throwable.
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

重点val response = getResponseWithInterceptorChain()

很明显通过拦截器链处理得到请求结果response,请求相关逻辑自然都在拦截器中。

RealCall.getResponseWithInterceptorChain()

  /**
   * Builds the interceptor list, wraps it in a [RealInterceptorChain] starting at index 0
   * and runs the original request through it.
   *
   * @return the response produced by the chain.
   * @throws IOException if the chain fails, or "Canceled" if the call was canceled by the
   *   time the chain returned (the response is closed first in that case).
   */
  internal fun getResponseWithInterceptorChain(): Response {
    // Assemble the interceptors in the order in which they will run.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    // The client's network interceptors are not added for web socket calls.
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    interceptors += CallServerInterceptor(forWebSocket)

    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    // Ensures noMoreExchanges() is invoked exactly once: with the exception on the
    // IOException path, otherwise with null from the finally block.
    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

val response = chain.proceed(originalRequest)

RealInterceptorChain.proceed()

 /**
  * Invokes the interceptor at [index], passing it a copy of this chain that points at
  * index + 1, and returns the response that interceptor produces.
  *
  * @throws IllegalStateException if there is no interceptor left at [index], if a network
  *   interceptor breaks the host/port or call-once rules checked below, or if the returned
  *   response has no body.
  */
 @Throws(IOException::class)
  override fun proceed(request: Request): Response {
    check(index < interceptors.size)

    // Counts how many times proceed() has been called on this chain instance.
    calls++

    // These checks run only when the chain already carries an exchange.
    if (exchange != null) {
      check(exchange.finder.sameHostAndPort(request.url)) {
        "network interceptor ${interceptors[index - 1]} must retain the same host and port"
      }
      check(calls == 1) {
        "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
      }
    }

    // Call the next interceptor in the chain.
    val next = copy(index = index + 1, request = request)
    val interceptor = interceptors[index]

    @Suppress("USELESS_ELVIS")
    val response = interceptor.intercept(next) ?: throw NullPointerException(
        "interceptor $interceptor returned null")

    // Unless this was the last interceptor, it must have called proceed() on `next`
    // exactly once.
    if (exchange != null) {
      check(index + 1 >= interceptors.size || next.calls == 1) {
        "network interceptor $interceptor must call proceed() exactly once"
      }
    }

    check(response.body != null) { "interceptor $interceptor returned a response with no body" }

    return response
  }

简单说下拦截器的链式调用:

  • RealInterceptorChain保存了拦截器数组interceptors。
  • RealInterceptorChain.proceed()方法调用拦截器interceptor.intercept()方法处理request。
  • interceptor处理完request后,调用传入的下一个chain(即index + 1的RealInterceptorChain副本)的proceed()方法,把请求交给下一个拦截器。
  • proceed()方法按照interceptors中拦截器顺序依次调用,直到interceptors中所有拦截器处理完毕,返回结果response到上层拦截器。
  • intercept()方法返回值即为response,逐级返回。

这也是为什么拦截器中可以同时处理request和response。

接下来看看okhttp默认添加的拦截器

    // The list order is the order in which the chain invokes the interceptors.
    val interceptors = mutableListOf<Interceptor>()
    // Interceptors registered on the client come first.
    interceptors += client.interceptors
    interceptors += RetryAndFollowUpInterceptor(client)
    interceptors += BridgeInterceptor(client.cookieJar)
    interceptors += CacheInterceptor(client.cache)
    interceptors += ConnectInterceptor
    // The client's network interceptors are skipped for web socket calls.
    if (!forWebSocket) {
      interceptors += client.networkInterceptors
    }
    // Always last in the list.
    interceptors += CallServerInterceptor(forWebSocket)
  1. client.interceptors:用户配置的拦截器。一般会统一添加请求头,打印日志等。
  2. RetryAndFollowUpInterceptor:失败重试,重定向。
  3. BridgeInterceptor:配置请求头。
  4. CacheInterceptor:用于缓存。
  5. ConnectInterceptor:连接服务器。
  6. networkInterceptors:用户配置的network拦截器。
  7. CallServerInterceptor:发起网络请求,获取结果。

相关文章

网友评论

    本文标题:Android okhttp4 kotlin版源码浅析

    本文链接:https://www.haomeiwen.com/subject/uryqcktx.html