美文网首页
Okhttp同步与异步二

Okhttp同步与异步二

作者: 浮游生物的幻想 | 来源:发表于2021-10-28 11:04 被阅读0次

之前根据Okhttp使用流程,逐块看了源码内的相关内容介绍。现在去看同步与异步之间的差异

   val mOk = OkHttpClient()
        val request = Request.Builder()
            .url("请求地址")
            .get()//请求方式
            .build()
        val call = mOk.newCall(request)

同步分析:

call.execute()

call.execute()开启同步请求,返回Response,点进去看看

  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }
  • timeout.enter(),内部代码如下:如果没有超时也没有结束? 不允许进入,阻塞
  fun enter() {
    val timeoutNanos = timeoutNanos()
    val hasDeadline = hasDeadline()
    if (timeoutNanos == 0L && !hasDeadline) {
      return
    }
    scheduleTimeout(this, timeoutNanos, hasDeadline)
  }
  • 事件侦听器EventListener回调callStart()
  private fun callStart() {
    this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
    eventListener.callStart(this)
  }
 /** Used by [Call.execute] to signal it is in-flight. */
 @Synchronized internal fun executed(call: RealCall) {
   runningSyncCalls.add(call)
 }
  • 构建一个拦截器链getResponseWithInterceptorChain() 返回Response
 internal fun getResponseWithInterceptorChain(): Response {
    // Build a full stack of interceptors.
    val interceptors = mutableListOf<Interceptor>()
    interceptors += client.interceptors //自定义
    interceptors += RetryAndFollowUpInterceptor(client) //错误和重定向
    interceptors += BridgeInterceptor(client.cookieJar) //桥梁:应用程序过渡到网络
    interceptors += CacheInterceptor(client.cache) //缓存
    interceptors += ConnectInterceptor //连接
    if (!forWebSocket) {
      interceptors += client.networkInterceptors //网络
    }
    interceptors += CallServerInterceptor(forWebSocket) //对服务器进行网络调用

    //承载整个拦截器链的具体拦截器链
    val chain = RealInterceptorChain(
        call = this,
        interceptors = interceptors,
        index = 0,
        exchange = null,
        request = originalRequest,
        connectTimeoutMillis = client.connectTimeoutMillis,
        readTimeoutMillis = client.readTimeoutMillis,
        writeTimeoutMillis = client.writeTimeoutMillis
    )

    //返回response
    var calledNoMoreExchanges = false
    try {
      val response = chain.proceed(originalRequest)
      if (isCanceled()) {
        response.closeQuietly()
        throw IOException("Canceled")
      }
      return response
    } catch (e: IOException) {
      calledNoMoreExchanges = true
      throw noMoreExchanges(e) as Throwable
    } finally {
      if (!calledNoMoreExchanges) {
        noMoreExchanges(null)
      }
    }
  }

异步分析:

     call.enqueue(object : Callback {
            override fun onResponse(call: Call, response: Response) {
                Log.e(TAG, "请求成功")
            }

            override fun onFailure(call: Call, e: IOException) {
                Log.e(TAG, "请求失败")
            }
        })

异步执行是通过call.enqueue(responseCallback: Callback)来执行,点进去查看

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
  • 事件侦听器EventListener回调callStart()和同步请求一样
  • 调用client.dispatcher.enqueue(AsyncCall(responseCallback))并传入了一个实例AsyncCall
  inner class AsyncCall(
    private val responseCallback: Callback
  ) : Runnable {
    @Volatile var callsPerHost = AtomicInteger(0)
      private set

    fun reuseCallsPerHostFrom(other: AsyncCall) {
      this.callsPerHost = other.callsPerHost
    }

    val host: String
      get() = originalRequest.url.host

    val request: Request
        get() = originalRequest

    val call: RealCall
        get() = this@RealCall

    /**
     * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
     * if the executor has been shut down by reporting the call as failed.
     */
    fun executeOn(executorService: ExecutorService) {
      client.dispatcher.assertThreadDoesntHoldLock()

      var success = false
      try {
        executorService.execute(this)
        success = true
      } catch (e: RejectedExecutionException) {
        val ioException = InterruptedIOException("executor rejected")
        ioException.initCause(e)
        noMoreExchanges(ioException)
        responseCallback.onFailure(this@RealCall, ioException)
      } finally {
        if (!success) {
          client.dispatcher.finished(this) // This call is no longer running!
        }
      }
    }

    override fun run() {
      threadName("OkHttp ${redactedUrl()}") {
        var signalledCallback = false
        timeout.enter()
        try {
          val response = getResponseWithInterceptorChain()
          signalledCallback = true
          responseCallback.onResponse(this@RealCall, response)
        } catch (e: IOException) {
          if (signalledCallback) {
            // Do not signal the callback twice!
            Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
          } else {
            responseCallback.onFailure(this@RealCall, e)
          }
        } catch (t: Throwable) {
          cancel()
          if (!signalledCallback) {
            val canceledException = IOException("canceled due to $t")
            canceledException.addSuppressed(t)
            responseCallback.onFailure(this@RealCall, canceledException)
          }
          throw t
        } finally {
          client.dispatcher.finished(this)
        }
      }
    }
  }

AsyncCall继承了Runnable ,所以具体的请求流程都在run()里面进行处理,和同步请求流程一样调用timeout.enter() 最后也会构建一个拦截链getResponseWithInterceptorChain() 返回Response,成功回调 fun onResponse(call: Call, response: Response),失败回调fun onFailure(call: Call, e: IOException)。回过头来继续看client.dispatcher.enqueue

  internal fun enqueue(call: AsyncCall) {
    synchronized(this) {
      readyAsyncCalls.add(call)

      // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
      // the same host.
      if (!call.call.forWebSocket) {
        val existingCall = findExistingCallWithHost(call.host)
        if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
      }
    }
    promoteAndExecute()
  }

这里将AsyncCall加入到了准备执行的队列(readyAsyncCalls.add(call))中,往下看if里面的逻辑,首先是findExistingCallWithHost(host: String)方法

  private fun findExistingCallWithHost(host: String): AsyncCall? {
    for (existingCall in runningAsyncCalls) {
      if (existingCall.host == host) return existingCall
    }
    for (existingCall in readyAsyncCalls) {
      if (existingCall.host == host) return existingCall
    }
    return null
  }

在这个方法里面他主要在查找队列中已经存在的host并返回,回调asyncCall.reuseCallsPerHostFrom使其共享对同一主机的现有运行调用的AtomicInteger,再回到异步enqueue(call: AsyncCall)方法中,看最后一步调用promoteAndExecute()方法

private fun promoteAndExecute(): Boolean {
    this.assertThreadDoesntHoldLock()

    val executableCalls = mutableListOf<AsyncCall>()
    val isRunning: Boolean
    synchronized(this) {
      val i = readyAsyncCalls.iterator()
      while (i.hasNext()) {
        val asyncCall = i.next()

        if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
        if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

        i.remove()
        asyncCall.callsPerHost.incrementAndGet()
        executableCalls.add(asyncCall)
        runningAsyncCalls.add(asyncCall)
      }
      isRunning = runningCallsCount() > 0
    }

    for (i in 0 until executableCalls.size) {
      val asyncCall = executableCalls[i]
      asyncCall.executeOn(executorService)
    }

    return isRunning
  }

再同步代码块内对readyAsyncCalls队列进行迭代,将符合要求的条件从队列中移除添加到runningAsyncCalls队列中,不符合的话继续待在readyAsyncCalls等待执行,最后调用asyncCall.executeOn(executorService)放入到线程中执行

总结:

  1. 同步请求:发送一个请求后需要等待返回,才能发送下一个请求。
  2. 异步请求:发送一个请求后不需要等待返回,可以继续发送,因为内部有两个队列,等待执行(readyAsyncCalls)和执行中(runningAsyncCalls),加入了AtomicInteger和线程池支持高并发
  3. Dispatcher:同步、异步都由Dispatcher进行统一管理

相关文章

网友评论

      本文标题:Okhttp同步与异步二

      本文链接:https://www.haomeiwen.com/subject/ddaualtx.html