美文网首页Android
OkHttp源码阅读(二)—— Dispatcher任务调度器

OkHttp源码阅读(二)—— Dispatcher任务调度器

作者: Sherlock丶Aza | 来源:发表于2021-04-10 08:06 被阅读0次

      上边OkHttp源码阅读(OkHttp源码阅读(一)-——初识OkHttp)中,初步的了解了一个简单http请求的执行流程,其中的一个关键角色就是Dispatcher,它主要负责请求的分发工作,下面具体分析一下它

    同步任务分发

    还记得上篇博客留下的坑嘛?那个标记异常醒目的重要代码块,如图:

    1.jpeg

    我们已经知道同步请求实际上调用的是RealCall类中的execute方法,如上图中所示的代码是execute方法执行的主要部分,标记①中调用Dispatcher的execute方法将构建出来的Realcall对象传入进去从而触发同步执行,Dispatcher的execute方法调用其实很简单,如下:

     /** Used by {@code Call#execute} to signal it is in-flight. */
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
    

    Dispatcher的execute方法只是将请求call添加到了runningSyncCalls中,这里有了一个新的角色runningSyncCalls,从字面上理解就是正在运行的同步请求组,实际上,它是一个队列,正在执行的同步请求队列,我们看一下它的初始化,也就是Dispatcher中runningSyncCalls的定义。

    /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      /**注意上边的注释,包括取消的但是还没结束的请求call*/
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      /**注意上边的注释,包括取消的但是还没结束的请求call*/
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    

    在Dispatcher中定义了三个队列,分别是

    1. readyAsyncCalls 准备执行的异步请求队列
    2. runningAsyncCalls 正在执行的异步请求队列
    3. runningSyncCalls 正在执行的同步请求队列

    他们的定义都是使用Deque的方式,Deque(双端队列)的特性是集成了Queue(队列)和Stack(栈)的特性,具体Deque的详细介绍可以另一篇博客Java容器之双端队列Deque, 接下来执行Response result = getResponseWithInterceptorChain();获取响应response,从源码中可以看到从Request请求发出到获取Response,所有的执行过程都是在主线程执行,所以同步请求会阻塞主线程,我们之前说过Call是链接Request和Response的一个中间桥梁,具体请求建立链接、参数传递等等包括获取response的过程交给了连接器链InterceptorChain,暂且不赘述,后边详细说明,获取到Response后,还有一个最最最重要的操作步骤 就是finally代码块,也就是标记③,调用Dispatcher的finished方法,具体源码实现如下:

    /** Used by {@code Call#execute} to signal completion. */
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          /**首先从队列中移除这个请求*/
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          /**promoteCalls是个标记位,同步请求不会调用,异步请求会调用promoteCalls方法进行队列重新整理*/
          if (promoteCalls) promoteCalls();
          /**获取正在运行任务数*/
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
        /**一下是闲置回调*/
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    

      同步请求过程中的finished方法中,首先将runningSyncCalls正在执行的同步队列中的元素移除,然后计算正在运行的任务数runningCallsCount,只有当正在执行的任务数为0的时候才会回调idleCallback,否则Dispachter还会继续执行runningSyncCalls同步请求队列中的其他任务或者是其他等待运行或者正在运行的异步任务, runningCallsCount()方法很简单,就是判断下当前正在执行的异步队列和同步队列的size之和。

    public synchronized int runningCallsCount() {
        return runningAsyncCalls.size() + runningSyncCalls.size();
      }
    

    异步任务分发

      在实际开发过程中,大部分的网络请求都是异步请求,在OKHttp中,异步任务的分发流程也比同步的要复杂些,首先还是直接从RealCall的enqueue()方法开始:

    2.jpeg

    同样调用的是Dispatcher的enqueue方法,与execute方法不同的是,enqueue方法传入的是AsyncCall的对象,跟踪源码可以看到AsyncCall实际上是一个Runnable对象,

    final class AsyncCall extends NamedRunnable 
    **
    **
    ***
    **
    
    public abstract class NamedRunnable implements Runnable
    

    接下来看一下Dispatcher的enqueue()方法具体执行流程:

    synchronized void enqueue(AsyncCall call) {
        /**当前运行的异步任务数小于最大请求数,并且每个主机的请求数小于每个主机最大请求数*/
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          /**将改请求任务放入到正在执行的任务队列中*/
          runningAsyncCalls.add(call);
          /**线程池执行该任务*/
          executorService().execute(call);
        } else {
          /**如果不满足上边条件,则该任务会被加入到缓存队列,也就是准备执行的任务队列*/
          readyAsyncCalls.add(call);
        }
      }
    

    enqueue方法对runningAsyncCalls和readyAsyncCalls队列进行的操作,由于两个队列都非线程安全队列,所以enqueue方法被加上了synchronized,首先判断条件当前运行的异步任务数小于最大请求数,并且每个主机的请求数小于每个主机最大请求数时,该任务会直接加入到正在执行的异步任务队列runningAsyncCalls中,否则该任务会暂时被加入到缓存队列readyAsyncCalls中,Dispatcher中默认最大请求数 maxRequests是64个,每个主机最大请求数 maxRequestsPerHost是5个 ,另外Dispatcher还维护了一个线程池ExecutorService,并通过上面executorService()进行初始化:

      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
      private @Nullable Runnable idleCallback;
    
      *
      *
      *
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    

    其中在ThreadPoolExecutor在初始化时,将线程池的最大线程个数设置为Integer.MAX_VALUE,理论上说正在运行的请求线程是Integer.MAX_VALUE个,但是Dispatcher中做了默认最大请求数maxRequests的限制,所以在用户没有特殊指定最大请求数时,Okhttp同时运行的请求个数不会超过64个。

    AsyncCall

      之前提到AsyncCall是个Runnable,上述异步任务执行过程中,当ExecutorService执行execute()方法时就是调用AsyncCall的run方法,由于AsyncCall继承NamedRunnable关系,实际上调用的是AsyncCall的execute方法,如下:

    final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        *
        *
        *
        
        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            /**通过拦截器链获取Response*/
            Response response = getResponseWithInterceptorChain();
            /**重试和重定向拦截器是否被取消*/
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            /**以下代码块才是重点*/
            client.dispatcher().finished(this);
          }
        }
      }
    

    上边AsyncCall源码可以看出还是通过拦截器链获取Response,然后通过一些判断返回回调,这里拦截器链后面会详细介绍,重点是还是finally代码块,同步请求中Dispatcher的finished方法和异步的有所差别。

    /** Used by {@code AsyncCall#run} to signal completion. */
    /**异步请求调用此方法*/
      void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
    
      /** Used by {@code Call#execute} to signal completion. */
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          /**首先从队列中移除这个请求*/
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          /**promoteCalls是个标记位,同步请求不会调用,异步请求会调用promoteCalls方法进行队列重新整理*/
          if (promoteCalls) promoteCalls();
          /**获取正在运行任务数*/
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
        /**一下是闲置回调*/
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    

    异步请求调用了promoteCalls标记位为true的方法,前面的队列remove方法不变,重点在promoteCalls()方法,这个方法Dispatcher会重新整理缓存队列和正在执行的任务队列,如下:

    private void promoteCalls() {
        /**正在运行的任务数越界*/
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        /**缓存队列中没有待执行任务了*/
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
          /**遍历缓存队列中等待执行的任务,判断条件是每台主机最大请求数小于最大限制*/
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            /**满足上边条件的任务会从缓存队列中取出,并放入到正在执行的任务队列中,然后线程池执行该任务*/
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
          /**每次正在执行任务队列在添加时都要判断是否已经超过指定最大请求任务数*/
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

    可以看出,异步任务在调用Dispachter的finished()方法时,会循环的获取缓存队列中的任务,只要有任务并满足限制条件就会被执行(AsyncCall.execute()方法),然而AsyncCall.execute()方法执行完后finally又会执行Dispachter的finished()方法,如此循环调用直到正在执行的任务队列runningAsyncCalls没有任务为止,最后抛出idleCallback回调,处于闲置状态! 缓存队列和正在执行的任务队列二者之间的关系有点像生产者和消费者模式, Dispachter负责生产 ExecutorService负责消费

    总结

      同步和异步请求区别除了Call调用的方法不同以外,Dispatcher的处理机制也不相同,同步请求时Dispatcher会直接将这个任务丢进正在执行的同步任务队列中,直到获取到Response后,再从队列中移除该任务,所以同步请求会阻塞当前线程。异步任务请求时Dispatcher会判断当前的执行任务条件,选择放入缓存任务队列还是正在执行的异步任务队列,通过自己维护的线程池执行异步任务,得到Response,执行完成以后循环判断缓存队列和执行队列任务数,最终将所有任务执行完成,由于执行任务和获取Response都是在子线程中完成,所以异步任务不会阻塞当前线程

    相关文章

      网友评论

        本文标题:OkHttp源码阅读(二)—— Dispatcher任务调度器

        本文链接:https://www.haomeiwen.com/subject/gtaykltx.html