美文网首页Android开发经验谈Android开发
Android okhttp4 kotlin版源码浅析

Android okhttp4 kotlin版源码浅析

作者: 折剑游侠 | 来源:发表于2020-07-08 16:17 被阅读0次
    本文基于源码版本---4.7.2

    实际上kotlin版相较于java版实现并无不同,调用流程基本一致,仅语法差异。

    日常使用okhttp发送异步请求

        // Request.Builder.build() throws IllegalStateException("url == null") when no URL has
        // been set, so the request must be given one before it is built.
        val request = Request.Builder()
            .url("https://example.com/")
            .build()

        OkHttpClient().newCall(request).enqueue(object : Callback {
            override fun onFailure(call: Call, e: IOException) {
                // Called when no response could be obtained for the call.
            }

            override fun onResponse(call: Call, response: Response) {
                // Close the response when done with it so its body is released.
                response.use {
                    // Read it.body here.
                }
            }
        })
    

    OkHttpClient().newCall(request)返回RealCall

    /**
     * The call object returned by `OkHttpClient().newCall(request)`.
     *
     * @property client the [OkHttpClient] this call belongs to; its dispatcher is used to
     *   schedule the call when it is enqueued.
     * @property forWebSocket flag for web socket calls; when it is true the client's network
     *   interceptors are left out of the interceptor chain.
     */
    class RealCall(
      val client: OkHttpClient,
      /** The application's original request unadulterated by redirects or auth headers. */
      val originalRequest: Request,
      val forWebSocket: Boolean
    ) 
    

    RealCall.enqueue()

      /**
       * Schedules this call for asynchronous execution.
       *
       * Marks the call as executed, invokes `callStart()`, then hands a new [AsyncCall] wrapping
       * [responseCallback] to the client's dispatcher.
       *
       * @param responseCallback receives the outcome of the call.
       * @throws IllegalStateException with message "Already Executed" if this call was already
       *   marked as executed.
       */
      override fun enqueue(responseCallback: Callback) {
        // Check-and-set under the call's own lock so that only one caller can get past the check.
        synchronized(this) {
          check(!executed) { "Already Executed" }
          executed = true
        }
        callStart()
        client.dispatcher.enqueue(AsyncCall(responseCallback))
      }
    

    调用到Dispatcher.enqueue();传参AsyncCall()

    AsyncCall

      /**
       * A [Runnable] tied to the enclosing [RealCall] that carries the callback to notify.
       *
       * Being an inner class, it reads the outer call's `originalRequest` directly and exposes
       * the outer call itself through [call].
       *
       * @param responseCallback the callback that is told about the call's outcome.
       */
      internal inner class AsyncCall(
        private val responseCallback: Callback
      ) : Runnable {
        // Per-host call counter. It starts as a private counter at 0 and can be replaced by
        // another AsyncCall's counter instance via reuseCallsPerHostFrom().
        @Volatile var callsPerHost = AtomicInteger(0)
          private set
    
        /** Makes this call share the very same counter instance as [other]. */
        fun reuseCallsPerHostFrom(other: AsyncCall) {
          this.callsPerHost = other.callsPerHost
        }
    
        /** Host of the original request's URL. */
        val host: String
          get() = originalRequest.url.host
    
        /** The original request of the enclosing call. */
        val request: Request
            get() = originalRequest
    
        /** The enclosing [RealCall]. */
        val call: RealCall
            get() = this@RealCall
    
        // Body elided in this excerpt.
        fun executeOn(executorService: ExecutorService) {
         ...
        }
    
        // Body elided in this excerpt.
        override fun run() {
          ...
        }
      }
    

    AsyncCall是RealCall的内部类,持有回调responseCallback,并可通过call属性拿到外部的RealCall;它实现了Runnable接口并重写了run()方法

    Dispatcher.enqueue(AsyncCall)

      /**
       * Queues [call] in `readyAsyncCalls` and then tries to promote ready calls for execution.
       *
       * For calls that are not web socket calls, the call adopts the per-host counter of an
       * existing call to the same host when `findExistingCallWithHost` finds one.
       *
       * @param call the asynchronous call to schedule.
       */
      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          readyAsyncCalls.add(call)
    
          // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
          // the same host.
          if (!call.call.forWebSocket) {
            val existingCall = findExistingCallWithHost(call.host)
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        // Invoked after the synchronized block has been left.
        promoteAndExecute()
      }
    

    先说说调度器Dispatcher,这里只贴重点。

    /**
     * Excerpt of the dispatcher: the limits, the executor and the queues used to schedule calls.
     * Members not shown here (e.g. `executorServiceOrNull`, `promoteAndExecute()`) are declared
     * outside this excerpt.
     */
    class Dispatcher constructor() {
      // Maximum number of requests to run concurrently; defaults to 64 and must be >= 1.
      // Changing it triggers promoteAndExecute().
      @get:Synchronized var maxRequests = 64
        set(maxRequests) {
          require(maxRequests >= 1) { "max < 1: $maxRequests" }
          synchronized(this) {
            field = maxRequests
          }
          promoteAndExecute()
        }
    
      // Maximum number of requests to run concurrently per host (not a number of hosts);
      // defaults to 5 and must be >= 1. Changing it triggers promoteAndExecute().
      @get:Synchronized var maxRequestsPerHost = 5
        set(maxRequestsPerHost) {
          require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
          synchronized(this) {
            field = maxRequestsPerHost
          }
          promoteAndExecute()
        }
    
      // Thread pool that runs the calls, created on first access. Its parameters (0 core threads,
      // up to Int.MAX_VALUE threads, 60 s keep-alive, SynchronousQueue) are the same as those of
      // Executors.newCachedThreadPool(), with a custom thread factory.
      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
                SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
          }
          return executorServiceOrNull!!
        }
    
      // Asynchronous calls waiting to be run.
      private val readyAsyncCalls = ArrayDeque<AsyncCall>()
    
      // Asynchronous calls that are running.
      private val runningAsyncCalls = ArrayDeque<AsyncCall>()
    
      // Synchronous calls that are running.
      private val runningSyncCalls = ArrayDeque<RealCall>()
    }
    

    回到Dispatcher.enqueue()中,readyAsyncCalls.add(call)将AsyncCall加入等待队列。

    然后调用Dispatcher.promoteAndExecute()

      /**
       * Moves eligible calls from `readyAsyncCalls` to `runningAsyncCalls` and submits them to
       * the executor service.
       *
       * Selection happens inside the dispatcher lock; the selected calls are submitted only after
       * the lock has been released.
       *
       * @return true if `runningCallsCount()` was greater than zero after the promotion pass.
       */
      private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()
    
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          while (i.hasNext()) {
            val asyncCall = i.next()
    
            // Global limit reached: stop scanning, nothing further can be promoted.
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            // This call's host is at its limit: skip it but keep scanning the remaining calls.
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
    
            i.remove()
            asyncCall.callsPerHost.incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          isRunning = runningCallsCount() > 0
        }
    
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          // Key step: hand each promoted call to the executor service.
          asyncCall.executeOn(executorService)
        }
    
        return isRunning
      }
    

    这里判断正在运行的异步请求数小于最大并发数maxRequests(默认64),并且同一host上正在运行的请求数小于maxRequestsPerHost(默认5)。

    符合要求将AsyncCall从等待队列移除,add进异步任务队列runningAsyncCalls。

    然后调用AsyncCall.executeOn(executorService)

    executorService即上文Dispatcher中初始化的线程池。

        /**
         * Submits this call to [executorService] for execution.
         *
         * If the executor rejects the task, the rejection is wrapped in an
         * [InterruptedIOException] ("executor rejected"), passed to `noMoreExchanges` and
         * reported through the callback's `onFailure`; the dispatcher is then told that this
         * call has finished.
         *
         * @param executorService the executor that should run this call.
         */
        fun executeOn(executorService: ExecutorService) {
          client.dispatcher.assertThreadDoesntHoldLock()
    
          var success = false
          try {
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            noMoreExchanges(ioException)
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            // Only reached with success == false when execute() threw, i.e. the task was not
            // accepted by the executor.
            if (!success) {
              client.dispatcher.finished(this)
            }
          }
        }
    

    executorService.execute(this)

    此处this即为AsyncCall,前面说过AsyncCall实现了Runnable接口。

    AsyncCall.run()

        /**
         * Executes the call and reports the outcome to the response callback.
         *
         * The response is obtained from `getResponseWithInterceptorChain()` and delivered through
         * `onResponse`. An [IOException] raised before the callback was signalled is delivered
         * through `onFailure`; one raised afterwards is only logged. Any other [Throwable]
         * cancels the call, is reported through `onFailure` (unless the callback was already
         * signalled) and is rethrown. In every case the dispatcher is told the call has finished.
         */
        override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            // Set to true just before onResponse is invoked, so that a failure thrown by the
            // callback itself is not reported to the callback a second time.
            var signalledCallback = false
            timeout.enter()
            try {
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
              }
            } catch (t: Throwable) {
              cancel()
              if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
              }
              throw t
            } finally {
              client.dispatcher.finished(this)
            }
          }
        }
    

    重点val response = getResponseWithInterceptorChain()

    很明显通过拦截器链处理得到请求结果response,请求相关逻辑自然都在拦截器中。

    RealCall.getResponseWithInterceptorChain()

      /**
       * Builds the full interceptor list, runs [originalRequest] through it and returns the
       * resulting response.
       *
       * `noMoreExchanges` is invoked exactly once on every path: with the [IOException] when one
       * is thrown, otherwise with `null` from the `finally` block.
       *
       * @return the response produced by the chain.
       * @throws IOException if the chain fails, or with message "Canceled" if the call was
       *   canceled by the time the chain returned (the response is closed first).
       */
      internal fun getResponseWithInterceptorChain(): Response {
        // Assemble the interceptors in the order they will be invoked.
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        }
        interceptors += CallServerInterceptor(forWebSocket)
    
        // The chain starts at index 0 with no exchange.
        val chain = RealInterceptorChain(
            call = this,
            interceptors = interceptors,
            index = 0,
            exchange = null,
            request = originalRequest,
            connectTimeoutMillis = client.connectTimeoutMillis,
            readTimeoutMillis = client.readTimeoutMillis,
            writeTimeoutMillis = client.writeTimeoutMillis
        )
    
        var calledNoMoreExchanges = false
        try {
          val response = chain.proceed(originalRequest)
          if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
          }
          return response
        } catch (e: IOException) {
          // Record that noMoreExchanges is called here so the finally block does not call it again.
          calledNoMoreExchanges = true
          throw noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
          }
        }
      }
    

    val response = chain.proceed(originalRequest)

    RealInterceptorChain.proceed()

     /**
      * Invokes the interceptor at the current `index`, passing it a copy of this chain that
      * points at the next index and carries [request].
      *
      * @param request the request to hand to the current interceptor.
      * @return the response returned by the current interceptor.
      * @throws IllegalStateException if `index` is past the end of the list, if one of the
      *   network-interceptor checks fails, or if the response has no body.
      * @throws NullPointerException if the interceptor returned null.
      */
     @Throws(IOException::class)
      override fun proceed(request: Request): Response {
        check(index < interceptors.size)
    
        calls++
    
        // These checks only apply once an exchange is present; judging by the messages, that is
        // the network-interceptor part of the chain.
        if (exchange != null) {
          check(exchange.finder.sameHostAndPort(request.url)) {
            "network interceptor ${interceptors[index - 1]} must retain the same host and port"
          }
          check(calls == 1) {
            "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
          }
        }
    
        // Call the next interceptor in the chain.
        val next = copy(index = index + 1, request = request)
        val interceptor = interceptors[index]
    
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next) ?: throw NullPointerException(
            "interceptor $interceptor returned null")
    
        // Unless this was the last interceptor, it must have called proceed() on `next` once.
        if (exchange != null) {
          check(index + 1 >= interceptors.size || next.calls == 1) {
            "network interceptor $interceptor must call proceed() exactly once"
          }
        }
    
        check(response.body != null) { "interceptor $interceptor returned a response with no body" }
    
        return response
      }
    

    简单说下拦截器的链式调用:

    • RealInterceptorChain保存了拦截器数组interceptors。
    • RealInterceptorChain.proceed()方法调用拦截器interceptor.intercept()方法处理request。
    • interceptor处理完request后继续调用RealInterceptorChain.proceed()方法。
    • proceed()方法按照interceptors中拦截器顺序依次调用,直到interceptors中所有拦截器处理完毕,返回结果response到上层拦截器。
    • intercept()方法返回值即为response,逐级返回。

    这也是为什么拦截器中可以同时处理request和response。

    接下来看看okhttp默认添加的拦截器

        // Interceptors are appended in invocation order: the client's own interceptors first,
        // then the built-in ones, with CallServerInterceptor always last.
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        // The client's network interceptors are skipped for web socket calls.
        if (!forWebSocket) {
          interceptors += client.networkInterceptors
        }
        interceptors += CallServerInterceptor(forWebSocket)
    
    1. client.interceptors:用户配置的拦截器。一般会统一添加请求头,打印日志等。
    2. RetryAndFollowUpInterceptor:失败重试,重定向。
    3. BridgeInterceptor:配置请求头。
    4. CacheInterceptor:用于缓存。
    5. ConnectInterceptor:连接服务器。
    6. networkInterceptors:用户配置的network拦截器。
    7. CallServerInterceptor:发起网络请求,获取结果。

    相关文章

      网友评论

        本文标题:Android okhttp4 kotlin版源码浅析

        本文链接:https://www.haomeiwen.com/subject/uryqcktx.html