美文网首页
okhttp源码之分发器分析

okhttp源码之分发器分析

作者: RenHaiRenWjc | 来源:发表于2019-12-09 14:29 被阅读0次

    定义

    Square 贡献的一个处理网络请求的开源项目,从android 4.4 开始 httpURLConnection 底层实现采用 okhttp

    优点

    支持HTTP/2并允许对同一主机的所有请求共享一个套接字
    通过连接池,减少了请求延迟
    默认通过GZip压缩数据,帮我们压缩数据,请求速度更快、使用流量更少
    响应缓存,避免了重复请求的网络
    请求失败自动重试主机的其他ip,自动重定向

    使用流程

    okhttp使用流程图

    从OkHttp使用流程图来看:
    在使用okhttp发起一次请求时,最少存在在OkHttpClientRequestCall三个角色,结合代码来:

    OkHttpClient client = new OkHttpClient.Builder().cache(cache).build();
    Request request = new Request.Builder().url(ENDPOINT).build();
    Call mCall = client.newCall(request);
    Response response = mCall.execute();
    

    其中OkHttpClientRequest的创建可以使用它为我们提供的Builder(建造者模式)。而Call则是把Request交给OkHttpClient之后返回的一个已准备好执行的请求。(建造者模式:将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示。实例化 OKHttpClient 和 Request 的时候,因为有太多的属性需要设置,而且开发者的需求组合千变万化,使用建造者模式可以)
    同时 OkHttp 在设计时采用的门面模式,将整个系统的复杂性给隐藏起来,将子系统接口通过一个客户端 OkHttpClient 统一暴露出来。OkHttpClient中全是一些配置,比如拦截器配置等。
    Call本身是一个接口,我们获得的实现为:RealCall,如下:

     fun newRealCall(client: OkHttpClient, originalRequest: Request, forWebSocket: Boolean): RealCall {
          // Safely publish the Call instance to the EventListener.
          return RealCall(client, originalRequest, forWebSocket).apply {
            transmitter = Transmitter(client, this)
          }
        }
    

    Callexecute代表了同步请求,而enqueue则代表异步请求。两者唯一区别在于一个会直接发起网络请求,而另一个使用OkHttp内置的线程池来进行,这就涉及到OkHttp的任务分发器。

    分发器---Dispatcher

    是来调配请求任务的,内部会包含一个线程池。可以在创建OkHttpClient时,传递我们自己定义的线程池来创建分发器。
    主要成员有:

    var maxRequests = 64     // 异步请求时,最大请求数
    var maxRequestsPerHost = 5    // The maximum number of requests for each host to execute concurrently.
    var idleCallback: Runnable? = null     // A callback to be invoked each time the dispatcher becomes idle
    var executorServiceOrNull: ExecutorService? = null   // 异步请求线程池
    val readyAsyncCalls = ArrayDeque<AsyncCall>()  // 异步请求等待执行队列
    val runningAsyncCalls = ArrayDeque<AsyncCall>()   // 异步请求正在执行队列
    val runningSyncCalls = ArrayDeque<RealCall>()  // 同步请求正在执行队列
    

    同步请求

      @Synchronized internal fun executed(call: RealCall) {
        runningSyncCalls.add(call)
      }
    

    因为同步请求不需要线程池,也不存在任何限制,所以分发器仅做一下记录。

    异步请求

      internal fun enqueue(call: AsyncCall) {
        synchronized(this) {
          readyAsyncCalls.add(call)  // 加入等待请求队列
          .....
        }
        promoteAndExecute()
      }
    
    private fun promoteAndExecute(): Boolean {
        this.assertThreadDoesntHoldLock()
        val executableCalls = mutableListOf<AsyncCall>()
        val isRunning: Boolean
        synchronized(this) {
          val i = readyAsyncCalls.iterator() // 迭代器
          while (i.hasNext()) {
            val asyncCall = i.next()
            if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.超过运行中最大的请求数
            // 超过同一域名请求;如果不超过则放入执行列表中
            if (asyncCall.callsPerHost().get() >= this.maxRequestsPerHost) continue // Host max capacity.
            i.remove() // 从等待中的请求列表删除
            asyncCall.callsPerHost().incrementAndGet()
            executableCalls.add(asyncCall)
            runningAsyncCalls.add(asyncCall)
          }
          isRunning = runningCallsCount() > 0
        }
        for (i in 0 until executableCalls.size) {
          val asyncCall = executableCalls[i]
          asyncCall.executeOn(executorService)  // 提交给线程池
        }
        return isRunning
      }
    

    从上面看到,请求都是先加入等待请求队列,然后再判断最大请求数、同一域名最大数来看是否加入执行请求列表中。
    提交到线程池后,那下一步做什么呢?看下executeOn() 所在的 AsyncCall

    // 实现 Runnable ,线程启动时它时,会执行 run 方法
     internal inner class AsyncCall( private val responseCallback: Callback ) : Runnable {
        @Volatile
        
        fun executeOn(executorService: ExecutorService) {//重定向到`execute`方法
          var success = false
          try {
            executorService.execute(this)
            success = true
          } catch (e: RejectedExecutionException) {
           .....
            responseCallback.onFailure(this@RealCall, ioException)
          } finally {
            if (!success) {
              client.dispatcher.finished(this) // This call is no longer running!
            }
          }
        }
        override fun run() {
          threadName("OkHttp ${redactedUrl()}") {
            var signalledCallback = false
            transmitter.timeoutEnter()
            try {
              val response = getResponseWithInterceptorChain() // 真正的执行请求,返回结果,这才是 okhttp 的核心:拦截器责任链
              signalledCallback = true
              responseCallback.onResponse(this@RealCall, response)
            } catch (e: IOException) {
              ......
            } catch (t: Throwable) {
              ......
                responseCallback.onFailure(this@RealCall, canceledException)
              throw t
            } finally {
              client.dispatcher.finished(this) // 请求完成
            }
          }
        }
      }
    

    看到没,启动一个线程在 run 中执行请求,那 AsyncCall 是什么初始的呢,看回enqueue(call: AsyncCall),显然通过传参进来,继续追踪,看到了
    RealCall类中:

      // 异步
      override fun enqueue(responseCallback: Callback) {
        synchronized(this) {
          check(!executed) { "Already Executed" } // 检查是否重复执行
          executed = true
        }
        transmitter.callStart()
        client.dispatcher.enqueue(AsyncCall(responseCallback)) // 调用分发器
      }
    

    而 RealCall 是实现 Call 的,因此在 Call mCall = client.newCall(request);已经初始了,就等线程执行 Runnable

    而当执行请求列表执行完一个任务后,是怎么通知下一个等待请求的呢?
    我们来看下 finished看代码

      internal fun finished(call: AsyncCall) {
        call.callsPerHost().decrementAndGet()
        finished(runningAsyncCalls, call)
      }
      internal fun finished(call: RealCall) {
        finished(runningSyncCalls, call)
      }
      private fun <T> finished(calls: Deque<T>, call: T) {
        val idleCallback: Runnable?
        synchronized(this) {
          // 从请求列表中移除
          if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
          idleCallback = this.idleCallback
        }
        val isRunning = promoteAndExecute() // 异步任务结束后,重新调配请求,把等待中的请求放入执行请求列表中 
        if (!isRunning && idleCallback != null) {
          idleCallback.run()
        }
      }
    

    从代码来看,当任务执行完后,调用finished来获取下一个请求

    分发器线程池

    分发器内部会包含一个线程池。当异步请求时,会将请求任务交给线程池来执行。
    那分发器中默认的线程池是如何定义的呢?为什么要这么定义?

      @get:Synchronized
      @get:JvmName("executorService") val executorService: ExecutorService
        get() {
          if (executorServiceOrNull == null) {
            executorServiceOrNull = ThreadPoolExecutor(
              0, Int.MAX_VALUE,  // 核心数、最大线程、
              60, TimeUnit.SECONDS,  // 空闲线程闲置时间、闲置单位、
              SynchronousQueue(),  //线程等待队列,SynchronousQueue:一个不存储元素的阻塞队列
              threadFactory("OkHttp Dispatcher", false)) //线程工厂
          }
          return executorServiceOrNull!!
        }
    

    首先核心线程为0,表示线程池不会一直为我们缓存线程,线程池中所有线程都是在60s内没有工作就会被回收。而最大线程Integer.MAX_VALUE与等待队列SynchronousQueue的组合能够得到最大的吞吐量。即当需要线程池执行任务时,如果不存在空闲线程不需要等待,马上新建线程执行任务!等待队列的不同指定了线程池的不同排队机制。
    但是需要注意的时,我们都知道,进程的内存是存在限制的,而每一个线程都需要分配一定的内存。所以线程并不能无限个数。那么当设置最大线程数为Integer.MAX_VALUE时,OkHttp同时还有最大请求任务执行个数: 64的限制。这样即解决了这个问题同时也能获得最大吞吐。

    总结:
    上面就是分发器的分发流程了,其实分发器只是调配请求任务的,真正执行请求的工作都是在getResponseWithInterceptorChain()
    分发器的源码分析主要弄清楚下面三个问题即可
    Q: 如何决定将请求放入ready还是running?
    A: 如果当前正在请求数不小于64放入ready;如果小于64,但是已经存在同一域名主机的请求5个放入ready!

    Q: 从running移动ready的条件是什么?
    A: 每个请求执行完成就会从running移除,同时进行第一步相同逻辑的判断,决定是否移动!

    Q: 分发器线程池的工作优点?
    A:无等待,最大并发

    相关文章

      网友评论

          本文标题:okhttp源码之分发器分析

          本文链接:https://www.haomeiwen.com/subject/sbbbgctx.html