OkHttp Code Analysis

Author: initLiu | Published: 2019-06-05 20:40

    1.OkHttpClient

    public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
      ...
      @Override
      public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
      }
    
      @Override 
      public WebSocket newWebSocket(Request request, WebSocketListener listener) {
        RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
        webSocket.connect(this);
        return webSocket;
      }
    }
    
    public interface Call extends Cloneable {
      ...
      interface Factory {
        Call newCall(Request request);
      }
    }
    
    public interface WebSocket {
      ...
      interface Factory {
        /**
         * Creates a new web socket and immediately returns it. Creating a web socket initiates an
         * asynchronous process to connect the socket. Once that succeeds or fails, {@code listener}
         * will be notified. The caller must either close or cancel the returned web socket when it is
         * no longer in use.
         */
        WebSocket newWebSocket(Request request, WebSocketListener listener);
      }
    }
    

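    OkHttpClient implements both Call.Factory and WebSocket.Factory, so it serves as the factory for creating Call and WebSocket objects. This is an application of the Abstract Factory design pattern.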
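
The factory role described above can be sketched without OkHttp on the classpath. This is only an illustration: `SimpleCall`, `SimpleClient`, and `fetch` are hypothetical stand-ins, not OkHttp types, and the "call" simply echoes the URL instead of doing any I/O.

```java
final class FactoryDemo {
  /** Hypothetical stand-in for okhttp3.Call, reduced to a single method. */
  interface SimpleCall {
    String execute();

    /** Mirrors the shape of Call.Factory: one method that creates calls. */
    interface Factory {
      SimpleCall newCall(String url);
    }
  }

  /** Hypothetical stand-in for OkHttpClient: the concrete factory. */
  static final class SimpleClient implements SimpleCall.Factory {
    @Override
    public SimpleCall newCall(String url) {
      // A real client would build a RealCall here; this sketch just echoes the URL.
      return () -> "response for " + url;
    }
  }

  /** Depends only on the factory interface, so any implementation can be swapped in. */
  static String fetch(SimpleCall.Factory factory, String url) {
    return factory.newCall(url).execute();
  }

  public static void main(String[] args) {
    System.out.println(fetch(new SimpleClient(), "https://example.com/"));
  }
}
```

Because `fetch` only sees `SimpleCall.Factory`, a test could pass in a fake factory without touching the calling code, which is the practical benefit of exposing the client through factory interfaces.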

    2.RealCall

    2.1 Design pattern

    final class RealCall implements Call {
      final OkHttpClient client;
      final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
      /**
       * There is a cycle between the {@link Call} and {@link EventListener} that makes this awkward.
       * This will be set after we create the call instance then create the event listener instance.
       */
      private EventListener eventListener;
      ...
      @Override 
      public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        try {
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          client.dispatcher().finished(this);
        }
      }
    
      @Override
      public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    
      @Override 
      public void cancel() {
        retryAndFollowUpInterceptor.cancel();
      }
    }
    

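    As the code above shows, RealCall exposes a single outward-facing interface and delegates internally to the various subsystems (the Dispatcher, the EventListener, the interceptor chain). Communication is one-way (RealCall ---> subsystems), so this is the Facade pattern.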

    2.2 Synchronous request code analysis

    final class RealCall implements Call {
      final OkHttpClient client;
      final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
    
      /**
       * There is a cycle between the {@link Call} and {@link EventListener} that makes this awkward.
       * This will be set after we create the call instance then create the event listener instance.
       */
      private EventListener eventListener;
      ...
      // Guarded by this.
      private boolean executed;
      ...
      @Override 
      public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        // Notify the EventListener that the call has started
        eventListener.callStart(this);
        try {
          // 2.2.1 For a synchronous request, add the Call directly to the running queue
          client.dispatcher().executed(this);
          // Execute the request
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          // Notify the EventListener that the call has failed
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          // 2.2.1 Once the synchronous request completes, remove the Call from the queue
          client.dispatcher().finished(this);
        }
      }
    }
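
The `executed` flag in `execute()` makes a call one-shot: a second invocation on the same instance throws. A minimal sketch of that guard follows; `OneShotTask` is a hypothetical class standing in for RealCall, and its "work" is just returning a string.

```java
final class OneShotDemo {
  /** Hypothetical stand-in for RealCall: it may be executed at most once. */
  static final class OneShotTask {
    private boolean executed; // Guarded by this.

    String execute() {
      // Check-and-set under the lock so two threads cannot both pass the check.
      synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
      }
      return "done";
    }
  }

  public static void main(String[] args) {
    OneShotTask task = new OneShotTask();
    System.out.println(task.execute());
    try {
      task.execute();
    } catch (IllegalStateException e) {
      System.out.println("second call rejected: " + e.getMessage());
    }
  }
}
```

Only the flag update sits inside the synchronized block; the actual work runs outside it, so the lock is held briefly. This is presumably also why Call extends Cloneable: repeating a request requires a fresh instance rather than reusing the executed one.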
    

    2.2.1 Managing synchronous Calls

    /**
     * Policy on when async requests are executed.
     *
     * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
     * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
     * of calls concurrently.
     */
    public final class Dispatcher {
      ...
      /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
      ...
      /** Used by {@code Call#execute} to signal it is in-flight. */
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
    
      /** Used by {@code Call#execute} to signal completion. */
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    }
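
The bookkeeping in `executed` and `finished` can be modeled in a few lines. The sketch below is a simplified, hypothetical `MiniDispatcher` (it tracks plain objects rather than RealCall instances and ignores asynchronous calls); it shows that the idle callback fires only when the last in-flight call is removed, and that it runs outside the lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SyncTrackingDemo {
  /** Hypothetical, simplified model of how the dispatcher tracks synchronous calls. */
  static final class MiniDispatcher {
    private final Deque<Object> runningSyncCalls = new ArrayDeque<>();
    private Runnable idleCallback;

    synchronized void setIdleCallback(Runnable idleCallback) {
      this.idleCallback = idleCallback;
    }

    /** Records that a call is in flight. */
    synchronized void executed(Object call) {
      runningSyncCalls.add(call);
    }

    /** Removes a completed call and reports idleness if nothing is left. */
    void finished(Object call) {
      int remaining;
      Runnable callback;
      synchronized (this) {
        if (!runningSyncCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
        remaining = runningSyncCalls.size();
        callback = idleCallback;
      }
      // Run the callback outside the lock, once nothing is in flight.
      if (remaining == 0 && callback != null) callback.run();
    }
  }

  public static void main(String[] args) {
    MiniDispatcher dispatcher = new MiniDispatcher();
    dispatcher.setIdleCallback(() -> System.out.println("dispatcher is idle"));
    Object first = new Object();
    Object second = new Object();
    dispatcher.executed(first);
    dispatcher.executed(second);
    dispatcher.finished(first);  // one call is still running, so no callback
    dispatcher.finished(second); // last call finished, so the callback runs
  }
}
```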
    

    2.3 Asynchronous request code analysis

    final class RealCall implements Call {
      ...
      @Override 
      public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        // Notify the EventListener that the call has started
        eventListener.callStart(this);
        // 2.3.1 Add to the request queue and wait to be scheduled
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    }
    
    /**
     * Policy on when async requests are executed.
     *
     * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
     * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
     * of calls concurrently.
     */
    public final class Dispatcher {
      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
      private @Nullable Runnable idleCallback;
    
      /** Executes calls. Created lazily. */
      private @Nullable ExecutorService executorService;
    
      /** Ready async calls in the order they'll be run. */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
      /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
      ...
      // enqueue is a synchronized method
      synchronized void enqueue(AsyncCall call) {
        // 2.3.1 Enqueue the asynchronous request
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    
      /** Returns the number of running calls that share a host with {@code call}. */
      private int runningCallsForHost(AsyncCall call) {
        int result = 0;
        for (AsyncCall c : runningAsyncCalls) {
          if (c.get().forWebSocket) continue;
          if (c.host().equals(call.host())) result++;
        }
        return result;
      }
    }
    

    2.3.1 Enqueueing asynchronous requests

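    runningAsyncCalls is declared as private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); and maxRequests as private int maxRequests = 64;
    The condition in enqueue shows that OkHttp splits asynchronous requests into two groups: running and ready (waiting to run).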

    When is a task placed directly into the runningAsyncCalls queue?

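    First look at runningCallsForHost(call). It iterates over every AsyncCall in the running queue runningAsyncCalls and counts those whose host is the same as the current AsyncCall's (WebSocket calls are skipped).
    Then look at the condition if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost). A task goes straight into runningAsyncCalls and starts executing only when both of the following hold: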

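    • The running queue holds fewer than maxRequests tasks (64 by default).
    • Fewer than maxRequestsPerHost (5 by default) of the running requests share a host with the new request.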

    If either condition fails, the task is placed in the readyAsyncCalls waiting queue instead.
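
The admission rule can be tried out with a toy model. In the sketch below, `MiniDispatcher` is a hypothetical simplification: a call is represented only by its host name, nothing is actually executed, and the two limits are passed in rather than read from OkHttp.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class AdmissionDemo {
  /** Hypothetical, simplified model of the two async queues; a call is just a host name. */
  static final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    final Deque<String> running = new ArrayDeque<>();
    final Deque<String> ready = new ArrayDeque<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
      this.maxRequests = maxRequests;
      this.maxRequestsPerHost = maxRequestsPerHost;
    }

    /** Returns true if the call starts immediately, false if it has to wait. */
    synchronized boolean enqueue(String host) {
      if (running.size() < maxRequests && runningCallsForHost(host) < maxRequestsPerHost) {
        running.add(host);
        return true;
      }
      ready.add(host);
      return false;
    }

    /** Counts the running calls that target the given host. */
    private int runningCallsForHost(String host) {
      int result = 0;
      for (String h : running) {
        if (h.equals(host)) result++;
      }
      return result;
    }
  }

  public static void main(String[] args) {
    MiniDispatcher dispatcher = new MiniDispatcher(64, 5);
    for (int i = 0; i < 7; i++) dispatcher.enqueue("a.example");
    dispatcher.enqueue("b.example");
    System.out.println("running=" + dispatcher.running.size()
        + " ready=" + dispatcher.ready.size());
  }
}
```

With the limits set to 64 and 5, the first five requests to `a.example` start, the sixth and seventh wait, and the request to `b.example` still starts because the per-host count is tracked separately for each host.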

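    So when do the tasks in readyAsyncCalls get executed?
    Before answering that, let's look at how an asynchronous task is executed. Once a request has been added to runningAsyncCalls, executorService().execute(call); is called to run it.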

    public final class Dispatcher {
      ...
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    }
    

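    executorService() lazily creates a thread pool on first use, and the AsyncCall is handed to that pool for execution. With a core size of 0, a maximum of Integer.MAX_VALUE, and a SynchronousQueue, the pool never queues tasks: each submitted call gets a thread immediately, and idle threads are reclaimed after 60 seconds. The concurrency limit therefore comes from the Dispatcher (maxRequests), not from the pool.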
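
A small experiment makes the behavior of this pool configuration concrete. The sketch below builds a ThreadPoolExecutor with the same core size, maximum size, keep-alive, and queue type as the Dispatcher, but uses the JDK's default thread factory instead of OkHttp's internal Util.threadFactory (an assumption made to keep it self-contained). `PoolDemo` and `threadsUsedFor` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PoolDemo {
  /** Same sizing and queue as the Dispatcher's pool, with the JDK default thread factory. */
  static ThreadPoolExecutor newPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
  }

  /** Submits blocking tasks and returns how many threads the pool holds while all are running. */
  static int threadsUsedFor(int tasks) {
    ThreadPoolExecutor pool = newPool();
    CountDownLatch started = new CountDownLatch(tasks);
    CountDownLatch release = new CountDownLatch(1);
    for (int i = 0; i < tasks; i++) {
      pool.execute(() -> {
        started.countDown();
        try {
          release.await(); // keep the thread busy until the measurement is taken
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    try {
      started.await(); // every task is now running at the same time
      return pool.getPoolSize();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      release.countDown();
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println(threadsUsedFor(8));
  }
}
```

Since a SynchronousQueue has no capacity and no worker is idle while the tasks block, each `execute` call has to start a new thread, so eight concurrent tasks end up on eight threads.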

    Now look at the implementation of AsyncCall.

    final class RealCall implements Call {
      ...
      final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
    
        String host() {
          return originalRequest.url().host();
        }
    
        @Override 
        protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }
    }
    

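    execute() runs the request through the interceptor chain, just like a synchronous request. The difference is that an asynchronous request reports its result to the caller through the responseCallback.
    The part to focus on is the finally block, which calls Dispatcher.finished(AsyncCall) to tell the Dispatcher that the request has finished.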

    public final class Dispatcher {
      ...
      /** Used by {@code AsyncCall#run} to signal completion. */
      void finished(AsyncCall call) {
        // Synchronous calls end up in the same private finished(...) overload, but pass promoteCalls = false
        finished(runningAsyncCalls, call, true);
      }
    
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    
      private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    }
    

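    Both paths end in private <T> void finished(Deque<T> calls, T call, boolean promoteCalls). A synchronous request also reaches this method when it completes; the difference is that it passes false for promoteCalls.
    So what does the promoteCalls parameter do? In the code above, when promoteCalls is true, promoteCalls() is executed. From its implementation we can see that it takes tasks out of the readyAsyncCalls queue, as long as the overall and per-host limits allow, moves them into runningAsyncCalls, and submits them to the thread pool.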

    So now we know when the tasks in readyAsyncCalls are executed: each time a running asynchronous call finishes.
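
The promotion step can be traced with a toy model. As before, this is a hypothetical simplification rather than OkHttp's code: `MiniDispatcher` represents a call by its host name, does not run anything on a thread pool, and uses deliberately small limits (2 overall, 1 per host) so the effect is easy to see.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

final class PromotionDemo {
  /** Hypothetical, simplified model of ready-to-running promotion; a call is just a host name. */
  static final class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    final Deque<String> running = new ArrayDeque<>();
    final Deque<String> ready = new ArrayDeque<>();

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
      this.maxRequests = maxRequests;
      this.maxRequestsPerHost = maxRequestsPerHost;
    }

    synchronized void enqueue(String host) {
      if (running.size() < maxRequests && runningCallsForHost(host) < maxRequestsPerHost) {
        running.add(host);
      } else {
        ready.add(host);
      }
    }

    /** Called when a running call completes; frees a slot and promotes waiting calls. */
    synchronized void finished(String host) {
      if (!running.remove(host)) throw new AssertionError("Call wasn't in-flight!");
      promoteCalls();
    }

    private void promoteCalls() {
      if (running.size() >= maxRequests) return; // Already at capacity.
      for (Iterator<String> i = ready.iterator(); i.hasNext(); ) {
        String host = i.next();
        if (runningCallsForHost(host) < maxRequestsPerHost) {
          i.remove();
          running.add(host);
        }
        if (running.size() >= maxRequests) return; // Reached capacity.
      }
    }

    private int runningCallsForHost(String host) {
      int result = 0;
      for (String h : running) {
        if (h.equals(host)) result++;
      }
      return result;
    }
  }

  public static void main(String[] args) {
    MiniDispatcher dispatcher = new MiniDispatcher(2, 1);
    dispatcher.enqueue("a");
    dispatcher.enqueue("a");
    dispatcher.enqueue("b");
    dispatcher.enqueue("c");
    System.out.println("before: running=" + dispatcher.running + " ready=" + dispatcher.ready);
    dispatcher.finished("b");
    System.out.println("after:  running=" + dispatcher.running + " ready=" + dispatcher.ready);
  }
}
```

Before `finished("b")`, the running queue is `[a, b]` and the ready queue is `[a, c]`. Afterwards the running queue is `[a, c]` and the ready queue is `[a]`: the waiting call to host `a` is skipped because that host is still at its limit, while `c` is promoted. Promotion is therefore not strictly first-in, first-out.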

    Article link: https://www.haomeiwen.com/subject/fdtexctx.html