美文网首页
OkHttp3(v3.4.1)剖析

OkHttp3(v3.4.1)剖析

作者: 一只小松 | 来源:发表于2017-11-11 22:52 被阅读0次

    特性

    1. OkHttp performs best when you create a single {@code OkHttpClient} instance and reuse it for all of your HTTP calls.

    创建OkHttpClient的单实例作为所有http的请求会得到最好的性能表现。

    1. This is because each client holds its own connection pool and thread pools.

    因为每个实例拥有自己的连接池和线程池

    1. Reusing connections and threads reduces latency and saves memory.

    重用连接池和线程池以达到减少延迟和节省内存的目的。

    1. Conversely, creating a client for each request wastes resources on idle pools.

    相反地,为每一个请求创建一个OkHttpClient实例将会导致空闲池的浪费。

    1. The threads and connections that are held will be released automatically if they remain idle.

    线程池和连接池将会在闲置的时候将被自动释放。

    开始

    本篇我们从一个网络请求的发起到收到响应数据的一个完整过程来剖析okhttp的源码。
    通常我们是这样使用的:

    OkHttpClient client = new OkHttpClient();
    
    String run(String url) throws IOException {
      // 通过Request的构建者,构建一个request对象
      Request request = new Request.Builder()
          .url(url)
          .build();
    
      // 通过OkHttpClient.newCall方法创建一个真实的Call对象
      Response response = client.newCall(request).execute();
      return response.body().string();
    }
    

    那么,首先我们看到OkHttpClient这个类的声明

    public class OkHttpClient implements Cloneable, Call.Factory {
    
        /**
         * Prepares the {@code request} to be executed at some point in the future.
         */
        @Override 
        public Call newCall(Request request) {
            return new RealCall(this, request);
        }
    }
    

    我们发现其实现了Call.Factory,我们可以看到Call这个接口的声明内容:

    public interface Call {
      /** Returns the original request that initiated this call. */
      Request request();
    
      Response execute() throws IOException;
    
      void enqueue(Callback responseCallback);
    
      void cancel();
    
      boolean isExecuted();
    
      boolean isCanceled();
    
      interface Factory {
        Call newCall(Request request);
      }
    }
    

    Call对象就是发起一个请求的实体,Call.Factory顾名思义就是用来生产一个Call对象的。那么我们再看RealCall的execute究竟发生了什么,

    @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        try {
           // 使用Dispatcher在当前线程立即执行同步请求任务
          client.dispatcher().executed(this);
           // 执行拦截器链,返回请求的响应结果
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } finally {
          // 使用Dispatcher标记完成同步任务执行
          client.dispatcher().finished(this);
        }
      }
    

    关注到Dispatcher这个类:

    /** 并发请求最大值. */
    private int maxRequests = 64;
    /** 同个域名服务的并发请求最大值. */
    private int maxRequestsPerHost = 5;
    
    /**
     * 异步执行的任务入队处理
     */
    synchronized void enqueue(AsyncCall call) {
        // 异步任务并发总量未超过maxRequests,(默认64,可自定义),并且同域名的异步请求未超过maxRequestsPerHost(默认5,可自定义),则直接执行。否则暂存到readyAsyncCalls队列
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    
    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          // 执行请求任务的线程池:无核心线程,线程池量是无界(Integer.Max_VALUE可认定为无界),线程空闲保留60s后无任务则销毁,采用同步队列(无存储功能),总结这是一个CachedThreadPool
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    
    /** Used by {@code Call#execute} to signal it is in-flight. */
    synchronized void executed(RealCall call) {
        // 将同步请求放入同步执行队列中
        runningSyncCalls.add(call);
    }
    

    以及同步请求结束的收尾过程:

    /** Used by {@code Call#execute} to signal completion. */
      void finished(RealCall call) {
        finished(runningSyncCalls, call, false);
      }
    
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();// promoteCalls=true,线程池则会开始执行异步请求任务
          
          // 执行中的请求(不管同步或者异步)数量
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        // 所有同步或异步请求已执行完成,标记空闲,执行空闲任务。有点messageQueue空闲任务的意思
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
    

    我们再仔细的分析getResponseWithInterceptorChain()这个过程发生了什么:

    private Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
    
        // 因为arrayList底层是数组,则会按照添加顺序依次执行。
        List<Interceptor> interceptors = new ArrayList<>();
        // 导入应用上层配置的拦截器列表
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor); // 内置,网络请求失败后进行重试处理服务器的重定向发起新的请求,并在条件允许情况下复用当前连接
        interceptors.add(new BridgeInterceptor(client.cookieJar()));// cookie处理
        interceptors.add(new CacheInterceptor(client.internalCache()));// 缓存处理
        interceptors.add(new ConnectInterceptor(client));// 连接创建或复用
        if (!retryAndFollowUpInterceptor.isForWebSocket()) {
          interceptors.addAll(client.networkInterceptors());// 网络拦截器
        }
    
        interceptors.add(new CallServerInterceptor(
            retryAndFollowUpInterceptor.isForWebSocket()));// 访问服务器获取数据
    
    
        // 这个RealInterceptorChain,是启动整个拦截器责任链开始执行的链头节点
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    

    RealInterceptorChain.proceed的执行过程如下,属于一个典型的责任链模式,链式调用所有的拦截器。

    @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpStream, connection);
      }
    
      public Response proceed(Request request, StreamAllocation streamAllocation, HttpStream httpStream,
          Connection connection) throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
    
        calls++;
    
        // If we already have a stream, confirm that the incoming request will use it.
        if (this.httpStream != null && !sameConnection(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }
    
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.httpStream != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpStream, connection, index + 1, request);
        // 通过Interceptor0#intercept(nextChain),intercept内部会调用nextChain#procceed方法,又重新会到这个点 
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
        // Confirm that the next interceptor made its required call to chain.proceed().
        if (httpStream != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
    
        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
    
        return response;
      }
    

    责任链

    两个设计比较优秀的拦截器详细分析

    1. ConnectInterceptor

    /** Opens a connection to the target server and proceeds to the next interceptor. */
    public final class ConnectInterceptor implements Interceptor {
      public final OkHttpClient client;
    
      public ConnectInterceptor(OkHttpClient client) {
        this.client = client;
      }
    
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        HttpStream httpStream = streamAllocation.newStream(client, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
    
        return realChain.proceed(request, streamAllocation, httpStream, connection);
      }
    }
    

    2. CacheInterceptor

    @Override public Response intercept(Chain chain) throws IOException {
        // 1. 如果有缓存实现,则取出缓存内容候选
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
        
         // 2. 根据候选cache获取缓存策略
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        // 通过缓存策略计算的网络请求
        Request networkRequest = strategy.networkRequest;
        // 通过缓存策略处理得到的缓存响应数据
        Response cacheResponse = strategy.cacheResponse;
    
        if (cache != null) {
          cache.trackResponse(strategy);
        }
    
        // 缓存数据不能使用,清理此缓存数据
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
    
        // If we're forbidden from using the network and the cache is insufficient, fail.
        // 3. 缓存策略告知不进行网络请求,而且没有缓存数据,则返回网络请求错误的结果
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504) // 网关超时, 无法从服务端获得数据
              .message("Unsatisfiable Request (only-if-cached)")
              .body(EMPTY_BODY)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
    
        // If we don't need the network, we're done.
        // 4. 如果不进行网络请求,缓存数据依旧可用,则返回缓存数据.
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
               // 响应体设置给cacheResponse属性
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
        // 5. 缓存无效,则继续执行网络请求。
        Response networkResponse = null;
        try {
          // 拦截器内通过调用chain.proceed方法,才能让责任链继续执行。否则就此拦截器直接返回结果
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // If we have a cache response too, then we're doing a conditional get.
        if (cacheResponse != null) {
          // 6. 通过服务端校验,确认无修改,缓存数据可以被使用(返回304),则直接返回缓存数据,并且更新缓存
          if (validate(cacheResponse, networkResponse)) {
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
    
        // 7. 将网络结果,构造response
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    
        // 对数据进行缓存
        if (HttpHeaders.hasBody(response)) {
          CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
          response = cacheWritingResponse(cacheRequest, response);
        }
    
        return response;
      }
    
    1. 构造
        interceptors.add(new CacheInterceptor(client.internalCache()));
    

    可以看到缓存功能的实现是在OkHttpClient#internalCache()

    InternalCache internalCache() {
        return cache != null ? cache.internalCache : internalCache;
      }
    
    public final class Cache implements Closeable, Flushable {
      private static final int VERSION = 201105;
      private static final int ENTRY_METADATA = 0;
      private static final int ENTRY_BODY = 1;
      private static final int ENTRY_COUNT = 2;
    
      // 这个internalCache对象就是上面所使用的,可以看到其内部使用的都是Cache类的相关API
      final InternalCache internalCache = new InternalCache() {
         /**
          * 获取缓存
          */
        @Override public Response get(Request request) throws IOException {
          return Cache.this.get(request);
        }
        
        /**
         * 缓存存入
         */
        @Override public CacheRequest put(Response response) throws IOException {
          return Cache.this.put(response);
        }
    
         /**
          * 移除缓存
          */
        @Override public void remove(Request request) throws IOException {
          Cache.this.remove(request);
        }
    
        /**
         * 更新缓存
         */
        @Override public void update(Response cached, Response network) {
          Cache.this.update(cached, network);
        }
    
        /**
         * 跟踪一个满足缓存条件的GET请求
         */
        @Override public void trackConditionalCacheHit() {
          Cache.this.trackConditionalCacheHit();
        }
    
        /**
         * 跟踪满足缓存策略CacheStrategy的响应
         */
        @Override public void trackResponse(CacheStrategy cacheStrategy) {
          Cache.this.trackResponse(cacheStrategy);
        }
      };
    
      // 每个缓存API的实现都依赖于DiskLruCache的算法实现
      private final DiskLruCache cache;
      ...
    
    /**
     * 缓存存储实现
     */
    private CacheRequest put(Response response) {
        String requestMethod = response.request().method();
    
        if (HttpMethod.invalidatesCache(response.request().method())) {
          try {
            remove(response.request());
          } catch (IOException ignored) {
            // The cache cannot be written.
          }
          return null;
        }
    
        // 仅针对GET请求做缓存,幂等性
        if (!requestMethod.equals("GET")) {
          // Don't cache non-GET responses. We're technically allowed to cache
          // HEAD requests and some POST requests, but the complexity of doing
          // so is high and the benefit is low.
          return null;
        }
    
        if (HttpHeaders.hasVaryAll(response)) {
          return null;
        }
    
        Entry entry = new Entry(response);
        DiskLruCache.Editor editor = null;
        try {
          editor = cache.edit(urlToKey(response.request()));
          if (editor == null) {
            return null;
          }
          entry.writeTo(editor);
          return new CacheRequestImpl(editor);
        } catch (IOException e) {
          abortQuietly(editor);
          return null;
        }
      }
    }
    

    相关文章

      网友评论

          本文标题:OkHttp3(v3.4.1)剖析

          本文链接:https://www.haomeiwen.com/subject/ousbpxtx.html