美文网首页
Okhttp同步与异步二

Okhttp同步与异步二

作者: 浮游生物的幻想 | 来源:发表于2021-10-28 11:04 被阅读0次

    之前根据Okhttp使用流程,逐块看了源码内的相关内容介绍。现在去看同步与异步之间的差异

       val mOk = OkHttpClient()
            val request = Request.Builder()
                .url("请求地址")
                .get()//请求方式
                .build()
            val call = mOk.newCall(request)
    

    同步分析:

    call.execute()
    

    call.execute()开启同步请求,返回Response,点进去看看

      override fun execute(): Response {
        check(executed.compareAndSet(false, true)) { "Already Executed" }
    
        timeout.enter()
        callStart()
        try {
          client.dispatcher.executed(this)
          return getResponseWithInterceptorChain()
        } finally {
          client.dispatcher.finished(this)
        }
      }
    
    • timeout.enter(),内部代码如下:如果没有超时也没有结束? 不允许进入,阻塞
      fun enter() {
        val timeoutNanos = timeoutNanos()
        val hasDeadline = hasDeadline()
        if (timeoutNanos == 0L && !hasDeadline) {
          return
        }
        scheduleTimeout(this, timeoutNanos, hasDeadline)
      }
    
    • 事件侦听器EventListener回调callStart()
      private fun callStart() {
        this.callStackTrace = Platform.get().getStackTraceForCloseable("response.body().close()")
        eventListener.callStart(this)
      }
    
     /** Used by [Call.execute] to signal it is in-flight. */
     @Synchronized internal fun executed(call: RealCall) {
       runningSyncCalls.add(call)
     }
    
    • 构建一个拦截器链getResponseWithInterceptorChain() 返回Response
     internal fun getResponseWithInterceptorChain(): Response {
        // Build a full stack of interceptors.
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors //自定义
        interceptors += RetryAndFollowUpInterceptor(client) //错误和重定向
        interceptors += BridgeInterceptor(client.cookieJar) //桥梁:应用程序过渡到网络
        interceptors += CacheInterceptor(client.cache) //缓存
        interceptors += ConnectInterceptor //连接
        if (!forWebSocket) {
          interceptors += client.networkInterceptors //网络
        }
        interceptors += CallServerInterceptor(forWebSocket) //对服务器进行网络调用
    
        //承载整个拦截器链的具体拦截器链
        val chain = RealInterceptorChain(
            call = this,
            interceptors = interceptors,
            index = 0,
            exchange = null,
            request = originalRequest,
            connectTimeoutMillis = client.connectTimeoutMillis,
            readTimeoutMillis = client.readTimeoutMillis,
            writeTimeoutMillis = client.writeTimeoutMillis
        )
    
        //返回response
        var calledNoMoreExchanges = false
        try {
          val response = chain.proceed(originalRequest)
          if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
          }
          return response
        } catch (e: IOException) {
          calledNoMoreExchanges = true
          throw noMoreExchanges(e) as Throwable
        } finally {
          if (!calledNoMoreExchanges) {
            noMoreExchanges(null)
          }
        }
      }
    

    异步分析:

         call.enqueue(object : Callback {
                override fun onResponse(call: Call, response: Response) {
                    Log.e(TAG, "请求成功")
                }
    
                override fun onFailure(call: Call, e: IOException) {
                    Log.e(TAG, "请求失败")
                }
            })
    

    异步执行是通过call.enqueue(responseCallback: Callback)来执行,点进去查看

      override fun enqueue(responseCallback: Callback) {
        check(executed.compareAndSet(false, true)) { "Already Executed" }
    
        callStart()
        client.dispatcher.enqueue(AsyncCall(responseCallback))
      }
    
    • 事件侦听器EventListener回调callStart()和同步请求一样
    • 调用client.dispatcher.enqueue(AsyncCall(responseCallback))并传入了一个实例AsyncCall
      inner class AsyncCall(
        private val responseCallback: Callback
      ) : Runnable {
        @Volatile var callsPerHost = AtomicInteger(0)
          private set
    
        fun reuseCallsPerHostFrom(other: AsyncCall) {
          this.callsPerHost = other.callsPerHost
        }
    
        val host: String
          get() = originalRequest.url.host
    
        val request: Request
            get() = originalRequest
    
        val call: RealCall
            get() = this@RealCall
    
        /**
         * Attempt to enqueue this async call on [executorService]. This will attempt to clean up
         * if the executor has been shut down by reporting the call as failed.
         */
        fun executeOn(executorService: ExecutorService) {
          client.dispatcher.assertThreadDoesntHoldLock()
    
          var success = false
          try {
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
            val ioException = InterruptedIOException("executor rejected")
            ioException.initCause(e)
            noMoreExchanges(ioException)
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            if (!success) {
              client.dispatcher.finished(this) // This call is no longer running!
            }
          }
        }
    
        override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            timeout.enter()
            try {
              val response = getResponseWithInterceptorChain()
              signalledCallback = true
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              if (signalledCallback) {
                // Do not signal the callback twice!
                Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
              } else {
                responseCallback.onFailure(this@RealCall, e)
              }
            } catch (t: Throwable) {
              cancel()
              if (!signalledCallback) {
                val canceledException = IOException("canceled due to $t")
                canceledException.addSuppressed(t)
                responseCallback.onFailure(this@RealCall, canceledException)
              }
              throw t
            } finally {
              client.dispatcher.finished(this)
            }
          }
        }
      }
    

    AsyncCall继承了Runnable ,所以具体的请求流程都在run()里面进行处理,和同步请求流程一样调用timeout.enter() 最后也会构建一个拦截链getResponseWithInterceptorChain() 返回Response,成功回调 fun onResponse(call: Call, response: Response),失败回调fun onFailure(call: Call, e: IOException)。回过头来继续看client.dispatcher.enqueue

      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          readyAsyncCalls.add(call)
    
          // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
          // the same host.
          if (!call.call.forWebSocket) {
            val existingCall = findExistingCallWithHost(call.host)
            if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
          }
        }
        promoteAndExecute()
      }
    

    这里将AsyncCall加入到了准备执行的队列(readyAsyncCalls.add(call))中,往下看if里面的逻辑,首先是findExistingCallWithHost(host: String)方法

      private fun findExistingCallWithHost(host: String): AsyncCall? {
        for (existingCall in runningAsyncCalls) {
          if (existingCall.host == host) return existingCall
        }
        for (existingCall in readyAsyncCalls) {
          if (existingCall.host == host) return existingCall
        }
        return null
      }
    

    在这个方法里面他主要在查找队列中已经存在的host并返回,回调asyncCall.reuseCallsPerHostFrom使其共享对同一主机的现有运行调用的AtomicInteger,再回到异步enqueue(call: AsyncCall)方法中,看最后一步调用promoteAndExecute()方法

    private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()
    
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator()
          while (i.hasNext()) {
            val asyncCall = i.next()
    
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
            if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
    
            i.remove()
            asyncCall.callsPerHost.incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          isRunning = runningCallsCount() > 0
        }
    
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService)
        }
    
        return isRunning
      }
    

    再同步代码块内对readyAsyncCalls队列进行迭代,将符合要求的条件从队列中移除添加到runningAsyncCalls队列中,不符合的话继续待在readyAsyncCalls等待执行,最后调用asyncCall.executeOn(executorService)放入到线程中执行

    总结:

    1. 同步请求:发送一个请求后需要等待返回,才能发送下一个请求。
    2. 异步请求:发送一个请求后不需要等待返回,可以继续发送,因为内部有两个队列,等待执行(readyAsyncCalls)和执行中(runningAsyncCalls),加入了AtomicInteger和线程池支持高并发
    3. Dispatcher:同步、异步都由Dispatcher进行统一管理

    相关文章

      网友评论

          本文标题:Okhttp同步与异步二

          本文链接:https://www.haomeiwen.com/subject/ddaualtx.html