美文网首页Andorid
okhttp3 源码详细解析

okhttp3 源码详细解析

作者: 萨达哈鲁酱 | 来源:发表于2019-03-05 17:04 被阅读0次

    前言

    OkHttp是一个非常优秀的网络请求框架。目前比较流行的Retrofit也是默认使用OkHttp的。所以OkHttp的源码是一个不容错过的学习资源。


    基本使用

    从使用方法出发,首先是怎么使用,其次是我们使用的功能在内部是如何实现的.源码地址

    OkHttpClient client = new OkHttpClient();
    
    String run(String url) throws IOException {
      Request request = new Request.Builder()
          .url(url)
          .build();
    
      Response response = client.newCall(request).execute();
      return response.body().string();
    }
    

    Request、Response、Call 基本概念

    上面的代码中涉及到几个常用的类:Request、Response和Call。下面分别介绍:

    Request

    每一个HTTP请求包含一个URL、一个方法(GET或POST或其他)、一些HTTP头。请求还可能包含一个特定内容类型的数据类的主体部分。

    Response

    响应是对请求的回复,包含状态码、HTTP头和主体部分。

    Call

    OkHttp使用Call抽象出一个满足请求的模型,尽管中间可能会有多个请求或响应。执行Call有两种方式,同步或异步

    简介

    这里写图片描述

    在早期的版本中,OkHttp支持Http1.0,1.1,SPDY协议,但是Http2协议的问世,导致OkHttp也做出了改变,OkHttp鼓励开发者使用HTTP2,不再对SPDY协议给予支持。另外,新版本的OkHttp还有一个新的亮点就是支持WebScoket,这样我们就可以非常方便的建立长连接了。

    作为一个优秀的网络框架,OkHttp同样支持网络缓存,OkHttp的缓存基于DiskLruCache,DiskLruCache虽然没有被收入到Android的源码中,但也是谷歌推荐的一个优秀的缓存框架。有时间可以自己学习源码,这里不再叙述。

    在安全方便,OkHttp目前支持了如上图所示的TLS版本,以确保一个安全的Socket连接。
    重试及重定向就不再说了,都知道什么意思,左上角给出了各浏览器或Http版本支持的重试或重定向次数。


    源码分析

    OKHttpClient

    首先,我们生成了一个OKHttpClient对象,注意OKHttpClient对象的构建是用Builder(构建者)模式来构建的。

    public OkHttpClient() {
        this(new Builder());
      }
    public Builder() {
          dispatcher = new Dispatcher();
          protocols = DEFAULT_PROTOCOLS;
          connectionSpecs = DEFAULT_CONNECTION_SPECS;
          eventListenerFactory = EventListener.factory(EventListener.NONE);
          proxySelector = ProxySelector.getDefault();
          cookieJar = CookieJar.NO_COOKIES;
          socketFactory = SocketFactory.getDefault();
          hostnameVerifier = OkHostnameVerifier.INSTANCE;
          certificatePinner = CertificatePinner.DEFAULT;
          proxyAuthenticator = Authenticator.NONE;
          authenticator = Authenticator.NONE;
          connectionPool = new ConnectionPool();
          dns = Dns.SYSTEM;
          followSslRedirects = true;
          followRedirects = true;
          retryOnConnectionFailure = true;
          connectTimeout = 10_000;
          readTimeout = 10_000;
          writeTimeout = 10_000;
          pingInterval = 0;
        }
    

    可以看到我们简单的一句new OkHttpClient(),OkHttp就已经为我们做了很多工作,很多我们需要的参数在这里都获得默认值。各字段含义如下:

    • dispatcher:直译就是调度器的意思。主要作用是通过双端队列保存Calls(同步&异步Call),同时在线程池中执行异步请求。后面会详细解析该类。
    • protocols:默认支持的Http协议版本 -- Protocol.HTTP_2, Protocol.HTTP_1_1;
    • connectionSpecs:OKHttp连接(Connection)配置 -- ConnectionSpec.MODERN_TLS, -ConnectionSpec.CLEARTEXT,我们分别看一下:
    /** TLS 连接 */ 
    public static final ConnectionSpec MODERN_TLS = new Builder(true)
          .cipherSuites(APPROVED_CIPHER_SUITES)
          .tlsVersions(TlsVersion.TLS_1_3, TlsVersion.TLS_1_2, TlsVersion.TLS_1_1, TlsVersion.TLS_1_0)
          .supportsTlsExtensions(true)
          .build();
    /** 未加密、未认证的Http连接. */
     public static final ConnectionSpec CLEARTEXT = new Builder(false).build();
    

    可以看出一个是针对TLS连接的配置,一个是针对普通的Http连接的配置;

    • eventListenerFactory :一个Call的状态监听器,注意这个是okhttp新添加的功能,目前还不是最终版,在后面的版本中会发生改变的。
    • proxySelector :使用默认的代理选择器;
    • cookieJar:默认是没有Cookie的;
    • socketFactory:使用默认的Socket工厂产生Socket;
    • hostnameVerifier、 certificatePinner、 proxyAuthenticator、 authenticator:安全相关的设置;
    • connectionPool :连接池;后面会详细介绍;
    • dns:这个一看就知道,域名解析系统 domain name -> ip address;
    • pingInterval :这个就和WebSocket有关了。为了保持长连接,我们必须间隔一段时间发送一个ping指令进行保活;

    RealCall

    在我们定义了请求对象request之后,我们需要生成一个Call对象,该对象代表了一个准备被执行的请求。Call是可以被取消的。Call对象代表了一个request/response 对(Stream).还有就是一个Call只能被执行一次。执行同步请求,代码如下(RealCall的execute方法):

    
    @Override public Response execute() throws IOException {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        try {
          client.dispatcher().executed(this);
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          client.dispatcher().finished(this);
        }
      }
    

    首先如果executed等于true,说明已经被执行,如果再次调用执行就抛出异常。这说明了一个Call只能被执行。注意此处同步请求与异步请求生成的Call对象的区别,执行
    异步请求代码如下(RealCall的enqueue方法):

    
    @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

    可以看到同步请求生成的是RealCall对象,而异步请求生成的是AsyncCall对象。AsyncCall说到底其实就是Runnable的子类。
    接着上面继续分析,如果可以执行,则对当前请求添加监听器等操作,然后将请求Call对象放入调度器Dispatcher中。最后由拦截器链中的各个拦截器来对该请求进行处理,返回最终的Response。

    Dispatcher -- 调度器

    Dispatcher是保存同步和异步Call的地方,并负责执行异步AsyncCall。

    这里写图片描述
    public final class Dispatcher {
      /** 最大并发请求数为64 */
      private int maxRequests = 64;
      /** 每个主机最大请求数为5 */
      private int maxRequestsPerHost = 5;
    
      /** 线程池 */
      private ExecutorService executorService;
    
      /** 准备执行的请求 */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
      /** 正在执行的异步请求,包含已经取消但未执行完的请求 */
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
      /** 正在执行的同步请求,包含已经取消单未执行完的请求 */
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    

    针对同步请求,Dispatcher使用了一个Deque保存了同步任务;针对异步请求,Dispatcher使用了两个Deque,一个保存准备执行的请求,一个保存正在执行的请求,为什么要用两个呢?因为Dispatcher默认支持最大的并发请求是64个,单个Host最多执行5个并发请求,如果超过,则Call会先被放入到readyAsyncCall中,当出现空闲的线程时,再将readyAsyncCall中的线程移入到runningAsynCalls中,执行请求。先看Dispatcher的流程,跟着流程读源码:

    在OkHttp,使用如下构造了单例线程池

    public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
    

    构造一个线程池ExecutorService:

    executorService = new ThreadPoolExecutor(
    //corePoolSize 最小并发线程数,如果是0的话,空闲一段时间后所有线程将全部被销毁
        0, 
    //maximumPoolSize: 最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
        Integer.MAX_VALUE, 
    //keepAliveTime: 当线程数大于corePoolSize时,多余的空闲线程的最大存活时间
        60, 
    //单位秒
        TimeUnit.SECONDS,
    //工作队列,先进先出
        new SynchronousQueue<Runnable>(),   
    //单个线程的工厂         
       Util.threadFactory("OkHttp Dispatcher", false));
    

    可以看出,在Okhttp中,构建了一个核心为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做”OkHttp Dispatcher”的线程工厂。

    也就是说,在实际运行中,当收到10个并发请求时,线程池会创建十个线程,当工作完成后,线程池会在60s后相继关闭所有线程。

    synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    

    可以看到如果正在执行的请求总数<=64 && 单个Host正在执行的请求<=5,则将请求加入到runningAsyncCalls集合中,紧接着就是利用线程池执行该请求,否则就将该请求放入readyAsyncCalls集合中。上面我们已经说了,AsyncCall是Runnable的子类(间接),因此,在线程池中最终会调用AsyncCall的execute()方法执行异步请求:

     @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();//拦截器链
            if (retryAndFollowUpInterceptor.isCanceled()) {//重试失败,回调onFailure方法
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);//结束
          }
        }
    

    此处的执行逻辑和同步的执行逻辑基本相同,区别在最后一句代码:client.dispatcher().finished(this);因为这是一个异步任务,所以会调用另外一个finish方法:

    void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
    
    private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
        int runningCallsCount;
        Runnable idleCallback;
        synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");//将请求移除集合
          if (promoteCalls) promoteCalls();
         ...
        }
     
       ...
      }
    

    可以看到最后一个参数是true,这意味着需要执行promoteCalls方法:

    
    private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
     
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
     
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

    该方法主要是遍历执行readyRunningCalls集合中待执行的请求,当然前提是正在执行的Call总数没有超过64,并且readyAsyncCalls集合不为空。如果readyAsyncCalls集合为空,则意味着请求差不多都执行了。放入runningAsyncCalls集合中的请求会继续走上述的流程,直到全部的请求被执行。

    InterceptorChain(拦截器链)

    在介绍RealCall 中源码的时候

    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain();
    

    上面已经介绍了分发器Dispatcher
    下面就介绍核心重点 getResponseWithInterceptorChain:

    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    
    这里写图片描述

    可以看到,在该方法中,我们依次添加了用户自定义的interceptor、retryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor、 networkInterceptors、CallServerInterceptor,并将这些拦截器传递给了这个RealInterceptorChain。

    1)在配置 OkHttpClient 时设置的 interceptors;
    2)负责失败重试以及重定向的 RetryAndFollowUpInterceptor;
    3)负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的响应转换为用户友好的响应的 BridgeInterceptor;
    4)负责读取缓存直接返回、更新缓存的 CacheInterceptor;
    5)负责和服务器建立连接的 ConnectInterceptor;
    6)配置 OkHttpClient 时设置的 networkInterceptors;
    7)负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor。

    OkHttp的这种拦截器链采用的是责任链模式,这样的好处是将请求的发送和处理分开,并且可以动态添加中间的处理方实现对请求的处理、短路等操作。

    从上述源码得知,不管okhttp有多少拦截器最后都会走,如下方法:

    Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
    

    从方法名字基本可以猜到是干嘛的,调用 chain.proceed(originalRequest); 将request传递进来,从拦截器链里拿到返回结果。那么拦截器Interceptor是干嘛的,Chain是干嘛的呢?继续往下看RealInterceptorChain

    RealInterceptorChain类

    下面是RealInterceptorChain的定义,该类实现了Chain接口,在getResponseWithInterceptorChain调用时好几个参数都传的null。

    public final class RealInterceptorChain implements Interceptor.Chain {
    
       public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
            HttpCodec httpCodec, RealConnection connection, int index, Request request) {
            this.interceptors = interceptors;
            this.connection = connection;
            this.streamAllocation = streamAllocation;
            this.httpCodec = httpCodec;
            this.index = index;
            this.request = request;
      }
      ......
    
     @Override 
     public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    
      public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        if (index >= interceptors.size()) throw new AssertionError();
    
        calls++;
    
        ......
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
       ......
    
        return response;
      }
    
      protected abstract void execute();
    }
    

    主要看proceed方法,proceed方法中判断index(此时为0)是否大于或者等于client.interceptors(List )的大小。由于httpStream为null,所以首先创建next拦截器链,主需要把索引置为index+1即可;然后获取第一个拦截器,调用其intercept方法。

    Interceptor 代码如下:

    public interface Interceptor {
      Response intercept(Chain chain) throws IOException;
    
      interface Chain {
        Request request();
    
        Response proceed(Request request) throws IOException;
    
        Connection connection();
      }
    }
    

    RetryAndFollowUpInterceptor

    负责失败重试以及重定向

    @Override 
    public Response intercept(Chain chain) throws IOException {
            Request request = chain.request();
            streamAllocation = new StreamAllocation(
                    client.connectionPool(), createAddress(request.url()));
            int followUpCount = 0;
            Response priorResponse = null;
            while (true) {
                if (canceled) {
                    streamAllocation.release();
                    throw new IOException("Canceled");
                }
    
                Response response = null;
                boolean releaseConnection = true;
                try {
                    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);    //(1)
                    releaseConnection = false;
                } catch (RouteException e) {
                    // The attempt to connect via a route failed. The request will not have been sent.
                    //通过路线连接失败,请求将不会再发送
                    if (!recover(e.getLastConnectException(), true, request)) throw e.getLastConnectException();
                    releaseConnection = false;
                    continue;
                } catch (IOException e) {
                    // An attempt to communicate with a server failed. The request may have been sent.
                    // 与服务器尝试通信失败,请求不会再发送。
                    if (!recover(e, false, request)) throw e;
                    releaseConnection = false;
                    continue;
                } finally {
                    // We're throwing an unchecked exception. Release any resources.
                    //抛出未检查的异常,释放资源
                    if (releaseConnection) {
                        streamAllocation.streamFailed(null);
                        streamAllocation.release();
                    }
                }
    
                // Attach the prior response if it exists. Such responses never have a body.
                // 附加上先前存在的response。这样的response从来没有body
                // TODO: 2016/8/23 这里没赋值,岂不是一直为空?
                if (priorResponse != null) { //  (2)
                    response = response.newBuilder()
                            .priorResponse(priorResponse.newBuilder()
                                    .body(null)
                                    .build())
                            .build();
                }
    
                Request followUp = followUpRequest(response); //判断状态码 (3)
                if (followUp == null){
                    if (!forWebSocket) {
                        streamAllocation.release();
                    }
                    return response;
                }
    
                closeQuietly(response.body());
    
                if (++followUpCount > MAX_FOLLOW_UPS) {
                    streamAllocation.release();
                    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
                }
    
                if (followUp.body() instanceof UnrepeatableRequestBody) {
                    throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
                }
    
                if (!sameConnection(response, followUp.url())) {
                    streamAllocation.release();
                    streamAllocation = new StreamAllocation(
                            client.connectionPool(), createAddress(followUp.url()));
                } else if (streamAllocation.codec() != null) {
                    throw new IllegalStateException("Closing the body of " + response
                            + " didn't close its backing stream. Bad interceptor?");
                }
    
                request = followUp;
                priorResponse = response;
            }
        }
    

    这里是最关键的代码,可以看出在response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);中直接调用了下一个拦截器,然后捕获可能的异常来进行操作

    这里对于返回的response的状态码进行判断,然后进行处理

    BridgeInterceptor

    BridgeInterceptor从用户的请求构建网络请求,然后提交给网络,最后从网络响应中提取出用户响应。从最上面的图可以看出,BridgeInterceptor实现了适配的功能。下面是其intercept方法:

    public final class BridgeInterceptor implements Interceptor {
      ......
    
    @Override 
    public Response intercept(Chain chain) throws IOException {
      Request userRequest = chain.request();
      Request.Builder requestBuilder = userRequest.newBuilder();
    
     RequestBody body = userRequest.body();
     //如果存在请求主体部分,那么需要添加Content-Type、Content-Length首部
     if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
    
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
    
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
    
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
      if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
      }
    
    Response networkResponse = chain.proceed(requestBuilder.build());
    
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
    Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
    
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
      }
    
      /** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
      private String cookieHeader(List<Cookie> cookies) {
        StringBuilder cookieHeader = new StringBuilder();
        for (int i = 0, size = cookies.size(); i < size; i++) {
          if (i > 0) {
            cookieHeader.append("; ");
          }
          Cookie cookie = cookies.get(i);
          cookieHeader.append(cookie.name()).append('=').append(cookie.value());
        }
        return cookieHeader.toString();
      }
    }
    

    上面的代码可以看出,首先获取原请求,然后在请求中添加头,比如Host、Connection、Accept-Encoding参数等,然后根据看是否需要填充Cookie,在对原始请求做出处理后,使用chain的procced方法得到响应,接下来对响应做处理得到用户响应,最后返回响应。

    CacheInterceptor

    @Override
    public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
    ? cache.get(chain.request()) //通过request得到缓存
    : null;

    long now = System.currentTimeMillis();
    
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get(); //根据request来得到缓存策略
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;
    
    if (cache != null) {
      cache.trackResponse(strategy);
    }
    
    if (cacheCandidate != null && cacheResponse == null) { //存在缓存的response,但是不允许缓存
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it. 缓存不适合,关闭
    }
    
    // If we're forbidden from using the network and the cache is insufficient, fail.
      //如果我们禁止使用网络,且缓存为null,失败
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }
    
    // If we don't need the network, we're done.
    if (networkRequest == null) {  //没有网络请求,跳过网络,返回缓存
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }
    
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);//网络请求拦截器    //
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
        //如果我们因为I/O或其他原因崩溃,不要泄漏缓存体
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }
    
    // If we have a cache response too, then we're doing a conditional get.
      //如果我们有一个缓存的response,然后我们正在做一个条件GET
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) { //比较确定缓存response可用
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
    
        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
          //更新缓存,在剥离content-Encoding之前
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();
    
    if (HttpHeaders.hasBody(response)) {   
      CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
      response = cacheWritingResponse(cacheRequest, response);
    }
    
    return response;
    

    }

    • 首先,根据request来判断cache中是否有缓存的response,如果有,得到这个response,然后进行判断当前response是否有效,没有将cacheCandate赋值为空。
    • 根据request判断缓存的策略,是否要使用了网络,缓存 或两者都使用
    • 调用下一个拦截器,决定从网络上来得到response
    • 如果本地已经存在cacheResponse,那么让它和网络得到的networkResponse做比较,决定是否来更新缓存的cacheResponse
    • 缓存未经缓存过的response

    ConnectInterceptor

    public final class ConnectInterceptor implements Interceptor {
      ......
    
     @Override 
     public Response intercept(Chain chain) throws IOException {
     RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    
     // We need the network to satisfy this request. Possibly for validating a conditional GET.
     boolean doExtensiveHealthChecks = !request.method().equals("GET");
     HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
     RealConnection connection = streamAllocation.connection();
    
     return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    }
    

    实际上建立连接就是创建了一个HttpCodec对象,它将在后面的步骤中被使用,那它又是何方神圣呢?它是对 HTTP 协议操作的抽象,有两个实现:Http1Codec和Http2Codec,顾名思义,它们分别对应 HTTP/1.1 和 HTTP/2 版本的实现。

    在Http1Codec中,它利用 Okio 对Socket的读写操作进行封装,Okio 以后有机会再进行分析,现在让我们对它们保持一个简单地认识:它对java.io和java.nio进行了封装,让我们更便捷高效的进行 IO 操作。

    而创建HttpCodec对象的过程涉及到StreamAllocation、RealConnection,代码较长,这里就不展开,这个过程概括来说,就是找到一个可用的RealConnection,再利用RealConnection的输入输出(BufferedSource和BufferedSink)创建HttpCodec对象,供后续步骤使用。

    NetworkInterceptors

    配置OkHttpClient时设置的 NetworkInterceptors。

    CallServerInterceptor

    CallServerInterceptor是拦截器链中最后一个拦截器,负责将网络请求提交给服务器。它的intercept方法实现如下:

    @Override 
    public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
        httpCodec.writeRequestHeaders(request);
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return what
          // we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
            // being reused. Otherwise we're still obligated to transmit the request body to leave the
            // connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    
        httpCodec.finishRequest();
    
        if (responseBuilder == null) {
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
      }
    

    从上面的代码中可以看出,首先获取HttpStream对象,然后调用writeRequestHeaders方法写入请求的头部,然后判断是否需要写入请求的body部分,最后调用finishRequest()方法将所有数据刷新给底层的Socket,接下来尝试调用readResponseHeaders()方法读取响应的头部,然后再调用openResponseBody()方法得到响应的body部分,最后返回响应。


    总结

    OkHttp的底层是通过Java的Socket发送HTTP请求与接受响应的(这也好理解,HTTP就是基于TCP协议的),但是OkHttp实现了连接池的概念,即对于同一主机的多个请求,其实可以公用一个Socket连接,而不是每次发送完HTTP请求就关闭底层的Socket,这样就实现了连接池的概念。而OkHttp对Socket的读写操作使用的OkIo库进行了一层封装。

    总体流程图:


    这里写图片描述

    总体架构图:


    这里写图片描述

    相关文章

      网友评论

        本文标题:okhttp3 源码详细解析

        本文链接:https://www.haomeiwen.com/subject/ibuouqtx.html