OkHttp Source Code Analysis (3): Built-in Interceptors

Author: J__Beyond | Published: 2017-05-07 19:45 | Read 818 times

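    The previous article, OkHttp Source Code Analysis (2): Overall Flow, walked through the overall flow of an OkHttp request. This article analyzes OkHttp's five built-in interceptors in detail.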

    Mind map (figure: OkHttp built-in interceptors)

    RetryAndFollowUpInterceptor

    It does three main things:

    • Creates the StreamAllocation, which manages the socket
    • Handles redirects
    • Retries after failures

    First, the source code:

    @Override 
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        // Create the StreamAllocation that manages the socket connection for this call
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()), callStackTrace);
    
        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
          if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
          }
    
          Response response = null;
          boolean releaseConnection = true;
          try {
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            releaseConnection = false;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), false, request)) {
              throw e.getLastConnectException();
            }
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, requestSendStarted, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // We're throwing an unchecked exception. Release any resources.
            if (releaseConnection) {
              streamAllocation.streamFailed(null);
              streamAllocation.release();
            }
          }
    
          // Attach the prior response if it exists. Such responses never have a body.
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
    
          Request followUp = followUpRequest(response);
    
          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            return response;
          }
    
          closeQuietly(response.body());
    
          if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
    
          if (followUp.body() instanceof UnrepeatableRequestBody) {
            streamAllocation.release();
            throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
          }
    
          if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();
            streamAllocation = new StreamAllocation(
                client.connectionPool(), createAddress(followUp.url()), callStackTrace);
          } else if (streamAllocation.codec() != null) {
            throw new IllegalStateException("Closing the body of " + response
                + " didn't close its backing stream. Bad interceptor?");
          }
    
          request = followUp;
          priorResponse = response;
        }
    }
    

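    1) Initializing StreamAllocation

    A StreamAllocation allocates a stream to a specific server address. The stream itself is an HttpCodec, which has two implementations, Http1Codec and Http2Codec, for HTTP/1.1 and HTTP/2 respectively. The underlying connection may be an unreleased one taken from the ConnectionPool, or a newly created one. This involves connection pooling and reuse as well as establishing and releasing TCP connections.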

    streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()), callStackTrace);
    
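    • First, the ConnectionPool is obtained from the OkHttpClient (it is created when the OkHttpClient is built)
    • Then an Address is created from the request URL (an Address describes one specific server address)

    The StreamAllocation is not used here after it is created. It is passed down the chain to the later interceptors: ConnectInterceptor uses it to open the connection and stream, and CallServerInterceptor then performs the I/O.
    The Request is then handed to the next Interceptor, which returns the response.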

    2) Retrying after a failure

    The retry and redirect logic, in pseudocode:

    while (true) {
         try {
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
          } catch (Exception e) {
           // decide whether to retry
            continue;
          }
        // decide whether a follow-up (such as a redirect) is needed
        request = followUp; // reassign the Request
    }
    

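    If an exception is thrown while fetching the response, it is caught and handled according to its type. The decision whether to retry is made mainly in recover():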

    private boolean recover(IOException e, boolean requestSendStarted, Request userRequest) {
        streamAllocation.streamFailed(e);
    
        // The application layer has forbidden retries.
        if (!client.retryOnConnectionFailure()) return false;
    
        // We can't send the request body again.
        if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
    
        // This exception is fatal.
        if (!isRecoverable(e, requestSendStarted)) return false;
    
        // No more routes to attempt.
        if (!streamAllocation.hasMoreRoutes()) return false;
    
        // For failure recovery, use the same route selector with a new connection.
        return true;
      }
    
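    • If the user set retryOnConnectionFailure to false on OkHttpClient (the default is true), failed requests are never retried;
    • If the request has already started sending and its body is an UnrepeatableRequestBody, it cannot be sent again, so it is not retried;
    • Four kinds of exception are treated as unrecoverable:
      • protocol errors (ProtocolException)
      • interrupted I/O (InterruptedIOException)
      • TLS handshake failures caused by a bad certificate (SSLHandshakeException whose cause is a CertificateException)
      • certificate pinning failures (SSLPeerUnverifiedException)
    • There are no more routes to try.

    If recover() returns true, the continue statement starts the next iteration of the loop (it does not leave the loop), which runs the following call again: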

    response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
    

    This is how a failed request gets retried.
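
The "fatal exception" rules listed above can be sketched with nothing but JDK exception types. This is a simplified stand-in, not OkHttp's own isRecoverable(); the class name RecoverSketch is hypothetical. One detail is an assumption based on my reading of OkHttp 3.x rather than on the text above: a SocketTimeoutException raised before the request was sent is treated as retryable, even though it is an InterruptedIOException.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ProtocolException;
import java.net.SocketTimeoutException;
import java.security.cert.CertificateException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;

final class RecoverSketch {
    /** Simplified stand-in for the "is this exception fatal?" check. */
    static boolean isRecoverable(IOException e, boolean requestSendStarted) {
        // A protocol error will not go away on a retry.
        if (e instanceof ProtocolException) return false;

        // Interrupted I/O is fatal. Assumption: a timeout that happened before
        // anything was sent may still be retried on another route.
        if (e instanceof InterruptedIOException) {
            return e instanceof SocketTimeoutException && !requestSendStarted;
        }

        // A handshake failure caused by a bad certificate is fatal.
        if (e instanceof SSLHandshakeException
                && e.getCause() instanceof CertificateException) {
            return false;
        }

        // Certificate pinning failure.
        if (e instanceof SSLPeerUnverifiedException) return false;

        // Anything else (for example a connection reset) may be retried.
        return true;
    }

    public static void main(String[] args) {
        SSLHandshakeException badCert = new SSLHandshakeException("handshake failed");
        badCert.initCause(new CertificateException("untrusted"));

        System.out.println(isRecoverable(new ProtocolException("bad status line"), false));
        System.out.println(isRecoverable(badCert, false));
        System.out.println(isRecoverable(new IOException("connection reset"), true));
    }
}
```

In the real interceptor this check is only one of four gates in recover(); the client setting, the request body type and the remaining routes are checked as well.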

    3) Handling redirects

    RetryAndFollowUpInterceptor uses followUpRequest() to extract the redirect information from the response and build a new Request:

    /**
       * Figures out the HTTP request to make in response to receiving {@code userResponse}. This will
       * either add authentication headers, follow redirects or handle a client request timeout. If a
       * follow-up is either unnecessary or not applicable, this returns null.
       */
      private Request followUpRequest(Response userResponse) throws IOException {
        if (userResponse == null) throw new IllegalStateException();
        Connection connection = streamAllocation.connection();
        Route route = connection != null
            ? connection.route()
            : null;
        int responseCode = userResponse.code();
    
        final String method = userResponse.request().method();
        switch (responseCode) {
          case HTTP_PROXY_AUTH:
            Proxy selectedProxy = route != null
                ? route.proxy()
                : client.proxy();
            if (selectedProxy.type() != Proxy.Type.HTTP) {
              throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
            }
            return client.proxyAuthenticator().authenticate(route, userResponse);
    
          case HTTP_UNAUTHORIZED:
            return client.authenticator().authenticate(route, userResponse);
    
          case HTTP_PERM_REDIRECT:
          case HTTP_TEMP_REDIRECT:
            // "If the 307 or 308 status code is received in response to a request other than GET
            // or HEAD, the user agent MUST NOT automatically redirect the request"
            if (!method.equals("GET") && !method.equals("HEAD")) {
              return null;
            }
            // fall-through
          case HTTP_MULT_CHOICE:
          case HTTP_MOVED_PERM:
          case HTTP_MOVED_TEMP:
          case HTTP_SEE_OTHER:
            // Does the client allow redirects?
            if (!client.followRedirects()) return null;
    
            String location = userResponse.header("Location");
            if (location == null) return null;
            HttpUrl url = userResponse.request().url().resolve(location);
    
            // Don't follow redirects to unsupported protocols.
            if (url == null) return null;
    
            // If configured, don't follow redirects between SSL and non-SSL.
            boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
            if (!sameScheme && !client.followSslRedirects()) return null;
    
            // Most redirects don't include a request body.
            Request.Builder requestBuilder = userResponse.request().newBuilder();
            if (HttpMethod.permitsRequestBody(method)) {
              final boolean maintainBody = HttpMethod.redirectsWithBody(method);
              if (HttpMethod.redirectsToGet(method)) {
                requestBuilder.method("GET", null);
              } else {
                RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
                requestBuilder.method(method, requestBody);
              }
              if (!maintainBody) {
                requestBuilder.removeHeader("Transfer-Encoding");
                requestBuilder.removeHeader("Content-Length");
                requestBuilder.removeHeader("Content-Type");
              }
            }
    
            // When redirecting across hosts, drop all authentication headers. This
            // is potentially annoying to the application layer since they have no
            // way to retain them.
            if (!sameConnection(userResponse, url)) {
              requestBuilder.removeHeader("Authorization");
            }
    
            return requestBuilder.url(url).build();
    
          case HTTP_CLIENT_TIMEOUT:
            // 408's are rare in practice, but some servers like HAProxy use this response code. The
            // spec says that we may repeat the request without modifications. Modern browsers also
            // repeat the request (even non-idempotent ones.)
            if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
              return null;
            }
    
            return userResponse.request();
    
          default:
            return null;
        }
    }
    
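    • If followUpRequest returns null, no follow-up (such as a redirect) is needed and the response is returned as is
    • At most 20 follow-ups are allowed; exceeding that throws an exception

    if (++followUpCount > MAX_FOLLOW_UPS) { // at most 20 follow-ups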
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
    
       request = followUp; // assign the follow-up request so the next loop iteration sends it
       priorResponse = response;
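
The follow-up loop and its limit can be modeled without any networking. In this minimal sketch a hypothetical redirect table (a Map from URL to redirect target) plays the role of the server, and a missing entry plays the role of followUpRequest() returning null. The class and method names are made up for illustration.

```java
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;

final class FollowUpLoopSketch {
    static final int MAX_FOLLOW_UPS = 20;

    /** Follows entries in the redirect table until a URL has no follow-up. */
    static String resolve(Map<String, String> redirectTable, String startUrl)
            throws ProtocolException {
        String request = startUrl;
        int followUpCount = 0;
        while (true) {
            // null means "no follow-up needed": return what we have.
            String followUp = redirectTable.get(request);
            if (followUp == null) return request;

            if (++followUpCount > MAX_FOLLOW_UPS) {
                throw new ProtocolException("Too many follow-up requests: " + followUpCount);
            }
            // Reassign the request so the next iteration sends the follow-up.
            request = followUp;
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> table = new HashMap<>();
        table.put("/a", "/b");
        table.put("/b", "/c");
        System.out.println(resolve(table, "/a"));

        table.put("/c", "/a"); // a redirect cycle
        try {
            resolve(table, "/a");
        } catch (ProtocolException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A redirect cycle is the typical way to hit the limit: the loop gives up on the 21st follow-up instead of spinning forever.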
    

    BridgeInterceptor

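    BridgeInterceptor comes right after RetryAndFollowUpInterceptor. Its main responsibilities are:

    • Completing the HTTP headers on the request;
    • Saving cookies from the response;
    • Decompressing gzip-encoded responses.
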
    @Override
    public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
    
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
    
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
    
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
    
        Response networkResponse = chain.proceed(requestBuilder.build());
    
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
    
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
    }
    
    

    1) Completing the HTTP headers

    These include Content-Type, Content-Length, Transfer-Encoding, Host, Connection, Accept-Encoding, User-Agent and Cookie.
    Cookies are loaded through a CookieJar, which can be set when building the OkHttpClient:

    OkHttpClient okHttpClient = new OkHttpClient.Builder()
        .cookieJar(new CookieJar() {
                    @Override
                    public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
                        // Cookies could be persisted here, for example in SharedPreferences
                    }
    
                    @Override
                    public List<Cookie> loadForRequest(HttpUrl url) {
                        // Load from wherever the cookies were saved; never return null here, or a NullPointerException follows
                        return new ArrayList<>();
                    }
                })
        .build();
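
The header-completion rules in intercept() can be sketched over a plain map. This is a simplified model, not BridgeInterceptor itself: the class BridgeHeadersSketch and its complete() method are hypothetical, cookies and User-Agent are left out, and a null contentLength stands for "no request body".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class BridgeHeadersSketch {
    /**
     * Returns a copy of userHeaders with body-related and default headers filled in.
     * contentLength is -1 when the body length is unknown and null when there is no body.
     */
    static Map<String, String> complete(
            Map<String, String> userHeaders, String host, Long contentLength) {
        // Header names are case-insensitive.
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        headers.putAll(userHeaders);

        if (contentLength != null) {
            if (contentLength != -1) {
                // Known length: send Content-Length, never chunked.
                headers.put("Content-Length", Long.toString(contentLength));
                headers.remove("Transfer-Encoding");
            } else {
                // Unknown length: fall back to chunked transfer encoding.
                headers.put("Transfer-Encoding", "chunked");
                headers.remove("Content-Length");
            }
        }

        // Defaults are only added when the caller did not set them.
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");

        // Ask for gzip only if the caller chose neither an encoding nor a byte range.
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            headers.put("Accept-Encoding", "gzip");
        }
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("Range", "bytes=0-99");
        System.out.println(complete(user, "example.com", -1L));
    }
}
```

The Range check matters because a byte range of a gzip stream cannot be decompressed on its own, which is presumably why transparent gzip is skipped for such requests.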
    

    2) Saving cookies

    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
    public static void receiveHeaders(CookieJar cookieJar, HttpUrl url, Headers headers) {
        if (cookieJar == CookieJar.NO_COOKIES) return;
    
        List<Cookie> cookies = Cookie.parseAll(url, headers);
        if (cookies.isEmpty()) return;
        // Hand the cookies to the CookieJar (for example, to persist them in SharedPreferences)
        cookieJar.saveFromResponse(url, cookies);
      }
    

    3) Handling gzip

    if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
    }
    

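    Decompression is done by Okio's GzipSource. Afterwards, Content-Encoding and Content-Length are removed from the headers, because they describe the compressed body rather than the decompressed one the caller will read.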
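
The same idea can be tried out with the JDK's java.util.zip classes instead of Okio. This is only an illustration of transparent decompression plus header stripping under that substitution; TransparentGzipSketch, gzip() and decode() are hypothetical names, and unlike GzipSource this version reads the whole body into memory rather than streaming it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class TransparentGzipSketch {
    /** Helper that plays the server: compresses a body with gzip. */
    static byte[] gzip(byte[] plain) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(plain);
            }
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Decompresses the body if we asked for gzip, and strips the stale headers. */
    static byte[] decode(Map<String, String> headers, byte[] body, boolean transparentGzip) {
        if (!transparentGzip || !"gzip".equalsIgnoreCase(headers.get("Content-Encoding"))) {
            return body; // the caller asked for the encoding explicitly, or it is not gzip
        }
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            // These headers described the compressed bytes, not what we return.
            headers.remove("Content-Encoding");
            headers.remove("Content-Length");
            return out.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        byte[] compressed = gzip("hello".getBytes(StandardCharsets.UTF_8));
        headers.put("Content-Encoding", "gzip");
        headers.put("Content-Length", Integer.toString(compressed.length));
        byte[] plain = decode(headers, compressed, true);
        System.out.println(new String(plain, StandardCharsets.UTF_8) + " " + headers);
    }
}
```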

    CacheInterceptor

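    OkHttp's response cache follows the caching rules of the HTTP protocol; see the article on HTTP caching mechanisms (HTTP 协议缓存机制详解) for background.

    [Figure: 015353_P04w_568818.png]

    To use OkHttp's cache, a Cache must be specified when building the OkHttpClient: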

    OkHttpClient httpClient = new OkHttpClient.Builder()
                  .cache(new Cache(this.getCacheDir(), 10240 * 1024))
                  .build();
    

    The interceptor's full code is as follows:

    @Override 
    public Response intercept(Chain chain) throws IOException {
        // Look up a cached candidate response
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
    
        // CacheStrategy works like a mapping: given the request and cacheCandidate, it outputs networkRequest and cacheResponse
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
    
        // If a cache is configured, track statistics (request, network and hit counts)
        if (cache != null) {
          cache.trackResponse(strategy);
        }
        // A candidate exists but CacheStrategy rejected it: close the candidate's body
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
    
        // only-if-cached: the network is forbidden and the cache is missing or stale, so a 504 is returned
        // If we're forbidden from using the network and the cache is insufficient, fail.
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(Util.EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
    
        // If we don't need the network, we're done.
        // No network request needed: return the cached response
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
        Response networkResponse = null;
        try {
          // Perform the network request
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // If we have a cache response too, then we're doing a conditional get.
        if (cacheResponse != null) {
          // 304 Not Modified: the cached body is still valid and only the headers need updating
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
    
        // Use the network response
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    
        // Update the cache
        if (cache != null) {
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
    
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
        return response;
    }
    

    1) Reading from the cache

    Response get(Request request) {
        String key = key(request.url());
        DiskLruCache.Snapshot snapshot;
        Entry entry;
        try {
          snapshot = cache.get(key);
          if (snapshot == null) {
            return null;
          }
        } catch (IOException e) {
          // Give up because the cache cannot be read.
          return null;
        }
    
        try {
          entry = new Entry(snapshot.getSource(ENTRY_METADATA));
        } catch (IOException e) {
          Util.closeQuietly(snapshot);
          return null;
        }
    
        Response response = entry.response(snapshot);
    
        if (!entry.matches(request, response)) {
          Util.closeQuietly(response.body());
          return null;
        }
    
        return response;
      }
    
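    • The URL is the input; its MD5 hash, hex-encoded, is the key (this is hashing, not encryption);
    • The key is used to get a Snapshot, which is tied to the cache files on disk;
    • An Entry is built from the snapshot, and the Response is built from the Entry and returned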
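
Deriving a cache key as the hex-encoded MD5 of the URL takes a few lines with the JDK's MessageDigest. This is a minimal sketch of the technique, not OkHttp's own key() helper; CacheKeySketch and md5Hex are names chosen here for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class CacheKeySketch {
    /** Returns the lowercase hex MD5 digest of the UTF-8 bytes of s. */
    static String md5Hex(String s) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to 0..255 so negative bytes still print as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(md5Hex("https://example.com/index.html"));
    }
}
```

A fixed-length hex string is convenient as a key because it is always a safe file name, whatever characters the URL contains.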

    2) Cache strategy

    The cache strategy is implemented by CacheStrategy, which is built in two steps:

    ① The Factory parses the header values

    public Factory(long nowMillis, Request request, Response cacheResponse) {
          this.nowMillis = nowMillis;
          this.request = request;
          this.cacheResponse = cacheResponse;
    
          if (cacheResponse != null) {
            this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
            this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
            Headers headers = cacheResponse.headers();
            for (int i = 0, size = headers.size(); i < size; i++) {
              String fieldName = headers.name(i);
              String value = headers.value(i);
              if ("Date".equalsIgnoreCase(fieldName)) {
                servedDate = HttpDate.parse(value);
                servedDateString = value;
              } else if ("Expires".equalsIgnoreCase(fieldName)) {
                expires = HttpDate.parse(value);
              } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
                lastModified = HttpDate.parse(value);
                lastModifiedString = value;
              } else if ("ETag".equalsIgnoreCase(fieldName)) {
                etag = value;
              } else if ("Age".equalsIgnoreCase(fieldName)) {
                ageSeconds = HttpHeaders.parseSeconds(value, -1);
              }
            }
          }
     }
    
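    • The Factory mainly parses the caching-related headers of the cached response: Date, Expires, Last-Modified, ETag and Age;
    • Note that Headers is not a Map but a flat array: names sit at the even indices (0, 2, 4, ...) and each value at the odd index that follows its name;

    ② get() returns the CacheStrategy instance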
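
The flat name/value layout can be sketched as follows. FlatHeadersSketch is a hypothetical class that mirrors how name(i) and value(i) are used in the Factory loop above; it is not a copy of okhttp3.Headers.

```java
final class FlatHeadersSketch {
    // Layout: [name0, value0, name1, value1, ...]
    private final String[] namesAndValues;

    FlatHeadersSketch(String... namesAndValues) {
        if (namesAndValues.length % 2 != 0) {
            throw new IllegalArgumentException("Expected alternating names and values");
        }
        this.namesAndValues = namesAndValues.clone();
    }

    /** Number of header lines, not the array length. */
    int size() {
        return namesAndValues.length / 2;
    }

    String name(int index) {
        return namesAndValues[index * 2];
    }

    String value(int index) {
        return namesAndValues[index * 2 + 1];
    }

    /** Returns the last value for the given name (case-insensitive), or null. */
    String get(String name) {
        for (int i = namesAndValues.length - 2; i >= 0; i -= 2) {
            if (name.equalsIgnoreCase(namesAndValues[i])) {
                return namesAndValues[i + 1];
            }
        }
        return null;
    }

    public static void main(String[] args) {
        FlatHeadersSketch headers = new FlatHeadersSketch(
                "Date", "Sun, 07 May 2017 11:45:00 GMT",
                "ETag", "\"v1\"");
        for (int i = 0, size = headers.size(); i < size; i++) {
            System.out.println(headers.name(i) + ": " + headers.value(i));
        }
    }
}
```

Unlike a Map, an array keeps the original order and allows the same name to appear more than once, which HTTP permits (Set-Cookie is the usual example).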

    public CacheStrategy get() {
          CacheStrategy candidate = getCandidate();
          if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
            // We're forbidden from using the network and the cache is insufficient.
            return new CacheStrategy(null, null);
          }
    
          return candidate;
    }
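
The two outputs of a CacheStrategy, networkRequest and cacheResponse, decide which branch CacheInterceptor.intercept() takes. The four combinations can be summarized in a small sketch; the Outcome enum and the class name are hypothetical and only mirror the branches in the intercept() code shown earlier.

```java
final class StrategyOutcomeSketch {
    enum Outcome { FAIL_504, USE_CACHE, CONDITIONAL_GET, USE_NETWORK }

    /** Maps "is each output non-null?" to what the interceptor does next. */
    static Outcome outcome(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) {
            return Outcome.FAIL_504;        // only-if-cached and nothing usable in the cache
        }
        if (!hasNetworkRequest) {
            return Outcome.USE_CACHE;       // fresh cached response, no network needed
        }
        if (hasCacheResponse) {
            return Outcome.CONDITIONAL_GET; // ask the server; a 304 reuses the cached body
        }
        return Outcome.USE_NETWORK;         // plain network request
    }

    public static void main(String[] args) {
        for (boolean network : new boolean[] {false, true}) {
            for (boolean cached : new boolean[] {false, true}) {
                System.out.println("networkRequest=" + network
                        + ", cacheResponse=" + cached
                        + " -> " + outcome(network, cached));
            }
        }
    }
}
```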
    
    

    The real work inside get() is done by getCandidate():

    private CacheStrategy getCandidate() {
      // No cached response: the request needs no conditional headers either
      if (cacheResponse == null) {
        // A request with no cached response goes directly to the network
        return new CacheStrategy(request, null);
      }
    
      // If the cached response is missing its TLS handshake, go directly to the network
      if (request.isHttps() && cacheResponse.handshake() == null) {
        // Direct network access
        return new CacheStrategy(request, null);
      }
    
      // Check the response's status code, expiry-related headers and no-store directives
      if (!isCacheable(cacheResponse, request)) {
        // Direct network access
        return new CacheStrategy(request, null);
      }
    
      CacheControl requestCaching = request.cacheControl();
      // If the request carries `no-cache` (which only happens when the developer adds it deliberately)
      // or already has If-None-Match/If-Modified-Since headers (that is, it is a conditional GET)
      if (requestCaching.noCache() || hasConditions(request)) {
        // Go to the network and let the server decide whether the cached copy is valid
        return new CacheStrategy(request, null);
      }
      // Computed according to the HTTP caching RFC
      // Current age of the cached response, roughly:
      // now - sent + age (s)
      long ageMillis = cacheResponseAge();
      // In most cases this is the max-age set by the server
      long freshMillis = computeFreshnessLifetime();
    
      if (requestCaching.maxAgeSeconds() != -1) {
        // In most cases the server's max-age wins
        freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
      }
    
      long minFreshMillis = 0;
      if (requestCaching.minFreshSeconds() != -1) {
        // Usually 0
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
      }
    
      long maxStaleMillis = 0;
      // Cache-control directives parsed from the cached response's headers
      CacheControl responseCaching = cacheResponse.cacheControl();
      if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
        // How long past expiry the response is still acceptable (max-stale), usually 0
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
      }
    
      // The cached response is still fresh and can be used
      // In most cases the check below reduces to
      // now - sent + age + 0 < max-age + 0
      if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        // Return the cached response
        Response.Builder builder = cacheResponse.newBuilder();
        return new CacheStrategy(null, builder.build());
      }
    
      // The cached response is stale. If it has an ETag or similar validator,
      // send a conditional request and let the server decide
      Request.Builder conditionalRequestBuilder = request.newBuilder();
    
      if (etag != null) {
        conditionalRequestBuilder.header("If-None-Match", etag);
      } else if (lastModified != null) {
        conditionalRequestBuilder.header("If-Modified-Since", lastModifiedString);
      } else if (servedDate != null) {
        conditionalRequestBuilder.header("If-Modified-Since", servedDateString);
      }
      // The request built below is still a network request
      Request conditionalRequest = conditionalRequestBuilder.build();
      return hasConditions(conditionalRequest) ? new CacheStrategy(conditionalRequest,
          cacheResponse) : new CacheStrategy(conditionalRequest, null);
    }
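
The freshness test at the heart of getCandidate() is a single inequality, and the choice of validator header is a three-way fallback. Both can be worked through with plain numbers. FreshnessSketch and its methods are hypothetical helpers that restate the conditions from the code above; they are not part of OkHttp.

```java
import java.util.concurrent.TimeUnit;

final class FreshnessSketch {
    /** Same inequality as in getCandidate(): age + min-fresh < lifetime + max-stale. */
    static boolean isFresh(long ageMillis, long freshMillis,
                           long minFreshMillis, long maxStaleMillis,
                           boolean responseNoCache) {
        return !responseNoCache
                && ageMillis + minFreshMillis < freshMillis + maxStaleMillis;
    }

    /** Returns {headerName, headerValue} for a conditional request, or null if no validator exists. */
    static String[] conditionalHeader(String etag, String lastModified, String servedDate) {
        if (etag != null) return new String[] {"If-None-Match", etag};
        if (lastModified != null) return new String[] {"If-Modified-Since", lastModified};
        if (servedDate != null) return new String[] {"If-Modified-Since", servedDate};
        return null;
    }

    public static void main(String[] args) {
        long maxAge = TimeUnit.SECONDS.toMillis(60);

        // 30 s old with max-age=60: still fresh, served from the cache.
        System.out.println(isFresh(TimeUnit.SECONDS.toMillis(30), maxAge, 0, 0, false));

        // 90 s old: stale, unless the request tolerates 60 s of staleness (max-stale=60).
        System.out.println(isFresh(TimeUnit.SECONDS.toMillis(90), maxAge, 0, 0, false));
        System.out.println(isFresh(TimeUnit.SECONDS.toMillis(90), maxAge, 0,
                TimeUnit.SECONDS.toMillis(60), false));
    }
}
```

With the request defaults (min-fresh and max-stale both 0) the check reduces to "age < max-age", which is the common case the code comments describe.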
    

    3) Cache statistics

    if (cache != null) {
        cache.trackResponse(strategy);
    }
    
    synchronized void trackResponse(CacheStrategy cacheStrategy) {
        requestCount++;
    
        if (cacheStrategy.networkRequest != null) {
          // If this is a conditional request, we'll increment hitCount if/when it hits.
          networkCount++;
        } else if (cacheStrategy.cacheResponse != null) {
          // This response uses the cache and not the network. That's a cache hit.
          hitCount++;
        }
    }
    

    As shown, the statistics count the total number of requests, broken down into network requests and cache hits.

    ConnectInterceptor

    ConnectInterceptor establishes the connection to the server.

    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
    
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
    }
    

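    ConnectInterceptor's code is short because the real work is delegated to other classes.
    It does the following:

    • Obtains the StreamAllocation object;
    • Uses the StreamAllocation to obtain a RealConnection;
    • Uses the StreamAllocation to create an HttpCodec;

    By the time ConnectInterceptor has run, all four key fields of RealInterceptorChain have been created: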

    • Request
    • StreamAllocation
    • HttpCodec
    • Connection

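    Request exists from the start, and StreamAllocation is created in RetryAndFollowUpInterceptor, so the analysis of ConnectInterceptor focuses on how Connection and HttpCodec are created.

    Creating the Connection and HttpCodec

    HttpCodec encodes HTTP requests and decodes HTTP responses. Calling streamAllocation.newStream creates both an HttpCodec and a RealConnection.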

    //StreamAllocation
    public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
        int connectTimeout = client.connectTimeoutMillis();
        int readTimeout = client.readTimeoutMillis();
        int writeTimeout = client.writeTimeoutMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
        try {
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
          HttpCodec resultCodec = resultConnection.newCodec(client, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
    }
    

    Creating the HttpCodec takes two steps:

    • Obtain a RealConnection (reuse one if possible, otherwise create a new one)
    • Create the HttpCodec from that RealConnection

    ① Obtaining the RealConnection

     private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
          throws IOException {
        while (true) {
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              connectionRetryEnabled);
    
          // If this is a brand new connection, we can skip the extensive health checks.
          synchronized (connectionPool) {
            if (candidate.successCount == 0) {
              return candidate;
            }
          }
    
          // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
          // isn't, take it out of the pool and start again.
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            noNewStreams();
            continue;
          }
    
          return candidate;
        }
    }
    
    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
        boolean connectionRetryEnabled) throws IOException {
      Route selectedRoute;
      synchronized (connectionPool) {
        if (released) throw new IllegalStateException("released");
        if (codec != null) throw new IllegalStateException("codec != null");
        if (canceled) throw new IOException("Canceled");
        // Reuse the connection already allocated to this StreamAllocation
        // Attempt to use an already-allocated connection.
        RealConnection allocatedConnection = this.connection;
        if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
          return allocatedConnection;
        }
        // Otherwise try the connection pool
        // Attempt to get a connection from the pool.
        Internal.instance.get(connectionPool, address, this);
        if (connection != null) {
          return connection;
        }
        selectedRoute = route;
      }
      // Route selection (supports hosts that resolve to multiple IP addresses)
      // If we need a route, make one. This is a blocking operation.
      if (selectedRoute == null) {
        selectedRoute = routeSelector.next();
      }
      // Create a connection and assign it to this allocation immediately. This makes it possible for
      // an asynchronous cancel() to interrupt the handshake we're about to do.
      // None of the above applied: create a new RealConnection
      RealConnection result;
      synchronized (connectionPool) {
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result);
        if (canceled) throw new IOException("Canceled");
      }
      // Open the socket connection
      // Do TCP + TLS handshakes. This is a blocking operation.
      result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
      routeDatabase().connected(result.route());
      Socket socket = null;
      // Add the new connection to the pool
      synchronized (connectionPool) {
        // Pool the connection.
        Internal.instance.put(connectionPool, result);
        // If another multiplexed connection to the same address was created concurrently, then
        // release this connection and acquire that one.
        if (result.isMultiplexed()) {
          socket = Internal.instance.deduplicate(connectionPool, address, this);
          result = connection;
        }
      }
      closeQuietly(socket);
      return result;
    }
    
    Reading from the connection pool
     Internal.instance.get(connectionPool, address, this, null);
    

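    Internal.instance is created in a static initializer block of OkHttpClient. Note that the get shown here takes four arguments (including a Route), while findConnection above calls a three-argument form; the snippets appear to come from different OkHttp versions.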

    @Override 
    public RealConnection get(ConnectionPool pool, Address address,
              StreamAllocation streamAllocation, Route route) {
         return pool.get(address, streamAllocation, route);
    }
    
    //ConnectionPool
    RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection);
            return connection;
          }
        }
        return null;
      }
    

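    This iterates over every pooled connection and returns the first one for which isEligible(address, route) is true; if none matches, it returns null.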
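
The lookup above can be sketched with plain JDK types. This is a minimal illustration, not OkHttp's code: `PoolLookupSketch`, `Conn`, and the host/port comparison are all hypothetical stand-ins, and the real `isEligible` checks more than an address match.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified stand-in for the pool lookup; every type here is hypothetical.
final class PoolLookupSketch {
    // A pooled connection reduced to the two facts this sketch compares.
    static final class Conn {
        final String host;
        final int port;
        boolean acquired;

        Conn(String host, int port) {
            this.host = host;
            this.port = port;
        }

        // Assumption: eligibility is modelled as an exact host/port match only.
        boolean isEligible(String host, int port) {
            return this.host.equals(host) && this.port == port;
        }
    }

    private final Deque<Conn> connections = new ArrayDeque<>();

    synchronized void put(Conn c) {
        connections.add(c);
    }

    // Returns the first eligible connection and marks it acquired, or null if none matches.
    synchronized Conn get(String host, int port) {
        for (Conn c : connections) {
            if (c.isEligible(host, port)) {
                c.acquired = true;
                return c;
            }
        }
        return null;
    }
}
```

Returning null rather than throwing mirrors the listing above: a miss is the normal signal for the caller to go on and create a new connection.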

    Establishing the socket connection
    public void connect(
          int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
        if (protocol != null) throw new IllegalStateException("already connected");
    
        RouteException routeException = null;
        List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
        ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    
        if (route.address().sslSocketFactory() == null) {
          if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
            throw new RouteException(new UnknownServiceException(
                "CLEARTEXT communication not enabled for client"));
          }
          String host = route.address().url().host();
          if (!Platform.get().isCleartextTrafficPermitted(host)) {
            throw new RouteException(new UnknownServiceException(
                "CLEARTEXT communication to " + host + " not permitted by network security policy"));
          }
        }
        // Establish the connection
        while (true) {
          try {
            if (route.requiresTunnel()) {
              connectTunnel(connectTimeout, readTimeout, writeTimeout);
            } else {
              // The usual (non-tunnel) path
              connectSocket(connectTimeout, readTimeout);
            }
            establishProtocol(connectionSpecSelector);
            break;
          } catch (IOException e) {
            // Exception handling omitted
          }
        }
       //......
      }
    
    private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
        // Get the proxy
        Proxy proxy = route.proxy();
        Address address = route.address();
        // Create the Socket according to the proxy type
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
        // Set the read timeout
        rawSocket.setSoTimeout(readTimeout);
        try { 
          // Establish the socket connection
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
          ce.initCause(e);
          throw ce;
        }
        // Wrap the socket's input and output streams with Okio
        source = Okio.buffer(Okio.source(rawSocket));
        sink = Okio.buffer(Okio.sink(rawSocket));
      }
    
    • A Sink can be thought of as an OutputStream, and a Source as an InputStream.
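
To make the analogy concrete, here is a small sketch that uses only `java.io` in place of Okio. It assumes nothing about Okio's API: `StreamAnalogySketch` and its two methods are hypothetical names, and in-memory byte arrays stand in for the socket.

```java
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// java.io analogy for the sink/source pair; an in-memory buffer replaces the socket.
final class StreamAnalogySketch {
    // Plays the "sink" role: writes through a buffer and returns the bytes
    // that reached the underlying stream after flush().
    static byte[] writeThroughSink(String text) {
        try {
            ByteArrayOutputStream raw = new ByteArrayOutputStream();
            BufferedOutputStream sink = new BufferedOutputStream(raw);
            sink.write(text.getBytes(StandardCharsets.UTF_8));
            sink.flush(); // buffered bytes are only guaranteed to reach `raw` after this
            return raw.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Plays the "source" role: reads one line, without its line terminator.
    static String readLineFromSource(byte[] bytes) {
        try {
            BufferedReader source = new BufferedReader(
                new InputStreamReader(new ByteArrayInputStream(bytes), StandardCharsets.UTF_8));
            return source.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```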
    Updating the connection pool
    Internal.instance.put(connectionPool, result);
    
    //ConnectionPool
    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    

    The cleanup logic is analyzed later.
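
The "start the cleanup task on the first put only" pattern in put() can be sketched on its own. This is a hedged illustration: `PutOnceCleanupSketch` is a hypothetical class, connections are reduced to strings, and the sketch does not model what the cleanup task does or when the flag is reset.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// Hypothetical reduction of the put() logic: schedule cleanup once, then store.
final class PutOnceCleanupSketch {
    private final Deque<String> connections = new ArrayDeque<>();
    private final Executor executor;
    private final Runnable cleanupRunnable;
    private boolean cleanupRunning;

    PutOnceCleanupSketch(Executor executor, Runnable cleanupRunnable) {
        this.executor = executor;
        this.cleanupRunnable = cleanupRunnable;
    }

    synchronized void put(String connection) {
        // Only the first put (while no cleanup is running) schedules the task.
        if (!cleanupRunning) {
            cleanupRunning = true;
            executor.execute(cleanupRunnable);
        }
        connections.add(connection);
    }

    synchronized int size() {
        return connections.size();
    }
}
```

Guarding the flag and the collection with the same lock is what keeps two concurrent callers from both scheduling the task.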

    ② Create the HttpCodec from the RealConnection

    public HttpCodec newCodec(
          OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
        if (http2Connection != null) {
          return new Http2Codec(client, streamAllocation, http2Connection);
        } else {
          // The usual (HTTP/1.x) path
          socket.setSoTimeout(client.readTimeoutMillis());
          source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
          sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
          // source and sink were obtained when the socket was connected
          return new Http1Codec(client, streamAllocation, source, sink);
        }
      }
    

    CallServerInterceptor

    @Override 
    public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        // Retrieve the four objects set up by the earlier interceptors; they are stored in RealInterceptorChain
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
        // Write the request headers
        httpCodec.writeRequestHeaders(request);
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return what
          // we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            // Write the request body
            // Write the request body if the "Expect: 100-continue" expectation was met.
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
            // being reused. Otherwise we're still obligated to transmit the request body to leave the
            // connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    
        httpCodec.finishRequest();
    
        if (responseBuilder == null) {
          // Read the response headers
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        // Build the response
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          // Read the response body
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
    }
    

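    After ConnectInterceptor has established the socket connection, Okio wraps the socket's input and output streams as source and sink. At that point only the connection exists and no data has been transferred yet. The job of CallServerInterceptor is to send the Request and parse the Response according to the HTTP protocol.

    CallServerInterceptor first retrieves from RealInterceptorChain the four objects set up by the earlier interceptors: HttpCodec, StreamAllocation, RealConnection, and Request.

    The process breaks down as follows:

    1) Send the HTTP request data (header & body)

    First, the request headers are written to the sink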

    httpCodec.writeRequestHeaders(request);
    
    //Http1Codec
    @Override 
    public void writeRequestHeaders(Request request) throws IOException {
        String requestLine = RequestLine.get(
            request, streamAllocation.connection().route().proxy().type());
        writeRequest(request.headers(), requestLine);
      }
    
    /** Returns bytes of a request header for sending on an HTTP transport. */
    public void writeRequest(Headers headers, String requestLine) throws IOException {
        if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
        sink.writeUtf8(requestLine).writeUtf8("\r\n");
        for (int i = 0, size = headers.size(); i < size; i++) {
          sink.writeUtf8(headers.name(i))
              .writeUtf8(": ")
              .writeUtf8(headers.value(i))
              .writeUtf8("\r\n");
        }
        sink.writeUtf8("\r\n");
        state = STATE_OPEN_REQUEST_BODY;
    }
    
    
    • Builds the request line, which in this example is GET /api/data/Android/10/1 HTTP/1.1
    • Reads the headers from the Request and writes them to the sink one by one
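
The wire format produced by writeRequest above can be reproduced with a `StringBuilder`. This is a sketch under stated assumptions: `RequestHeadSketch` and `format` are hypothetical names, a `Map` stands in for OkHttp's `Headers`, and the protocol is fixed to HTTP/1.1.

```java
import java.util.Map;

// Hypothetical formatter that mirrors the layout written to the sink:
// request line, one "Name: value" line per header, then an empty line.
final class RequestHeadSketch {
    static String format(String method, String path, Map<String, String> headers) {
        StringBuilder out = new StringBuilder();
        out.append(method).append(' ').append(path).append(" HTTP/1.1").append("\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            out.append(header.getKey())
               .append(": ")
               .append(header.getValue())
               .append("\r\n");
        }
        out.append("\r\n"); // the blank line ends the header section
        return out.toString();
    }
}
```

Passing a `LinkedHashMap` keeps the headers in insertion order, which matches the index-based loop in the listing.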

    Next, if the HTTP request has a body (for example, a POST request), the body is also written to the sink and sent to the server

    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return what
          // we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
            // being reused. Otherwise we're still obligated to transmit the request body to leave the
            // connection in a consistent state.
            streamAllocation.noNewStreams();
          }
    }
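
The branching in the snippet above reduces to a three-way decision, sketched here as a pure function. The names `ExpectContinueSketch`, `Action`, and `decide` are hypothetical, and `firstStatus` stands for the status code of whatever the server sent back after the headers were flushed.

```java
// Hypothetical model of the "Expect: 100-continue" handling shown above.
final class ExpectContinueSketch {
    enum Action {
        SEND_BODY,               // write the request body as usual
        SKIP_BODY,               // multiplexed connection: skip the body, keep the connection
        SKIP_BODY_AND_STOP_REUSE // HTTP/1 connection: skip the body and allow no new streams
    }

    static Action decide(String expectHeader, int firstStatus, boolean multiplexed) {
        // equalsIgnoreCase(null) is false, so a missing header means "send the body".
        boolean waitsForContinue = "100-continue".equalsIgnoreCase(expectHeader);
        if (!waitsForContinue || firstStatus == 100) {
            return Action.SEND_BODY;
        }
        // The server answered with something other than 100 Continue.
        return multiplexed ? Action.SKIP_BODY : Action.SKIP_BODY_AND_STOP_REUSE;
    }
}
```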
    

    Finally, the data buffered in the sink is flushed out

    httpCodec.finishRequest();
    

    2) Read the response data

    This takes two steps

    ① Read the response headers
    //CallServerInterceptor
    if (responseBuilder == null) {
          responseBuilder = httpCodec.readResponseHeaders(false);
     }
    Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
    //Http1Codec
    @Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
        if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
          throw new IllegalStateException("state: " + state);
        }
    
        try {
          StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
    
          Response.Builder responseBuilder = new Response.Builder()
              .protocol(statusLine.protocol)
              .code(statusLine.code)
              .message(statusLine.message)
              .headers(readHeaders());
    
          if (expectContinue && statusLine.code == HTTP_CONTINUE) {
            return null;
          }
    
          state = STATE_OPEN_RESPONSE_BODY;
          return responseBuilder;
        } catch (EOFException e) {
          // Provide more context if the server ends the stream before sending a response.
          IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
          exception.initCause(e);
          throw exception;
        }
      }
    
    • First parses the status line (protocol, status code, message) and the response headers
    • Then uses responseBuilder to build the Response
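
Splitting a status line into those three parts can be sketched as follows. This is a simplified illustration rather than OkHttp's StatusLine.parse: `StatusLineSketch` is a hypothetical class that only handles lines of the shape `HTTP/x.y CODE MESSAGE`.

```java
// Hypothetical, simplified status-line parser: "HTTP/1.1 200 OK" -> protocol, code, message.
final class StatusLineSketch {
    final String protocol;
    final int code;
    final String message;

    private StatusLineSketch(String protocol, int code, String message) {
        this.protocol = protocol;
        this.code = code;
        this.message = message;
    }

    static StatusLineSketch parse(String line) {
        int firstSpace = line.indexOf(' ');
        if (!line.startsWith("HTTP/") || firstSpace == -1) {
            throw new IllegalArgumentException("Unexpected status line: " + line);
        }
        int secondSpace = line.indexOf(' ', firstSpace + 1);
        String codeText = secondSpace == -1
            ? line.substring(firstSpace + 1)
            : line.substring(firstSpace + 1, secondSpace);
        int code;
        try {
            code = Integer.parseInt(codeText);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Unexpected status line: " + line, e);
        }
        // The reason phrase is optional and may itself contain spaces.
        String message = secondSpace == -1 ? "" : line.substring(secondSpace + 1);
        return new StatusLineSketch(line.substring(0, firstSpace), code, message);
    }
}
```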
    ② Read the response body

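    The response body is read in every case except one: a WebSocket call that receives status code 101 (Switching Protocols, meaning the server is switching to another protocol as the client requested) gets an empty body instead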

    //CallServerInterceptor
    int code = response.code();
      if (forWebSocket && code == 101) {
        // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
        response = response.newBuilder()
            .body(Util.EMPTY_RESPONSE)
            .build();
      } else {
        response = response.newBuilder()
            .body(httpCodec.openResponseBody(response))
            .build();
     }
    
    //Http1Codec
    @Override 
    public ResponseBody openResponseBody(Response response) throws IOException {
        Source source = getTransferStream(response);
        return new RealResponseBody(response.headers(), Okio.buffer(source));
    }
    
    private Source getTransferStream(Response response) throws IOException {
        if (!HttpHeaders.hasBody(response)) {
          return newFixedLengthSource(0);
        }
    
        if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
          return newChunkedSource(response.request().url());
        }
    
        long contentLength = HttpHeaders.contentLength(response);
        if (contentLength != -1) {
          return newFixedLengthSource(contentLength);
        }
    
        // Wrap the input stream from the connection (rather than just returning
        // "socketIn" directly here), so that we can control its use after the
        // reference escapes.
        return newUnknownLengthSource();
    }
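
The order of checks in getTransferStream above can be summarized as a small decision function. The names `BodyFramingSketch`, `Framing`, and `choose` are hypothetical, and the `hasBody` flag is passed in as a plain boolean rather than derived from a response.

```java
// Hypothetical summary of how the response body's framing is chosen.
final class BodyFramingSketch {
    enum Framing { EMPTY, CHUNKED, FIXED_LENGTH, UNTIL_CLOSE }

    // contentLength uses -1 for "unknown", as in the listing above.
    static Framing choose(boolean hasBody, String transferEncoding, long contentLength) {
        if (!hasBody) {
            return Framing.EMPTY; // corresponds to a fixed-length source of 0 bytes
        }
        if ("chunked".equalsIgnoreCase(transferEncoding)) {
            return Framing.CHUNKED; // chunked encoding is checked before Content-Length
        }
        if (contentLength != -1) {
            return Framing.FIXED_LENGTH;
        }
        return Framing.UNTIL_CLOSE; // unknown length: read until the stream ends
    }
}
```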
    

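    The ResponseBody is then set as the body of the Response.
    At this point the whole request process is complete.

    Summary

    Each interceptor does its own job and hands off to the next, so together they carry out the network request flow quite elegantly. To finish, here is a diagram borrowed from piasy that should give readers a clearer picture of OkHttp.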

    okhttp_full_process.png

    References

    https://blog.piasy.com/2016/07/11/Understand-OkHttp/
    http://lowett.com/2017/02/24/okhttp-4/
    http://lowett.com/2017/03/02/okhttp-5/
    http://lowett.com/2017/03/09/okhttp-6/
    http://lowett.com/2017/03/21/okhttp-7/
    http://lowett.com/2017/03/30/okhttp-8/
