浅析 OkHttp 调度器 Dispatcher

作者: RunAlgorithm | 来源:发表于2017-10-25 23:17 被阅读161次

概述

OkHttp 支持异步发起请求,可以不需要使用者自己创建线程池管理异步请求,它有内置实现。

比如这边发起一个异步请求

 OkHttpClient client = new OkHttpClient();
 Request request = new Request.Builder()
    .url("http://publicobject.com/helloworld.txt")
    .build();
 client.newCall(request).enqueue(new Callback() {
    ...
 });

跟踪 RealCall 中的 enqueue 代码中:

  @Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
  }

最后可以看到使用内部的一个 Dispatcher 对象来执行请求。在进入调度器前,会被包装成一个 NameRunnable 的实现类 AsyncCall,这个实现类也是 RealCall 的一个内部类,可以拿到 RealCall 的引用和各种资源。在 AsyncTask 的 execute 中,会调用 OkHttp 的拦截器栈,传入 Request,返回 Response。

final class AsyncCall extends NamedRunnable {
    ...
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        ...
      } catch (IOException e) {
        ...
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

而 NameRunnable 是 Runnable 的实现类,在 Runnable 中增加了一个功能,在任务执行期间修改了线程名为任务名,任务执行结束后再恢复旧的线程名。这样在 DDMS 查看线程的时候,我们就可以很方便地查阅到正在运行的线程是在执行哪个任务。

public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}

这个请求在什么时候执行,使用哪个线程执行,调用了 Dispatcher 的 enqueue 方法后由 Dispatcher 正式接管。

client.dispatcher().enqueue(new AsyncCall(responseCallback));

线程池的配置

为了实现请求的并发,Dispatcher 配置了一个线程池,具体如下:

  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }

可以看到 OkHttp 的线程池配置如下:

  • 核心线程数:0
  • 最大线程数:Iteger.MAX_VALUE,其实就是不限制
  • 空闲线程保活时间:60s
  • 任务队列:SynchronousQueue,为有界队列,且队列大小为 0,不存 Runnable,只是用来进行生产者和消费者之间的传递任务。

是不是很眼熟?和我们在 Executors 使用的 缓存线程池的配置完全一样,缓存线程池的代码如下:

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

所以 OkHttp 自己创建的这个线程池,和缓存线程池的区别,仅仅是修改了线程名。

我们简要分析一下缓存线程池的运行机制,进入到 ThreadPoolExecutor 中的 execute 方法的源码中,假设线程池处于运行状态,会有这样的流程:

  • 新过来一个任务,由于核心线程数为 0,不需要创建核心线程,所以尝试加入任务队列
  • 如果线程池中有线程刚好空闲可以接收任务,因为任务队列的类型是 SynchronousQueue,实际大小为 0,只做消费者和生产者的中转站,所以这时候,就可以入列成功,通过 SynchronousQueue 中转任务给空闲的线程执行。
  • 如果当前线程池所有线程工作饱和,会入列失败。这时候 ThreadPoolExecutor 会调用 addWorker(command, false) 来创建并且启动新线程。

这样子,就会产生疑问,那么如果一次性同时发起大量的请求,不就会产生大量线程了?比如我一次性发起 100 个请求,那么是不是会发出 100 个线程?

是的,如果直接使用该线程池而没有其他的策略的话,是有这样的问题。

所以在 OkHttp 中线程池只是一个辅助作用,仅仅是用来做线程缓存,便于复用的。真正对这些请求的并发数量限制,执行时机等等都是调度器 Dispatcher 承担的。在 OkHttp 这里,线程池只是个带缓存功能的执行器,而真正的调度是外部包了一个调度策略的。

调度策略

这里简单说明一下 OkHttp 的请求的具体调度策略。

首先,是线程数量的约束。会一次性同时并发大量请求吗?不会的,它做了这样的限制:

  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;

最大并发数 maxRequests 默认为 64,但个域名最大请求数 maxRequestsPerHost 默认为 5 个。所以极端情况下,才会开启 64 个线程。这个场景非常罕见。

这两个约束是如何做到的呢?

在执行异步请求 enqueue 的时候有这样处理:

  synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

可以看到,在并发执行的请求数量小于 64,并且当前域名的请求书小于 5 的情况下,才会调用 executorService().execute(call); 。因为是缓存线程池,有空闲线程直接执行,没有的话创建新的线程执行。

如果超过了约束,则会放入一个等待队列 readyAsyncCalls 中:

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

那么,等待队列中的请求什么时候会执行呢?

回到之前提到的 AsyncCalls 中,在 execute 结束的时候会再调用一下 Dispatcher 的 finished 方法

  client.dispatcher().finished(this);

跟踪代码,可以看到会执行到 promoteCalls 方法,和 enqueue 的方法一样,也是按照 maxRequests 和 maxRequestsPerHost 的约束,执行等待队列中的任务。

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

所以,OkHttp 的调度器默认配置下,在保证每个域名可以快速响应请求的情况下,还限制了每个域名的并发数和总体的并发数。实际每个应用的请求用到的域名都不多。线程池仅仅做线程缓存功能,调度策略应该外部自己去实现。

自定义配置

OkHttp 默认最大并发数 64,单域名最大并发 5,但这个是灵活的,是允许使用者去按照自己的复杂场景做相应的配置。比如某个应用才一个域名,但请求非常频繁,就可以调整 Dispatcher 的配置,比如设置单域名最大并发 10,最大并发 10。然后把自己配置的结果在 OkHttpClient 构造的时候传入即可:

 Dispatcher dispatcher = new Dispatcher();
 dispatcher.setMaxRequests(10);
 dispatcher.setMaxRequestsPerHost(10);

 OkHttpClient client = new OkHttpClient.Builder()
    ...
    .dipatcher(dipatcher)
    .build();
 
 ...

又或者,本来应用线程就很多很紧张了,然后请求的域名也挺多有几十个,网络请求的优先级比较低,就可以适当的把最大并发数调低等等,具体情况具体分析,按需配置。

一般情况下,OkHttp 提供的默认配置已经足够,不需要使用者再折腾了。

相关文章

网友评论

    本文标题:浅析 OkHttp 调度器 Dispatcher

    本文链接:https://www.haomeiwen.com/subject/lxthpxtx.html