美文网首页android
okhttp源码解析--dispatcher

okhttp源码解析--dispatcher

作者: 二妹是只猫 | 来源:发表于2019-02-12 13:24 被阅读31次
    Okhttpclient流程图.png

    1. dispatcher到底是什么、做了什么?

    public final class Dispatcher {
      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
      private @Nullable Runnable idleCallback;
    
      /** Executes calls. Created lazily. */
      private @Nullable ExecutorService executorService;
    
      /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    

    通过上一篇的学习了解到不论是同步还是异步请求,最终调用的都是dispatcher中的方法,并分别将请求线程AsyncCall添加到异步队列(readyAsyncCalls、runningAsyncCalls)和同步队列(runningSyncCalls)中.

     synchronized void enqueue(AsyncCall call) {
       if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
         runningAsyncCalls.add(call);
         executorService().execute(call);
       } else {
         readyAsyncCalls.add(call);
       }
     }
    /** Used by {@code Call#execute} to signal it is in-flight. */
     synchronized void executed(RealCall call) {
       runningSyncCalls.add(call);
     }
     public synchronized ExecutorService executorService() {
       if (executorService == null) {
         executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
             new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
       }
       return executorService;
     }
    

    executorService()创建了一个线程池,在通过上面两张代码,不难看出Dispatcher主要工作就是维护请求队列和通过线程池来执行网络请求

    1.1 Dispatcher的readyAsyncCalls、runningAsyncCalls等队列是怎样进行控制的?

    从上一段代码中可以看到同步方法executed很简单直接将请求添加到同步队列中(runningSyncCalls)。异步请求也只有几行代码,判断了下是否小于最大请求数(maxRequests)和小于正在进行中的网络请求最大值(maxRequestsPerHost),是--就添加到进行中异步队列(runningAsyncCalls)并执行线程池,否者就添加到异步等待队列(readyAsyncCalls)
    看到这里我就产生了一个疑问,这里只是对消息队列进行了简单的添加,并没有和我们的线程池产生联系啊!不着急,我们继续。上一段代码我们以异步请求enqueue 方法继续分析 :当请求满足条件时执行了

    runningAsyncCalls.add(call);
    executorService().execute(call);
    

    这两步操作:第一步添加到执行中的异步请求队列(runningAsyncCalls),第二步线程池直接执行当前网络请求(注意executorService()返回线程池、AsyncCall(继承自NamedRunnable)就是一个线程哦,这俩在上一篇中已介绍过就不多说了)

    /**
     * Runnable implementation which always sets its thread name.
     */
    public abstract class NamedRunnable implements Runnable {
      protected final String name;
    
      public NamedRunnable(String format, Object... args) {
        this.name = Util.format(format, args);
      }
    
      @Override public final void run() {
        String oldName = Thread.currentThread().getName();
        Thread.currentThread().setName(name);
        try {
          execute();
        } finally {
          Thread.currentThread().setName(oldName);
        }
      }
    
      protected abstract void execute();
    }
    

    当线程池执行网络请求时,实际就是执行了子线程的run()方法,AsyncCall继承的NamedRunnable就是一个Runnable,在它的run()方法中执行了它的抽象方法execute()
    我们继续到AsyncCall中去看看execute()的具体实现:

      final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
    
        String host() {
          return originalRequest.url().host();
        }
    
        Request request() {
          return originalRequest;
        }
    
        RealCall get() {
          return RealCall.this;
        }
    
        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }
    

    retryAndFollowUpInterceptor.isCanceled()首先判断请求是否取消,
    取消就调用 responseCallbackonFailure(...)失败方法,
    否则responseCallbackonResponse(...)返回数据。这样一个异步请求就完成了。
    最终在finlly看到了这段代码client.dispatcher().finished(this)调用了dispatcher的finished方法

      /** Used by {@code AsyncCall#run} to signal completion. */
      void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
    
      /** Used by {@code Call#execute} to signal completion. */
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    

    首先将执行完成的网络请求从runningSyncCalls中移除,第二步通过promoteCalls判断是否是异步请求,是就执行promoteCalls()

     private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
      /** Returns the number of running calls that share a host with {@code call}. */
      private int runningCallsForHost(AsyncCall call) {
        int result = 0;
        for (AsyncCall c : runningAsyncCalls) {
          if (c.host().equals(call.host())) result++;
        }
        return result;
      }
    

    首先判断请求进行中的异步队列(runningAsyncCalls)达到最大数量就return,异步等待队列(readyAsyncCalls)是为空就return。
    然后遍历异步等待队列(readyAsyncCalls)

    • 判断现在最大请求数是否达到,没到就将请求从异步等待队列(readyAsyncCalls)移除,并添加到异步队列(runningAsyncCalls)执行线程池,就又开始了下一次网络请求。

    • 在此判断请求是否到达了最大值,因为在上面可能刚刚添加了一个网络请求
      这样队列中的请求就完成了一个循环。队列也和线程池联系了起来

    简单的总结一下:
    executorService().execute(call)-->AsyncCall执行execute()-->dispatcher().finished(this)--> executorService().execute(call)

    相关文章

      网友评论

        本文标题:okhttp源码解析--dispatcher

        本文链接:https://www.haomeiwen.com/subject/qdjheqtx.html