Okhttp3源码解析

作者: 求闲居士 | 来源:发表于2019-03-04 11:37 被阅读17次

    一 前言

    Retrofit + Okhttp + RxJava,可以说是现在最火的网络请求组合了,而它们背后的设计模式或设计思想,是它们成功的重大原因之一。

    分析源码,也是为了学习它们的设计思想,提升自己的编码能力。在上篇分析Retrofit的文章中,外观模式,代理模式,装饰器模式,工厂模式,策略模式等,都是在源码解析中加强了认识。

    总体分四步:

    1. 创建OkHttpClient客户端对象
    2. 创建请求消息Request对象。
    3. 创建网络请求的调用封装对象RealCall
    4. 执行网络请求,会获取响应消息Response对象给回调方法

    二 OkHttpClient

    用建造者模式创建对象。和Retrofit一样,主要功能生成默认配置,且属性很多,这时用建造者模式选择需要配置的属性创建对象就方便了许多。

    public OkHttpClient() {
        this(new Builder());
    }
    
    public Builder() {
      dispatcher = new Dispatcher();    //执行网络请求的任务调度器
      protocols = DEFAULT_PROTOCOLS;    //默认的协议 http2 http1.1
      connectionSpecs = DEFAULT_CONNECTION_SPECS; // 设置连接时支持的tls层协议以及不进行数据加密
      eventListenerFactory = EventListener.factory(EventListener.NONE);
      proxySelector = ProxySelector.getDefault();
      cookieJar = CookieJar.NO_COOKIES;
      socketFactory = SocketFactory.getDefault();   // socket生产工厂
      hostnameVerifier = OkHostnameVerifier.INSTANCE;
      certificatePinner = CertificatePinner.DEFAULT;
      proxyAuthenticator = Authenticator.NONE;
      authenticator = Authenticator.NONE;
      connectionPool = new ConnectionPool();    //连接池 支持多路复用
      dns = Dns.SYSTEM;
      followSslRedirects = true;
      followRedirects = true;
      retryOnConnectionFailure = true;  //是否重定向
      connectTimeout = 10_000;      //连接超时时间
      readTimeout = 10_000;     
      writeTimeout = 10_000;
      pingInterval = 0;
    }
    
    

    无参构造器是用Builder为参数创建的,相当于还是用Builder实现对象创建。参数是Builder构造器默认创建的。

    它是在配置全局发送请求中所需要的各种定制化的参数,并且持有各个参数引用对象。

    Dispatcher主要用来管理网络请求的线程池。

    可以看出每个OkHttpClient对象对线程池和连接池都有管理,所有OkHttpClient最好做成用单例模式创建。创建多个OkHttpClient对象会占用更多内存。

    2.1 Dispatcher

    我看网上有说法是使用享元模式,享元共厂内部用线程池实现享元池。用于减少创建对象的数量,以减少内存占用和提高性能。但这里只有一种类型,没有分组对象。

    public final class Dispatcher {
      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
      private @Nullable Runnable idleCallback;
    
      //请求网络的线程池,懒汉式单例
      private @Nullable ExecutorService executorService;
    
      //准备异步队列,当运行队列满时存储任务
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      //运行的异步队列
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      //运行的同步队列
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
      
      //单例获取线程池
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
      //异步进行网络请求,AsyncCall是一个Runnable
      synchronized void enqueue(AsyncCall call) {
        //运行时异步队列未超过maxRequests且相同host的请求数量不超过maxRequestsPerHost,加入运行时队列并执行它
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
            //加入等待队列
          readyAsyncCalls.add(call);
        }
      }
      //同步网络请求,直接加入同步队列
      synchronized void executed(RealCall call) {
        runningSyncCalls.add(call);
      }
      //网络请求结束后,会将请求结束的Call从运行队列中移除
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          //如果是异步队列,还要再根据运行时异步队列和同host的请求数量操控运行时异步队列和等待队列
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
        }
    
        if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
        }
      }
      
      //从准备队列把Call已到运行队列来
      private void promoteCalls() {
        //运行队列已满,则结束
        if (runningAsyncCalls.size() >= maxRequests) return; // 准备队列为空,怎结束
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
          //如果这个Call的同host请求在运行队列中不超过maxRequestsPerHost,则加入运行队列
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
        //如果运行队列已满,则退出
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    }
    

    对于异步请求,调用enqueue方法,将要执行的AsyncCall放入运行或等待队列并交予线程池执行,执行完成后,再调用finished来移除请求。

    对于同步请求,调用executed方法,将RealCall放入运行同步队列,在执行完成后调用finished将Call移除队列。

    当为异步任务时,会调用promoteCalls方法。遍历准备队列,如果运行队列未满,且运行队列中同一主机请求不超过5个,则将其从准备队列移入运行队列;否则不移动。

    三 Request

    这个对象是为了配置http请求的请求消息Request,而请求消息分为四部分:请求行(request line)、请求头部(header)、空行和请求数据。

    从请求头的组成就可知Request有哪些配置。

    Request request = new Request.Builder()
        .url(baseUrl + "top250")
        .build();
        
    public static class Builder {
        HttpUrl url;    //请求的url
        String method;  //请求方法
        Headers.Builder headers;    //请求头
        RequestBody body;   //请求数据
        Object tag;
    
        public Builder() {
          this.method = "GET";
          this.headers = new Headers.Builder();
        }
    }
    

    很明显,就是HTTP请求消息Request的组成

    四 RealCall

    okhttp3.Call call = client.newCall(request);
    
    @Override public Call newCall(Request request) {
        return new RealCall(this, request, false /* for web socket */);
    }
    
    RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
    
        this.client = client;
        this.originalRequest = originalRequest;
        this.forWebSocket = forWebSocket;
        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    
        // TODO(jwilson): this is unsafe publication and not threadsafe.
        this.eventListener = eventListenerFactory.create(this);
      }
    

    创建的Call对象实际是RealCall实现的,RealCall创建就是初始化了几个属性,OkHttpClient,Request,retryAndFollowUpInterceptor。

    retryAndFollowUpInterceptor是重定向拦截器,和先前的任务调度器Dispatcher一样,是实现网络请求的关键之一。

    五 call.enqueue

    前面都是初始化和配置设置工作,终于到实现网络请求的步骤了,enqueue是上面介绍的RealCall实现的。

    @Override public void enqueue(Callback responseCallback) {
        //只能执行一次
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        //使用dispatcher用线程池执行网络请求
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
      
    final class AsyncCall extends NamedRunnable {
        private final Callback responseCallback;
    
        AsyncCall(Callback responseCallback) {
          super("OkHttp %s", redactedUrl());
          this.responseCallback = responseCallback;
        }
        
         @Override protected void execute() {
          boolean signalledCallback = false;
          try {
          //获取返回值
            Response response = getResponseWithInterceptorChain();
            //对返回值进行回调
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
    }
    

    call.enqueue中,调用了OkHttpClient的Dispatcher的enqueue,将AsyncCall将于线程池执行,线程真正执行的内容在AsyncCall的execute中。

    通过getResponseWithInterceptorChain获取到网络请求的返回值,那实现网络请求的重点就在getResponseWithInterceptorChain中了。

    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);//在网络请求失败后进行重试
        interceptors.add(new BridgeInterceptor(client.cookieJar()));//桥接拦截器,对头部信息进行一系列设置
        interceptors.add(new CacheInterceptor(client.internalCache()));//缓存拦截器
        interceptors.add(new ConnectInterceptor(client));//连接拦截器
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    

    getResponseWithInterceptorChain里添加了一系列的拦截器,再创建RealInterceptorChain开始通过进行责任链模式实现网络请求。

    而责任链的顺序:

    1. OkHttpClient的addInterceptor的拦截器
    2. BridgeInterceptor
    3. CacheInterceptor
    4. ConnectInterceptor
    5. OkHttpClient的addNetworkInterceptor的拦截器
    6. CallServerInterceptor

    5.1 Interceptor

    5.1.1 RealInterceptorChain

    那从责任链模式模式的角度先来分析它的是如何实现的。这种模式为请求创建了一个接收者对象的链,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

    public final class RealInterceptorChain implements Interceptor.Chain {
    @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    
    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        
        // 1.创建下一阶段的链对象
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        //2.获取这一阶段链的拦截者
        Interceptor interceptor = interceptors.get(index);
        //3.让这一阶段的拦截者处理请求
        Response response = interceptor.intercept(next);
    
        // 确保下一阶段的拦截者next,执行了chain.proceed()
        if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
    
        // 确保拦截器请求结果不为空
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
    
        return response;
      }
    }
    

    在第一次调用chain.proceed(originalRequest)开启责任链处理请求时,index还是0,

    • 在注释中第一步,创建下一阶段的拦截者,index增加了1;
    • 第二步,根据index从拦截器队列中获取下一阶段的拦截者;
    • 第三步,用这一阶段的拦截者,执行拦截获得请求结果

    RealInterceptorChainInterceptor关系就很明了了:RealInterceptorChain的proceed方法中,会创建链的下一阶段RealInterceptorChain对象,做为Interceptor的intercept方法参数,在intercept中会执行chain.proceed()进而到链的下一环。

    简单来讲,proceed方法就是RealInterceptorChain根据index获取响应登记的Interceptor,调用intercept方法时又会调用传入的chain.proceed。

    在proceed方法中,有四个参数:Request,StreamAllocation,HttpCodec,RealConnection。但在RealCall的getResponseWithInterceptorChain中

     Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
    

    只穿入了Request对象,其它对象则是在后续的拦截者中创建的。下面就根据这几个参数来解析这些的拦截者。

    5.1.2 RetryAndFollowUpInterceptor

    顾名思义,重试与重定向拦截器。也就是在网络请求失败的情况下,会自动进行重连,内部通过while(true)死循环来进行重试获取Response(有重试上限,超过会抛出异常)。

    影响process方法中的StreamAllocation的创建。

    @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        //创建streamAllocation对象
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()), callStackTrace);
    
        int followUpCount = 0;
        Response priorResponse = null;
        //1.循环获取请求结果
        while (true) {
        Response response = null;
          boolean releaseConnection = true;
          try {
            //2.责任链下一环去处理请求,获取请求结果
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            releaseConnection = false;
          } catch (RouteException e) {
            // 3.通过路由连接的尝试失败,这时请求没有发送出去。判断满足可恢复条件,满足则继续循环重试。
            if (!recover(e.getLastConnectException(), false, request)) {
              throw e.getLastConnectException();
            }
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            // 尝试与服务器通信失败。请求可能已经发送。
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, requestSendStarted, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // 抛出异常如果没有被catch,或没有异常发生,释放streamAllocation资源
            if (releaseConnection) {
              streamAllocation.streamFailed(null);
              streamAllocation.release();
            }
          }
          
          //4.分析返回码,303的话需要重定向
          Request followUp = followUpRequest(response);
          //不需要重定向就返回请求结果
          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            return response;
          }
          //5.重定向次数不能超过MAX_FOLLOW_UPS
          if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
        }
      }
    
    1. 通过循环来获取请求返回值
    2. 责任链叫给下一级获取请求返回值
    3. 请求过程中发生RouteException,IOException异常,则进行重连请求。
    4. 重定向判断,如需要就重定向,不需要就返回请求值给上一级责任链。
    5. 重定向次数不能超过MAX_FOLLOW_UPS,20次。

    5.1.3 BridgeInterceptor

    适配器模式(Adapter Pattern)是作为两个不兼容的接口之间的桥梁。而BridgeInterceptor也是起着这样的作用,它是实现应用层和网络层直接的数据格式编码的桥,用于完善请求头。影响process方法中的Request参数。

    public final class BridgeInterceptor implements Interceptor {
        @Override public Response intercept(Chain chain) throws IOException {
            Request userRequest = chain.request();
            Request.Builder requestBuilder = userRequest.newBuilder();
        
            //1.完善请求头
            RequestBody body = userRequest.body();
            if (body != null) {
                MediaType contentType = body.contentType();
                  if (contentType != null) {
                    requestBuilder.header("Content-Type", contentType.toString());
                  }
                  ...
            }
            ...
            
            //2.责任链下一环,获取请求返回值
            Response networkResponse = chain.proceed(requestBuilder.build());
            
            //3.对返回值的消息报头进行转化
            HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
            Response.Builder responseBuilder = networkResponse.newBuilder()
                .request(userRequest);
            ...
        }
    }
    

    如注释所写的,分为三步

    1. 完善请求头
    2. 责任链下一环,获取请求返回值
    3. 对返回值的消息报头进行转化

    就是对请求头和消息报头处理。

    5.1.4 CacheInterceptor

    要了解这里缓存如何实现,还是要先了解http缓存机制。但也要注意一点,有些缓存字符意义并不完全一样,这个解析CacheInterceptor代码时介绍。

    5.1.4.1 http缓存机制

    这里就不大篇幅描述,只做简要概述。

    缓存其实可以分成两类,一种是不请求网络读取本地缓存的,一种是请求网络进行对比的缓存。Header中的Expires/Cache-Control属于第一种,Last-Modified / If-Modified-Since,Etag / If-None-Match属于第二种。

    1. 第一种缓存的流程:客户端请求数据,直接从本地缓存返回结果;本地缓存失效则请求网络,再将缓存请求结果。
    2. 第二种缓存流程:客户端请求数据,从缓存获取缓存里header的缓存标示,再请求服务器验证本地缓存是否失效,失效则返回结果并缓存;有效通知客户端缓存有效,客户端读取本地缓存。
    5.1.4.1.1 Expires

    Expires的值为服务端返回的到期时间,即下一次请求时,请求时间小于服务端返回的到期时间,直接使用缓存数据。

    Expires 是HTTP 1.0的东西,现在默认浏览器均默认使用HTTP 1.1,所以它的作用基本忽略。所以HTTP 1.1 的版本,使用Cache-Control替代。

    5.1.4.1.2 Cache-Control

    请求头

    指令 说明
    no-cache 需要使用网络对比缓存来验证缓存数据
    no-store 所有内容都不会缓存
    max-age=[秒] 缓存的内容将在 xxx 秒后失效
    max-stale=[秒] 可接受的最大过期时间
    min-fresh=[秒] 询问再过[秒]时间后资源是否过期,若过期则不返回
    only-if-cached 只获取缓存的资源而不联网获取,如果缓存没有命中,返回504

    响应头还有public,private等,这些在特别情况下如响应状态码为302,307等情况作用缓存响应消息。

    5.1.4.1.3 Last-Modified / If-Modified-Since
    • Last-Modified在响应请求时,告诉浏览器资源的最后修改时间;
    • If-Modified-Since在请求服务时,通过此字段通知服务器上次请求时间(上次响应头的Last-Modified值),服务器比较其 与被请求资源的最后修改时间进行比对,若修改时间大于其,则返回所有资源和状态码200;修改时间小于其,说明资源没有修改,则响应HTTP304,告诉客户端使用缓存。
    5.1.4.1.4 Etag / If-None-Match(优先级高于Last-Modified / If-Modified-Since)
    • Etag:在响应请求头中,存储当前资源在服务器的唯一标识
    • If-None-Match:在请求头中,存储服务器客户段缓存数据的唯一标识。服务器会比对其和被请求资源的唯一标识,如果不同,则返回被请求资源和状态码200;如果相同,说明资源没有修改,则响应HTTP 304,告诉客户端使用缓存。

    5.1.4.2 CacheInterceptor

    OKHttp的缓存策略,就是实现Http的缓存策略。

    public final class CacheInterceptor implements Interceptor {
     @Override public Response intercept(Chain chain) throws IOException {
        //1.如果配置了缓存,获取同一请求的缓存Response
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
        //2.根据请求头和缓存的响应,生成缓存策略
        //也就是对请求头和响应头中缓存相关的标示生成策略,看用哪种HTTP缓存方式
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        //网络请求,为空则不请求网络
        Request networkRequest = strategy.networkRequest;
        //缓存,为空则不获取缓存
        Response cacheResponse = strategy.cacheResponse;
    
        //记录网络请求或缓存结果获取的次数
        if (cache != null) {
          cache.trackResponse(strategy);
        }
        //如果有缓存这次请求却不用缓存,则释放缓存
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); 
        }
    
        // 3.如果网络没有且无法从缓存中获取结果,返回504错误
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(Util.EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
    
        // 4.如果不需要网络请求,缓存结果有,则使用缓存获取结果
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
        //5.走到这步,则说明有网络请求。通过责任链下一环获取响应值。
        Response networkResponse = null;
        try {
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // 如果网络请求失败且缓存结果不为空,则释放缓存
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // 6.,缓存结果不为空,且网络请求返回304,则读取本地缓存
        if (cacheResponse != null) {
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
          //合并网络请求返回结果和缓存结果,主要是合并网络请求结果的header
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
    
            // 6.1 将结果集更新到缓存中
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
    
        //7.走到这,说明不需要使用缓存,则直接使用网络结果
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        //8.如果有缓存,经过服务器校验缓存过期了
        if (cache != null) {
        //如果网络请求结果有响应正文,且根据请求和响应的Header判断需要保存缓存,就保存
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // 将网络结果保存到缓存中
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
        //不需要保存缓存,且是POST,PATCH,PUT,DELETE,MOVE中的请求方法,则删除缓存
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
        //9.走到这里,说明网络结果过程都没用到缓存,是无缓存操作
        return response;
      }
    }
    

    这个方法,其实就是实现Http缓存,大部分可以按HTTP缓存来理解它。Header里缓存字段如何配置,决定了缓存要如何处理。这些上面介绍Http缓存时已经说过,就不再赘述。

    说几个要注意的地方

    • 要把网络请求结果缓存下来,要响应正文不为空,且请求头和响应头的缓存字段Cache-Control的值都不为no-cache
    • Cache对象需要我们在OKHttpClient中配置好,Cache主要有put方法和get方法,通过DiskLruCache算法写入,读取缓存,用Entry类来存储所有的缓存信息。

    但缓存实现也有与Http缓存介绍不同的地方,这需要到CacheStrategy中分析。

    5.1.4.3 CacheStrategy

    缓存策略的生成,它是通过工厂模式创建,和Retrofit的字符转换器网络适配器生成几乎一样的行式:创建工厂在调用get创建。

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    

    先分析Factory

    public static class Factory {
        public Factory(long nowMillis, Request request, Response cacheResponse) {
          this.nowMillis = nowMillis;
          this.request = request;
          this.cacheResponse = cacheResponse;
          //将缓存结果的header分析,将缓存相关字符分离出来
          if (cacheResponse != null) {
            this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
            this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
            Headers headers = cacheResponse.headers();
            for (int i = 0, size = headers.size(); i < size; i++) {
              String fieldName = headers.name(i);
              String value = headers.value(i);
              if ("Date".equalsIgnoreCase(fieldName)) {
                servedDate = HttpDate.parse(value);
                servedDateString = value;
              } else if ("Expires".equalsIgnoreCase(fieldName)) {
                expires = HttpDate.parse(value);
              } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
                lastModified = HttpDate.parse(value);
                lastModifiedString = value;
              } else if ("ETag".equalsIgnoreCase(fieldName)) {
                etag = value;
              } else if ("Age".equalsIgnoreCase(fieldName)) {
                ageSeconds = HttpHeaders.parseSeconds(value, -1);
              }
            }
          }
        }
    }
    

    主要作用就是将缓存结果头中的缓存字符分离出来,为后面的缓存策略生成做准备。

    public CacheStrategy get() {
          CacheStrategy candidate = getCandidate();
          //如果请求头有only-if-cached,说明只用缓存,但缓存又失效了,网络缓存都不能用
          if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
            return new CacheStrategy(null, null);
          }
    
          return candidate;
        }
    

    对only-if-cached字符加了道验证,策略实现的主要方法是getCandidate

        private CacheStrategy getCandidate() {
          // 1.没有缓存结果,创建的策略只用网络请求
          if (cacheResponse == null) {
            return new CacheStrategy(request, null);
          }
    
          // 2.当请求的协议是https的时候,如果cache没有TLS握手就丢弃缓存,只用网络请求
          if (request.isHttps() && cacheResponse.handshake() == null) {
            return new CacheStrategy(request, null);
          }
    
          // 3.cacheResponse和request的header的Cache-Control都要不是no-store,否则只用网络请求
          if (!isCacheable(cacheResponse, request)) {
            return new CacheStrategy(request, null);
          }
        
          //4.如果请求头里的Cache-Control值是no-cache或有If-Modified-Since,If-None-Match,则只用网络请求
          CacheControl requestCaching = request.cacheControl();
          if (requestCaching.noCache() || hasConditions(request)) {
            return new CacheStrategy(request, null);
          }
         
          //5.缓存有效性校验,根据缓存的缓存时间,缓存可接受最大过期时间等等HTTP协议上的规范来判断缓存是否可用。
          //这里是第一种不请求网络只用本地缓存的策略
          long ageMillis = cacheResponseAge();
          long freshMillis = computeFreshnessLifetime();
    
          if (requestCaching.maxAgeSeconds() != -1) {
            freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
          }
    
          long minFreshMillis = 0;
          if (requestCaching.minFreshSeconds() != -1) {
            minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
          }
    
          long maxStaleMillis = 0;
          CacheControl responseCaching = cacheResponse.cacheControl();
          if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
            maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
          }
    
          if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            Response.Builder builder = cacheResponse.newBuilder();
            if (ageMillis + minFreshMillis >= freshMillis) {
              builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
            }
            long oneDayMillis = 24 * 60 * 60 * 1000L;
            if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
              builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
            }
            return new CacheStrategy(null, builder.build());
          }
          
          // 6.这里是看请求是否为缓存网络比对。
          String conditionName;
          String conditionValue;
          if (etag != null) {
            conditionName = "If-None-Match";
            conditionValue = etag;
          } else if (lastModified != null) {
            conditionName = "If-Modified-Since";
            conditionValue = lastModifiedString;
          } else if (servedDate != null) {
            conditionName = "If-Modified-Since";
            conditionValue = servedDateString;
          } else {
            //7.如果比对缓存也不是,那只有常规的网络请求了
            return new CacheStrategy(request, null); 
          }
    
          Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
          Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);
    
          Request conditionalRequest = request.newBuilder()
              .headers(conditionalRequestHeaders.build())
              .build();
          //7.返回第二种网络比对的缓存策略
          return new CacheStrategy(conditionalRequest, cacheResponse);
        }
    

    CacheControl是request和Response中为辅助缓存处理的类,通过分解header的缓存字符来创建。

    其实创建的策略就三种,

    • 常规网络请求;只有networkRequest参数
    • 只获取本地缓存结果,不用网络请求;只有cacheResponse参数
    • 网络比对缓存获取结果的。networkRequest和cacheResponse都有

    only-if-cached不能单独使用,它只是限制网络请求,缓存有效的判断还是要另外设置。

    缓存策略与Http缓存不同的点:

    • CacheStrategy缓存策略,就是根据缓存字段来实现缓存算法。但有一点,request的header里包含If-Modified-Since或If-None-Match或no-cache,会直接走网络请求而不走缓存。这两个字符是要在缓存结果里有才会走缓存,这与上面介绍的HTTP缓存字段不同。具体看下面分析。
    • 当只用本地缓存时,Http缓存会在本地缓存失效后再请求网络,而OKhttp只会请求本地缓存,不会再请求网络

    5.1.5 ConnectInterceptor

    解析完缓存,责任链下一环就是ConnectInterceptor了,它通过StreamAllocation获取到HttpCodec和RealConnection。

    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        //这是RetryAndFollowUpInterceptor创建并传入的
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        // HTTP/1.1生成Http1xStream;HTTP/2 生成Http2xStream ,实际上是通过RealConnection生成的
        HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
        //从连接池中获取可用连接
        RealConnection connection = streamAllocation.connection();
    
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
      
      public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
        try {
        //1.获取连接
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
        //通过连接获取HttpCodec
          HttpCodec resultCodec = resultConnection.newCodec(client, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
      }
    

    HttpCodec里实际是利用Okio来实现读写。

    先来看如何获取RealConnection吧

    5.1.5.1 RealConnection的获取

    findHealthyConnection -> findConnection

      private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          boolean connectionRetryEnabled) throws IOException {
        Route selectedRoute;
        //connectionPool 连接池
        synchronized (connectionPool) {
        
          // 1.复用已有连接
          RealConnection allocatedConnection = this.connection;
          //已有连接是否可用
          if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
            return allocatedConnection;
          }
    
          // 2.从连接池中获取连接,赋值给了connection
          Internal.instance.get(connectionPool, address, this, null);
          //连接池获到了connection,就使用
          if (connection != null) {
            return connection;
          }
    
          selectedRoute = route;
        }
    
        // 如果需要路由,就获取一个,里面有代理信息
        if (selectedRoute == null) {
          selectedRoute = routeSelector.next();
        }
    
        RealConnection result;
        synchronized (connectionPool) {
          if (canceled) throw new IOException("Canceled");
    
          // 3.加上路由再从连接池获取连接一次
          // This could match due to connection coalescing.
          Internal.instance.get(connectionPool, address, this, selectedRoute);
          if (connection != null) return connection;
    
          // 4.自己创建连接
          // 用一个异步cancel()来中断我们将要进行的握手。
          route = selectedRoute;
          refusedStreamCount = 0;
          result = new RealConnection(connectionPool, selectedRoute);
          acquire(result);
        }
    
        // 这里初始化初始化了BufferedSource和BufferedSink,与创建HttpCodec有关。
        result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
        routeDatabase().connected(result.route());
    
        Socket socket = null;
        synchronized (connectionPool) {
          // 5.将新建的连接放入连接池
          Internal.instance.put(connectionPool, result);
    
          // If another multiplexed connection to the same address was created concurrently, then
          // release this connection and acquire that one.
          if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
          }
        }
        closeQuietly(socket);
    
        return result;
      }
    

    RealConnection的获取,总得来说有四个优先级:

    1. StreamAllocation中已有的connection。
    2. 无需路由参数从连接池获取的connection。
    3. 有路由参数从连接池获取的connection。
    4. 自己创建的connection

    RealConnection的connect方法里,调用connectSocket将实现网络请求的Socket和用Okio实现了读写,也就是说请求消息和响应消息都通过Okio来实现读写了,实现读写的关键BufferedSourceBufferedSink在这里初始化,也实现HttpCodec功能的重要参数。

    对于从连接池获取connection,分析下连接池ConnectionPool就能明白怎么实现的了。

    5.1.5.2 ConnectionPool

    既然是连接池,那我们首要关心的是如果存储连接,和如何管理连接。

    private final Deque<RealConnection> connections = new ArrayDeque<>();
    

    在ConnectionPool中,是用双向队列Deque来存储连接。

    再看放入连接方法

    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        //进行连接池清理
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        //放入队列
        connections.add(connection);
      }
    

    在每次放入新的连接时,先清理存放的连接,再存放连接。

    //连接存放的最大数量,默认为5
    private final int maxIdleConnections;
    //空闲连接存活时间,最多五分钟
      private final long keepAliveDurationNs;
    //用线程池来进行连接清理
    private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
     private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
          //cleanup就是根据设置的存活条件,进行连接清理
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
    

    在OKHttp3 的默认实现中,这些连接中最多只能存在 5 个空闲连接,空闲连接最多只能存活 5 分钟。

    @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
        //根据传入的地址和路由,判断连接是否合格
          if (connection.isEligible(address, route)) {
            //acquire方法赋值连接给streamAllocation的connection
            streamAllocation.acquire(connection);
            return connection;
          }
        }
        return null;
      }
      //streamAllocation的acquire方法
      public void acquire(RealConnection connection) {
        assert (Thread.holdsLock(connectionPool));
        if (this.connection != null) throw new IllegalStateException();
        //进行赋值
        this.connection = connection;
        //用allocations来记录每个使用了connection的streamAllocation对象
        connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
      }
    

    5.1.6 CallServerInterceptor

    这里就是执行网络请求的拦截器了。

    @Override public Response intercept(Chain chain) throws IOException {
        //责任链
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        //从ConnectInterceptor中获取的httpCodec
        HttpCodec httpCodec = realChain.httpStream();
        //从retryAndFollowUpInterceptor获取的streamAllocation
        StreamAllocation streamAllocation = realChain.streamAllocation();
        //从ConnectInterceptor中获取的connection
        RealConnection connection = (RealConnection) realChain.connection();
        //在BridgeInterceptor进行组装Request的header
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
        //将请求头放入
        httpCodec.writeRequestHeaders(request);
    
        Response.Builder responseBuilder = null;
        //对请求消息的请求数据不为空的类型进行操作
        //操作是对头包含“Expect: 100-continue”的请求,特别处理
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          //对头包含“Expect: 100-continue”的请求,先询问服务器是否愿意接受数据
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            //发送请求询问
            httpCodec.flushRequest();
            //获取返回值,如果code为100,则responseBuilder为空,表示服务器原因接收
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
            
          if (responseBuilder == null) {
            // 通过请求消息,获得http的主体,httpCodec的内部类
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            //获取RealBufferedSink对象,和RealConnection中创建的BufferedSink是同一个类
            //相当于迭代执行请求消息写入
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            //将请求消息通过BufferedSink迭代写入,bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink
            //最后一个sink是与Socket关联的类,是Okio的静态内部类
            request.body().writeTo(bufferedRequestBody);
            //迭代关闭BufferedSink
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // "Expect: 100-continue"询问结果为不接受body,所以关闭请求
            streamAllocation.noNewStreams();
          }
        }
        //将请求消息写入socket,socket发送请求消息;没有发送请求就会正常发送,发送过的就什么都不执行
        httpCodec.finishRequest();
        
        if (responseBuilder == null) {
            //读取响应头
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        //根据响应头先创建Response
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
        //将响应正文加入response,openResponseBody方法里会用Okio读取正文。
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
      }
    

    对于网络请求真正的实现,还是利用Okio对Socket的读写操作进行封装,例如有请求数据时的Okio写入流程:

    bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink

    最后一个sink是与Socket关联的类,是Okio的静态内部类

    6.总结

    总体流程: OKHttp流程.png
    • OkHttpClient对象对线程池和连接池都有管理,所有OkHttpClient最好做成用单例模式创建。创建多个OkHttpClient对象会占用更多内存。
    • Dispatcher管理线程池;ConnectionPool是连接池管理,这些连接中最多只能存在 5 个空闲连接,空闲连接最多只能存活 5 分钟。。

    它们都用双向队列来存储管理数据。Dispatcher中有三个队列:Deque<AsyncCall> readyAsyncCalls,Deque<AsyncCall> runningAsyncCalls,Deque<RealCall> runningSyncCalls。ConnectionPool中有Deque<RealConnection> connections。

    • Request用来配置请求消息。
    • okhttp3.Call是一个接口规定了需要执行的几个行为,具体的实现类有RealCall和AyncCall;RealCall中初始化了一些参数,主要是拦截器,异步请求由dispatcher执行,同步异步执行内容都是用责任链来实现网络请求。RealInterceptorChain与Interceptor是实现的关键,相互迭代调用。
    • RetryAndFollowUpInterceptor,重试与重定向拦截器,创建了streamAllocation对象
    • BridgeInterceptor,用适配器模式思想实现应用层和网络层直接的数据格式编码的桥,用于完善请求头。
    • CacheInterceptor,根据缓存结果和请求数据来策略实现缓存Http缓存功能。

    缓存策略与Http缓存不同的点:
    CacheStrategy缓存策略,就是根据缓存字段来实现缓存算法。但有一点,request的header里包含If-Modified-Since或If-None-Match或no-cache,会直接走网络请求而不走缓存。这两个字符是要在缓存结果里有才会走缓存,这与上面介绍的HTTP缓存字段不同。具体看下面分析。
    当只用本地缓存时,Http缓存会在本地缓存失效后再请求网络,而OKhttp只会请求本地缓存,不会再请求网络。

    • ConnectInterceptor,它通过StreamAllocation获取到HttpCodec和RealConnection。在创建RealConnection时,也初始化初始化了BufferedSource和BufferedSink,与创建HttpCodec有关。

    BufferedSource和BufferedSink是Okio连接网络的Socket实现其读写,RealConnection创建HttpCodec来统一管理网络请求的输入输出。例如有请求数据时的Okio写入流程:
    bufferedRequestBody->requestBodyOut->httpCodec.sink->Okio.sink->AsyncTimeout.sink->sink->socket.getOutputStream()

    • CallServerInterceptor,网络请求的拦截器,责任链最后一层。利用httpCodec将请求数据发送网络,实际上是利用Okio将请求数据写入Socket,请求网络。

    7.参考

    关于HTTP协议,一篇就够了

    http缓存浅谈

    彻底弄懂HTTP缓存机制及原理

    HTTP/1.1 协议Expect: 100-continue

    HTTP状态码

    Okhttp3基本使用

    Okhttp基本用法和流程分析

    OkHttp3源码和设计模式-1

    OKHTTP3源码2-连接池管理

    OkHttp3源码详解(二整体流程)

    okhttp3源码的使用详解

    OkHttp3源码解析

    在 Retrofit 和 OkHttp 中使用网络缓存,提高访问效率

    相关文章

      网友评论

        本文标题:Okhttp3源码解析

        本文链接:https://www.haomeiwen.com/subject/buqbuqtx.html