美文网首页Android篇
OkHttp源码分析:五大拦截器详解

OkHttp源码分析:五大拦截器详解

作者: w达不溜w | 来源:发表于2021-03-26 18:54 被阅读0次

    OkHttp源码分析:五大拦截器详解

    一、RetryAndFollowUpInterceptor(重试与重定向拦截器)

    主要完成两件事:重试与重定向

    @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        //创建StreamAllocation对象(包含http请求组件)
        StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(request.url()), call, eventListener, callStackTrace);
        
        Response priorResponse = null;
        while (true) {
          if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
          }
    
          Response response;
          boolean releaseConnection = true;
          try {
            response = realChain.proceed(request, streamAllocation, null, null);
            releaseConnection = false;
          } catch (RouteException e) {
            // ① 路由异常,连接未成功,请求还没发出去
            if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
              throw e.getLastConnectException();
            }
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            //②请求发出去了,但是和服务器通信失败了(socket流正在读写数据的时候断开连接)
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // ③不是前两种失败,直接关闭和清理所有资源
            if (releaseConnection) {
              streamAllocation.streamFailed(null);
              streamAllocation.release();
            }
          }
                //...
          //重定向
          Request followUp = followUpRequest(response, streamAllocation.route());
          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            //不需要重定向直接返回response终止循环
            return response;
          }
          //...
        }
      }
    

    重试与重定向拦截器主要处理Response,可以看到RouteException和IOException都是调用了recover,返回true表示允许重试。允许重试—>continue—> while (true)—>realChain.proceed,这就完成了重试的过程。

    private boolean recover(IOException e, StreamAllocation streamAllocation,
                            boolean requestSendStarted, Request userRequest) {
      streamAllocation.streamFailed(e);
    
      // ①OkHttpClient是否设置了不允许重试(默认允许),则一旦请求失败就不再重试——>全局配置
      if (!client.retryOnConnectionFailure()) return false;
      // ②针对某个请求配置是否需要重试 用户自己实现UnrepeatableRequestBody请求体——>单个请求配置
      if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
      // ③是否属于重试的异常(协议异常、超时异常、证书异常、SSL握手未授权异常)
      if (!isRecoverable(e, requestSendStarted)) return false;
      // ④是否存在更多的路由
      if (!streamAllocation.hasMoreRoutes()) return false;
    
      return true;
    }
    
    重试.png

    接着看重定向

    Request followUp = followUpRequest(response, streamAllocation.route());
    
    private Request followUpRequest(Response userResponse, Route route) throws IOException {
       
        int responseCode = userResponse.code();
    
        switch (responseCode) {
          //407 客户端使用了HTTP代理服务器,在请求头中添加"Proxy-Authorization"让代理服务器授权
          case HTTP_PROXY_AUTH:
            return client.proxyAuthenticator().authenticate(route, userResponse);
                //401 服务器接口需要验证使用者身份,在请求头中添加"Authorization"
          case HTTP_UNAUTHORIZED:
            return client.authenticator().authenticate(route, userResponse);
                //308 永久重定向
          case HTTP_PERM_REDIRECT:
          //307临时重定向
          case HTTP_TEMP_REDIRECT:
            //如果请求方式不是GET或HEAD,框架不会自动重定向请求
            if (!method.equals("GET") && !method.equals("HEAD")) {
              return null;
            }
          // 300 301 302 303 各种重定向
          case HTTP_MULT_CHOICE:
          case HTTP_MOVED_PERM:
          case HTTP_MOVED_TEMP:
          case HTTP_SEE_OTHER:
            // 客户端不允许重定向,返回null
            if (!client.followRedirects()) return null;
                    //①从响应头取出location
            String location = userResponse.header("Location");
            if (location == null) return null;
            //②根据location配置新的请求url
            HttpUrl url = userResponse.request().url().resolve(location);
            // 取不出HttpUrl,返回null,不进行重定向
            if (url == null) return null;
    
            //如果重定向在http到https之间切换,检查用户是否允许(默认允许)
            boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
            if (!sameScheme && !client.followSslRedirects()) return null;
    
            Request.Builder requestBuilder = userResponse.request().newBuilder();
            //请求不是get与head
            if (HttpMethod.permitsRequestBody(method)) {
              final boolean maintainBody = HttpMethod.redirectsWithBody(method);
              //除了 PROPFIND 请求之外都改成GET请求
              if (HttpMethod.redirectsToGet(method)) {
                requestBuilder.method("GET", null);
              } else {
                RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
                requestBuilder.method(method, requestBody);
              }
              // 不是 PROPFIND 的请求,把请求头中关于请求体的数据删掉
              if (!maintainBody) {
                requestBuilder.removeHeader("Transfer-Encoding");
                requestBuilder.removeHeader("Content-Length");
                requestBuilder.removeHeader("Content-Type");
              }
            }
    
            // 在跨主机重定向时,删除身份验证请求头
            if (!sameConnection(userResponse, url)) {
              requestBuilder.removeHeader("Authorization");
            }
                    //③构建Request
            return requestBuilder.url(url).build();
                //408 客户端请求超时
          case HTTP_CLIENT_TIMEOUT:
            // 判断用户是否允许重试
            if (!client.retryOnConnectionFailure()) {
              return null;
            }
    
            if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
              return null;
            }
                    //本次重试结果还是408,就放弃,不再重复请求。
            if (userResponse.priorResponse() != null
                && userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) {
              // We attempted to retry and got another timeout. Give up.
              return null;
            }
                    //如果服务器告诉我们了 Retry-After 多久后重试,那框架不管了。
            if (retryAfter(userResponse, 0) > 0) {
              return null;
            }
    
            return userResponse.request();
                // 503 服务不可用
          case HTTP_UNAVAILABLE:
            //再次请求还是503,就放弃,不再重复请求。
            if (userResponse.priorResponse() != null
                && userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
              // We attempted to retry and got another timeout. Give up.
              return null;
            }
                    //服务器告诉我们 Retry-After:0(意思就是立即重试) 才重请求
            if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
              // specifically received an instruction to retry without delay
              return userResponse.request();
            }
    
            return null;
    
          default:
            return null;
        }
      }
    

    重定向总结

    响应码 说明 重定向条件
    407 代理需要授权:如付费代理,需要验证身份 通过proxyAuthenticator获得到了Request(添加"Proxy-Authorization"请求头)
    401 服务器需要授权:如某些接口需要登录后才能使用(不安全) 通过authenticator获得到了Request(添加"Authorization"请求头)
    300、301、302、303、307、308 重定向响应 307和308必须为GET/HEAD请求再继续判断①用户允许自动重定向(默认允许) ②能够获取到Location响应头,并且值为有效的url ③允许http到https切换(默认允许)
    408 客户端请求超时 ①用户允许自动重定向(默认允许) ②本次重试结果不是408 ③服务器未响应Retry-After(稍后重试),或响应Retry-After为0
    503 服务不可用 ①本次重试结果不是503②服务器响应Retry-After为0,立即重试

    另附HTTP响应状态码分类:

    分类 描述
    1xx 信息,服务器收到请求,需要请求者继续执行操作
    2xx 成功,操作被成功接收并处理
    3xx 重定向,需要进一步的操作以完成请求
    4xx 客户端错误,请求包含语法错误或无法完成请求
    5xx 服务端错误,服务器在处理请求的过程中发生了错误

    小结:RetryAndFollowUpInterceptor是整个责任链中的第一个,首次接触到Request和最后接收Response的角色,它的主要功能是判断是否需要重试与重定向。

    重试的前提是出现了RouteException或IOException,会通过recover方法进行判断是否进行重试。

    重定向是发生在重试判定后,不满足重试的条件,会进一步调用followUpRequest根据Response的响应码进行重定向操作。

    二、BridgeInterceptor(桥接拦截器)
    @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
    
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
    
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
    
        //读取对应的cookie数据设置进请求头
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
    
        Response networkResponse = chain.proceed(requestBuilder.build());
            //保存cookie,默认cookieJar不提供的实现(我们可以通过okHttpClient.cookieJarokHttpClient.cookieJar(cookieJar)去设置)
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
         //如果使用gzip返回的数据,则使用 GzipSource 包装便于解析。
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
      }
    

    补全请求头:

    请求头 说明
    Content-Type 请求体类型(如 application/x-www-form-urlencoded)
    Content-Length/Transfer-Encoding 请求体解析方式
    Host 请求的主机站点
    Connection:Keep-Alive 默认保持长连接
    Accept-Encoding:gzip 接受响应体使用gzip压缩
    Cookie Cookie身份识别
    User-Agent 用户信息,如操作系统、浏览器等

    小结:BridgeInterceptor是连接应用程序和服务器的桥梁,它为我们补全请求头,将请求转化为符合网络规范的Request。得到响应后:1.保存Cookie,在下次请求会读取对应的cookie数据设置进请求头,默认cookieJar不提供的实现 2.如果使用gzip返回的数据,则使用 GzipSource 包装便于解析。

    三、CacheInterceptor(缓存拦截器)
    @Override public Response intercept(Chain chain) throws IOException {
      //cache不为空从cache中取(GET请求才有缓存)
      Response cacheCandidate = cache != null
          ? cache.get(chain.request())
          : null;
    
      long now = System.currentTimeMillis();
        //缓存策略
      CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
      Request networkRequest = strategy.networkRequest;
      Response cacheResponse = strategy.cacheResponse;
    
      if (cache != null) {
        cache.trackResponse(strategy);
      }
    
      if (cacheCandidate != null && cacheResponse == null) {
        closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
      }
    
      //没有网络也没有缓存 ——> 请求失败直接返回504
      if (networkRequest == null && cacheResponse == null) {
        return new Response.Builder()
            .request(chain.request())
            .protocol(Protocol.HTTP_1_1)
            .code(504)
            .message("Unsatisfiable Request (only-if-cached)")
            .body(Util.EMPTY_RESPONSE)
            .sentRequestAtMillis(-1L)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
      }
    
      // 没有请求,有缓存 ——> 使用缓存
      if (networkRequest == null) {
        return cacheResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .build();
      }
    
      //有网络,无缓存 ——> 去发起请求
      Response networkResponse = null;
      try {
        networkResponse = chain.proceed(networkRequest);
      } finally {
        // If we're crashing on I/O or otherwise, don't leak the cache body.
        if (networkResponse == null && cacheCandidate != null) {
          closeQuietly(cacheCandidate.body());
        }
      }
      // If we have a cache response too, then we're doing a conditional get.
      if (cacheResponse != null) {
        //有网络,有缓存 ——> 服务器返回304(表示无修改),则更新缓存响应并返回。
        if (networkResponse.code() == HTTP_NOT_MODIFIED) {
          Response response = cacheResponse.newBuilder()
              .headers(combine(cacheResponse.headers(), networkResponse.headers()))
              .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
              .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
              .cacheResponse(stripBody(cacheResponse))
              .networkResponse(stripBody(networkResponse))
              .build();
          networkResponse.body().close();
    
          // Update the cache after combining headers but before stripping the
          // Content-Encoding header (as performed by initContentStream()).
          cache.trackConditionalCacheHit();
          cache.update(cacheResponse, response);
          return response;
        } else {
          closeQuietly(cacheResponse.body());
        }
      }
    
      Response response = networkResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build();
    
      if (cache != null) {
        if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
          //cache不为空,有请求头和缓存策略时,通过cache.put进行缓存
          // Offer this request to the cache.
          CacheRequest cacheRequest = cache.put(response);
          return cacheWritingResponse(cacheRequest, response);
        }
    
        if (HttpMethod.invalidatesCache(networkRequest.method())) {
          try {
            cache.remove(networkRequest);
          } catch (IOException ignored) {
            // The cache cannot be written.
          }
        }
      }
    
      return response;
    }
    

    缓存拦截器顾名思义处理缓存的,但是要建立在get请求的基础上,我们可以去通过okHttpClient.cache(cache)去设置。缓存拦截器的处理流程:

    1.从缓存中取出对应请求的响应缓存

    2.通过CacheStrategy判断使用缓存或发起网络请求,此对象中的networkRequest代表需要发起网络请求,cacheResponse表示直接使用缓存。

    networkRequest cacheResponse 说明
    null null 直接返回504(网关超时)
    null not null 直接使用缓存
    not null null 发起请求
    not null not null 发起请求,若得到响应为304(表无修改),则更新缓存响应并返回

    即:networkRequest存在则优先发起网络请求,否则使用cacheResponse缓存,若都不存在则请求失败。

    1. cache不为空,有请求头和缓存策略时,通过cache.put进行缓存

    如果最终判定不能使用缓存,需要发起网络请求,则来到下一个拦截器ConnectInterceptor

    四、ConnectInterceptor(连接拦截器)
    Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        //从拦截器链获取StreamAllocation对象,这里的StreamAllocation对象是在第一个拦截器中初始化完成的(设置了连接池、url路径等),真正使用的地方在这里。
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        //创建HttpCodec对象  HttpCodec:用来编码HTTP request和解码HTTP response
        HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
        //获取RealConnection
        RealConnection connection = streamAllocation.connection();
            //执行下一个拦截器,返回response
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    

    StreamAllocation对象是在第一个拦截器RetryAndFollowUpInterceptor中初始化完成的(设置了连接池、url路径等),当一个请求发出,需要建立连接,建立连接之后需要使用流来读取数据,这个StreamAllocation就是协调请求、连接与数据流三者之前的关系,它负责为一次请求寻找连接,然后获得流来实现网络通信。

    public final class StreamAllocation {
        private final ConnectionPool connectionPool;
      private RealConnection connection;
        //...
      
      public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
        int connectTimeout = client.connectTimeoutMillis();
        int readTimeout = client.readTimeoutMillis();
        int writeTimeout = client.writeTimeoutMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
        try {
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
          HttpCodec resultCodec = resultConnection.newCodec(client, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
      }
    }
    

    StreamAllocation对象有两个关键角色:

    • RealConnection:真正建立Socket连接的对象
    • ConnectionPool:连接池,用来管理和复用Socket连接

    真正的连接是在RealConnection中实现的,连接由ConnectionPool管理。

    //ConnectionPool.java
    public ConnectionPool() {
      //连接池最多保存5个连接的keep-alive,每个时长5分钟
      this(5, 5, TimeUnit.MINUTES);
    }
    
    public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
      }
    
    void put(RealConnection connection) {
      assert (Thread.holdsLock(this));
      if (!cleanupRunning) {
        cleanupRunning = true;
        //清理无效的连接
        executor.execute(cleanupRunnable);
      }
      connections.add(connection);
    }
    
    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
    
    //获取可复用的连接
    RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
      assert (Thread.holdsLock(this));
      for (RealConnection connection : connections) {
        //要拿到的连接与连接池中的连接,连接的配置(DNS、代理、SSL证书、服务器域名、端口等)一致,就可以复用
        if (connection.isEligible(address, route)) {
          streamAllocation.acquire(connection, true);
          return connection;
        }
      }
      return null;
    }
    
    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
                    //检查连接是否正在使用
            // If the connection is in use, keep searching.
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
                    //否则记录闲置连接数
            idleConnectionCount++;
    
            // If the connection is ready to be evicted, we're done.
            //计算这个连接已经闲置多久
            long idleDurationNs = now - connection.idleAtNanos;
            //找到闲置最久的连接
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
                //keep-alive时间>=5分钟||连接池内闲置连接>5,立即移除
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            //池内存在闲置连接,就等待 (保活时间-最长闲置时间,即到期时间)
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            //有正在使用的连接,5分钟后再清理
            return keepAliveDurationNs;
          } else {
            //无连接,停止清理(下次put会再次启动)
            // No connections, idle or in use.
            cleanupRunning = false;
            return -1;
          }
        }
            //关闭连接,返回时间0,立即再次进行请求(cleanupRunnable的while (true)会立即执行)
        closeQuietly(longestIdleConnection.socket());
        // Cleanup again immediately.
        return 0;
      }
    

    接着我们看下RealConnection的创建和连接的建立:

    streamAllocation.newStream—>findHealthyConnection—>findConnection

    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          boolean connectionRetryEnabled) throws IOException {
        Route selectedRoute;
        synchronized (connectionPool) {
          if (released) throw new IllegalStateException("released");
          if (codec != null) throw new IllegalStateException("codec != null");
          if (canceled) throw new IOException("Canceled");
                //①StreamAllocation的connection如果可以复用则复用
          // Attempt to use an already-allocated connection.
          RealConnection allocatedConnection = this.connection;
          if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
            return allocatedConnection;
          }
                
          //②如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回
          // Attempt to get a connection from the pool.
          Internal.instance.get(connectionPool, address, this, null);
          if (connection != null) {
            return connection;
          }
    
          selectedRoute = route;
        }
    
        // If we need a route, make one. This is a blocking operation.
        if (selectedRoute == null) {
          selectedRoute = routeSelector.next();
        }
    
            RealConnection result;
        synchronized (connectionPool) {
          if (canceled) throw new IOException("Canceled");
    
          // Now that we have an IP address, make another attempt at getting a connection from the pool.
          // This could match due to connection coalescing.
          Internal.instance.get(connectionPool, address, this, selectedRoute);
          if (connection != null) {
            route = selectedRoute;
            return connection;
          }
    
          // Create a connection and assign it to this allocation immediately. This makes it possible
          // for an asynchronous cancel() to interrupt the handshake we're about to do.
          route = selectedRoute;
          refusedStreamCount = 0;
          //③如果连接池里没有,则new一个RealConnection对象
          result = new RealConnection(connectionPool, selectedRoute);
          acquire(result);
        }
     
        // Do TCP + TLS handshakes. This is a blocking operation.
        //④调用RealConnection的connect()方法发起请求
        result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
        routeDatabase().connected(result.route());
    
        Socket socket = null;
        synchronized (connectionPool) {
          // ⑤将RealConnection对象存进连接池中,以便下次复用
          Internal.instance.put(connectionPool, result);
    
          // If another multiplexed connection to the same address was created concurrently, then
          // release this connection and acquire that one.
          if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
          }
        }
        closeQuietly(socket);
            //⑥返回RealConnection对象
        return result;
      }
    

    findConnection:

    ①StreamAllocation的connection如果可以复用则复用

    ②如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回

    ③如果连接池里没有,则new一个RealConnection对象

    ④调用RealConnection的connect()方法发起请求

    ⑤将RealConnection对象存进连接池中,以便下次复用

    ⑥返回RealConnection对象

    //RealConnection.java
    public void connect(
          int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
             //...
           //进行Socket连接
           connectSocket(connectTimeout, readTimeout);
           //...
      }
    
    
    private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
    
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
    
        rawSocket.setSoTimeout(readTimeout);
        try {
          //建立socket连接  最终调用Java里的套接字Socket里的connect()方法。
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
          ce.initCause(e);
          throw ce;
        }
    

    小结:

    ConnectInterceptor拦截器从拦截器链中获取StreamAllocation对象,这个对象在第一个拦截器中创建,在ConnectInterceptor中才用到。

    执行StreamAllocation对象的newStream方法创建HttpCodec对象,用来编码HTTP request和解码HTTP response。

    newStream方法里面通过findConnection方法返回了一个RealConnection对象。

    StreamAllocation对象的connect方法拿到上面返回的RealConnection对象,这个RealConnection对象是用来进行实际的网络IO传输的。

    五、CallServerInterceptor(请求服务器拦截器)
    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        //①获取拦截器链中的HttpCodec、StreamAllocation、RealConnection对象
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
    
        realChain.eventListener().requestHeadersStart(realChain.call());
        //②调用httpCodec.writeRequestHeaders(request)将请求头写入缓存(见下面Http2Codec的writeRequestHeaders方法)
        httpCodec.writeRequestHeaders(request);
        realChain.eventListener().requestHeadersEnd(realChain.call(), request);
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          //③判断是否有请求体,如果有,请求头通过携带特殊字段 Expect:100-continue来询问服务器是否愿意接受请求体。(一般用于上传大容量请求体或者需要验证)
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            //调用真正发送给服务器
            httpCodec.flushRequest();
            realChain.eventListener().responseHeadersStart(realChain.call());
            //服务端返回100,表示愿意接受请求体responseBuilder为null(见下面Http2Codec的readResponseHeaders方法)
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            //服务器愿意会响应100(responseBuilder 即为nul)。这时候才能够继续发送剩余请求数据。
            // Write the request body if the "Expect: 100-continue" expectation was met.
            realChain.eventListener().requestBodyStart(realChain.call());
            long contentLength = request.body().contentLength();
            CountingSink requestBodyOut =
                new CountingSink(httpCodec.createRequestBody(request, contentLength));
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
                    //写入请求体
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
            realChain.eventListener()
                .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
            // from being reused. Otherwise we're still obligated to transmit the request body to
            // leave the connection in a consistent state.
            //服务器不愿意接受请求体,调用noNewStreams关闭相关socket
            streamAllocation.noNewStreams();
          }
        }
            //④结束请求
        httpCodec.finishRequest();
      
      /**代码走到这里的responseBuilder情况为:
        * 1.post请求,请求头包含Expect,服务端允许接受请求体,并且已经发出了请求体,responseBuilder为null
        * 2.post请求,请求头包含Expect,服务端不允许接受请求体,responseBuilder不为null
        * 3.post请求,没有请求体,responseBuilder为null
        *   3.get请求,responseBuilder为null
        */
      
        if (responseBuilder == null) {
          realChain.eventListener().responseHeadersStart(realChain.call());
        //根据服务器返回的数据构建 responseBuilder对象  (传入的expectContinue为false,不会return null)
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        //⑤构建Response对象
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
            //第一次查询服务器是否支持接受请求体的,而不是真正的请求对应的结果响应。所以接着:
        int code = response.code();
        if (code == 100) {
          // server sent a 100-continue even though we did not request one.
          // try again to read the actual response
          //如果响应了100,这代表了请求Expect: 100-continue成功响应,需要马上再次读取一份响应头,这才是真正的请求对应的响应头。
          responseBuilder = httpCodec.readResponseHeaders(false);
                //⑤构建Response对象
          response = responseBuilder
                  .request(request)
                  .handshake(streamAllocation.connection().handshake())
                  .sentRequestAtMillis(sentRequestMillis)
                  .receivedResponseAtMillis(System.currentTimeMillis())
                  .build();
    
          code = response.code();
        }
    
        realChain.eventListener()
                .responseHeadersEnd(realChain.call(), response);
    
        if (forWebSocket && code == 101) {
          //WebSocket请求
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          //读取响应体数据
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
            //客户端或者服务端不希望长连接,那么就关闭socket
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
            //如果服务器返回204/205(表示没有响应体),但是解析Content-Lenght>0(表示响应体字节长度),出现冲突,抛出协议异常
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
            //⑥返回response
        return response;
      }
    

    writeRequestHeaders和readResponseHeaders(以Http2Codec为例)

    //Http2Codec.java
    
     private Http2Stream stream;
    
    @Override public void writeRequestHeaders(Request request) throws IOException {
      if (stream != null) return;
      boolean hasRequestBody = request.body() != null;
      List<Header> requestHeaders = http2HeadersList(request);
      stream = connection.newStream(requestHeaders, hasRequestBody);
      stream.readTimeout().timeout(chain.readTimeoutMillis(), TimeUnit.MILLISECONDS);
      stream.writeTimeout().timeout(chain.writeTimeoutMillis(), TimeUnit.MILLISECONDS);
    }
    
    @Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
        List<Header> headers = stream.takeResponseHeaders();
        Response.Builder responseBuilder = readHttp2HeadersList(headers);
        if (expectContinue && Internal.instance.code(responseBuilder) == HTTP_CONTINUE) {
          //注意传过来的expectContinue
          //服务端返回100,表示愿意接受请求体 responseBuilder为null
          return null;
        }
        return responseBuilder;
      }
    

    小结:CallServerInterceptor完成HTTP协议报文的封装和解析。

    ①获取拦截器链中的HttpCodec、StreamAllocation、RealConnection对象

    ②调用httpCodec.writeRequestHeaders(request)将请求头写入缓存

    ③判断是否有请求体,如果有,请求头通过携带特殊字段 Expect:100-continue来询问服务器是否愿意接受请求体。(一般用于上传大容量请求体或者需要验证)

    ④通过httpCodec.finishRequest()结束请求

    ⑤通过responseBuilder构建Response

    ⑥返回Response

    相关文章

      网友评论

        本文标题:OkHttp源码分析:五大拦截器详解

        本文链接:https://www.haomeiwen.com/subject/rhckhltx.html