美文网首页
7、okhttp源码解析-Dispatcher任务管理器

7、okhttp源码解析-Dispatcher任务管理器

作者: 飞奔的口罩 | 来源:发表于2020-09-01 13:45 被阅读0次

1、okhttp源码解析-整体流程
2、okhttp源码解析-拦截器RetryAndFllowUpInterceptor
3、okhttp源码解析-拦截器BridgeInterceptor
4、okhttp源码解析-拦截器CacheInterceptor
5、okhttp源码解析-拦截器ConnectInterceptor
6、okhttp源码解析-拦截器CallServerInterceptor
7、okhttp源码解析-Dispatcher任务管理器

Dispatcher

okhttp的dispatcher调度器,其实就是维护了一个线程池。

  • dispatcher默认最大并发数是是 64
  • dispatcher默认同一域名下最大并发数是 5

一、调用流程图

Dispatcher (1).png

二、源码分析

1、维护一个线程池,executorService()

2、维护三个数组

  • readyAsyncCalls 待执行的异步任务list
  • runningAsyncCalls 正在执行的异步任务list
  • runningSyncCalls 正在执行的同步任务list

3、Dispatcher异步执行方法:enqueue(AsyncCall)

  • 判断runningAsyncCalls是否大于64,并且判断同一个host请求是否大于5.
  • 3.1 如果小于64将AsyncCall放到数组中,并直接将AsyncCall,放到线程池中执行execute。
  • 3.2 如果大于64了,就先放在readyAsyncCalls集合中。
4、线程池执行execute()时,会调用AsyncCall的execute方法,
  • 4.1 调用getResponseWithInterceptorChains()方法,通过拦截责任链获取response。
  • 4.2 调用Dispatcher.finished() -> promoteCalls()方法,循环readyAsyncCalls集合,如果runningAsyncCalls小于64的话,就从readyAsyncCalls里拿一个出来执行,并插入runningAsyncCalls里。

5、Dispatcher的finished()方法

  • 从runningAyncCalls中,移除当前已经执行完成的异步任务
  • 调用promoteCalls()方法

6、Dispatcher的promoteCalls()方法

  • 循环readyAsyncCalls集合,如果runningAsyncCalls小于64的话,就从readyAsyncCalls里拿一个出来执行,并插入runningAsyncCalls里

7、Dispatcher的execute方法中:

  • 将RealCall插入runningSyncCalls集合中。
  • 这个runningSyncCalls是存储同步任务的,他只是单纯用来计数的。
  • 计数的目的是在空闲回调idleCallback时,确定同步请求数runningSyncCalls和异步请求数runningAyncCalls之和等于0时,再做空闲回调。

二、关键代码

1、维护一个线程池,executorService()

public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

2、维护三个数组

 /** Ready async calls in the order they'll be run. */
 //待执行的异步任务集合
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  //正在执行的异步任务集合
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  //正在执行的同步任务集合,在空闲回调时,用于计算正在执行的任务。也就是同步任务、异步任务均为0时,才做空闲回调
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

3、异步执行方法:enqueue(AsyncCall)

synchronized void enqueue(AsyncCall call) {
    //正在执行的异步任务总数 < 64 && 同host的请求数 < 5
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    //将异步任务存储在集合中
      runningAsyncCalls.add(call);
      //线程池执行该异步任务
      executorService().execute(call);
    } else {
    //否则暂时将异步任务存储在待执行任务集合中
      readyAsyncCalls.add(call);
    }
  }
4、线程池执行execute()时,会调用AsyncCall的execute方法,
@Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
      } catch (IOException e) {
      } finally {
      //关键点:调用Dispatcher的finished方法。
        client.dispatcher().finished(this);
      }
    }

5、Dispatcher的finished()方法

 /** Used by {@code AsyncCall#run} to signal completion. */
  void finished(AsyncCall call) {
    finished(runningAsyncCalls, call, true);
  }

  /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call, false);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    int runningCallsCount;
    Runnable idleCallback;
    synchronized (this) {
    //将执行完的异步任务,移除runningAsyncCalls集合。
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      //执行promoteCalls()方法,将待执行集合中的任务拿出来去执行。
      if (promoteCalls) promoteCalls();
      runningCallsCount = runningCallsCount();
      idleCallback = this.idleCallback;
    }

//runningCallsCount = 同步任务数 + 异步任务数。等于0调用空闲回调
    if (runningCallsCount == 0 && idleCallback != null) {
      idleCallback.run();
    }
  }

6、Dispatcher的promoteCalls()

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.

//遍历待执行异步任务集合
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        //放入正在执行的集合中
        runningAsyncCalls.add(call);
        //通过线程池执行异步任务
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

7、Dispatcher的execute()方法

仅仅用来记录同步任务的个数,辅助空闲回调时的判断条件。

/** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

相关文章

网友评论

      本文标题:7、okhttp源码解析-Dispatcher任务管理器

      本文链接:https://www.haomeiwen.com/subject/vsjdjktx.html