OkHttp Source Code Analysis -- Network Requests Through the Interceptor Chain

Author: PuHJ | Published: 2018-03-31 17:27 | Views: 48

    Preface:

    Interceptors are one of OkHttp's most important features. Their Javadoc reads:
    /**
     * Observes, modifies, and potentially short-circuits requests going out and the corresponding
     * responses coming back in. Typically interceptors add, remove, or transform headers on the request
     * or response.
     */
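    Note that "short-circuit" here means an interceptor may return a response on its own without passing the request further down the chain. In the common case, an interceptor adds, removes, or transforms headers on the request or response.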
    
    [Figure: the interceptor chain diagram from the official OkHttp site]

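    The application interceptors and network interceptors are the ones we define ourselves. The OkHttp core in the middle is supplied by the library, which provides five built-in interceptors. They are invoked in the following order: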


    [Figure: order of the chain of responsibility]

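    Roughly what the five built-in interceptors do:
    RetryAndFollowUpInterceptor: retries failed requests and follows redirects
    BridgeInterceptor: bridges application and network code by filling in required request headers
    CacheInterceptor: serves responses from the cache and writes responses to it
    ConnectInterceptor: obtains a usable connection to the server
    CallServerInterceptor: writes the request to the server and reads the server's response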

    1. Chain of Responsibility

    getResponseWithInterceptorChain is the entry point called from outside; it kicks off processing of the whole chain.

    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        // Holds every interceptor: the user-defined ones plus the five built-in ones
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        // retryAndFollowUpInterceptor is created when RealCall is initialized, because RealCall needs some of its methods
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        // forWebSocket is passed in from OkHttpClient when the call is created
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
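        // Build the first link of the chain. Its arguments are:
        // the full interceptor list and an index, which together locate the interceptor to run;
        // originalRequest, plus the connect/read/write timeouts configured on OkHttpClient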
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
        // Finally, run the whole chain
        return chain.proceed(originalRequest);
      }
    

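    After a few days with this code, here is how I understand the chain of responsibility:
    1. The handler classes process the request one after another, like links in a chain;
    2. To form a chain, each link needs two methods: one to get the request and one to obtain the response;
    3. The chain is formed by creating the next link on demand: when proceed is called, a new chain object is created and handed to the next interceptor's intercept method.

    To make this easier to follow, here is a small imitation of the interceptor mechanism: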

    // Interface definitions for the interceptor and the chain
    public interface Interceptor {
    
        // Performs the interception; receives the next chain link so that the links form a chain
        String intercept(Chain chain);
    
        // The interceptor chain
        interface Chain{
       
            // Returns the request
            int request();
            
            // Hands the request to the next interceptor and returns its response
            String proceed(int request);
        }
    }
    
    
    // Implementation of the chain
    public class RealChain implements Interceptor.Chain {
    
    
        private int index;
        private int request;
        private List<Interceptor> interceptors;
    
        public RealChain(List<Interceptor> interceptors,int index,int request){
            this.index = index;
            this.interceptors = interceptors;
            this.request = request;
        }
    
        @Override
        public int request() {
            return request;
        }
    
        // On proceed, create the next Chain and let the current interceptor handle it
        @Override
        public String proceed(int request) {
    
            request++;
            RealChain next = new RealChain(interceptors,index+1,request);
    
            return interceptors.get(index).intercept(next);
        }
    }
    

    Interceptor implementations

    public class InterceptorOne implements Interceptor {
    
        @Override
        public String intercept(Chain chain) {
            int request = chain.request();
            return chain.proceed(request);
        }
    }
    
    public class InterceptorTwo implements Interceptor {
        @Override
        public String intercept(Chain chain) {
            int request = chain.request();
            return chain.proceed(request);
        }
    }
    
    // The last interceptor produces the response itself instead of delegating to a next interceptor
    public class InterceptorThree implements Interceptor {
    
        @Override
        public String intercept(Chain chain) {
            int request = chain.request();
            return "http response";
        }
    }
    

    Building and invoking the chain

        String getResponseWithInterceptorChain() {
            // Build a full stack of interceptors.
            List<com.phj.okhttp.Interceptor> interceptors = new ArrayList<>();
            com.phj.okhttp.Interceptor interceptor = new InterceptorOne();
            interceptors.add(interceptor);
            interceptors.add(new InterceptorTwo());
            interceptors.add(new InterceptorThree());
    
            RealChain chain = new RealChain(interceptors, 0, 1);
            return chain.proceed(1);
        }
    

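    The code above imitates OkHttp's interceptor chain. The chain-of-responsibility pattern has no single fixed form, but the variants are broadly alike: each handler only needs a reference to the next handler in order to pass the work on.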
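
One property of this kind of chain is worth seeing in isolation: each interceptor sees the request on the way in and the response on the way out, and any interceptor may answer by itself without calling the rest of the chain. The following is a minimal sketch of that behavior. All names (`OnionChainDemo`, `Step`, `Next`, the `cached:` prefix) are hypothetical and chosen for illustration; none of them are OkHttp APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical mini chain; the names are illustrative and not part of OkHttp. */
final class OnionChainDemo {
    interface Step {
        String handle(String request, Next next);
    }

    interface Next {
        String proceed(String request);
    }

    /** Records the order in which steps see the request and the response. */
    static final List<String> trace = new ArrayList<>();

    static String run(List<Step> steps, int index, String request) {
        if (index >= steps.size()) {
            throw new IllegalStateException("no step produced a response");
        }
        // Each step receives a Next that resumes the chain at index + 1.
        return steps.get(index).handle(request, r -> run(steps, index + 1, r));
    }

    /** Logs before and after delegating to the rest of the chain. */
    static Step logging(String name) {
        return (request, next) -> {
            trace.add(name + " request");
            String response = next.proceed(request);
            trace.add(name + " response");
            return response;
        };
    }

    /** Short-circuits: answers "cached:" requests itself and never calls next. */
    static Step cache() {
        return (request, next) ->
            request.startsWith("cached:") ? "from cache" : next.proceed(request);
    }

    /** Terminal step: always produces the response, like a server call would. */
    static Step server() {
        return (request, next) -> "from server";
    }

    public static void main(String[] args) {
        List<Step> steps = Arrays.asList(logging("A"), cache(), logging("B"), server());
        System.out.println(run(steps, 0, "fresh:/index") + " " + trace);
        trace.clear();
        System.out.println(run(steps, 0, "cached:/index") + " " + trace);
    }
}
```

For the fresh request the trace shows A and B wrapping the server call; for the cached request step B and the server are never reached, which is what "short-circuit" means in the interceptor Javadoc.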

    2. RetryAndFollowUpInterceptor

    /**
     * This interceptor recovers from failures and follows redirects as necessary. It may throw an
     * {@link IOException} if the call was canceled.
     */
    In short, this interceptor retries after recoverable failures and follows redirects; if the call is canceled, it may throw an IOException.
    

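    The core of the intercept method
    Many things can go wrong during a network request: the status code may not be 200, the response may be unusable, or an exception may have been thrown.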

      @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Call call = realChain.call();
        EventListener eventListener = realChain.eventListener();
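        // StreamAllocation sets up the components needed for the HTTP request. It is created here but not used here; ConnectInterceptor uses it to obtain the input and output streams to the server.
        // Constructor arguments: connection pool, Address of the target, the call, event listener, and the call stack trace object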
        streamAllocation = new StreamAllocation(client.connectionPool(), createAddress(request.url()),
            call, eventListener, callStackTrace);
    
        int followUpCount = 0;
        Response priorResponse = null;
        while (true) {
          // If canceled, release the allocation and throw
          if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
          }
    
          Response response;
          boolean releaseConnection = true;
          try {
           // Perform the network request through the rest of the chain
            response = realChain.proceed(request, streamAllocation, null, null);
            releaseConnection = false;
          } catch (RouteException e) {
            // The attempt to connect via a route failed. The request will not have been sent.
            if (!recover(e.getLastConnectException(), false, request)) {
              throw e.getLastConnectException();
            }
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            // An attempt to communicate with a server failed. The request may have been sent.
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, requestSendStarted, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // We're throwing an unchecked exception. Release any resources.
            if (releaseConnection) {
              streamAllocation.streamFailed(null);
              streamAllocation.release();
            }
          }
    
          // Attach the prior response if it exists. Such responses never have a body.
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
         // Decide whether a follow-up is needed; null means no further processing, so return the response directly
          Request followUp = followUpRequest(response);
    
          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            return response;
          }
    
         // Close response.body()
          closeQuietly(response.body());
         // Check the follow-up limit; past the limit, release streamAllocation and fail
          if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
    
          if (followUp.body() instanceof UnrepeatableRequestBody) {
            streamAllocation.release();
            throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
          }
         // Can the follow-up reuse the same connection? If not, allocate a new StreamAllocation
          if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();
            streamAllocation = new StreamAllocation(client.connectionPool(),
                createAddress(followUp.url()), call, eventListener, callStackTrace);
          } else if (streamAllocation.codec() != null) {
            throw new IllegalStateException("Closing the body of " + response
                + " didn't close its backing stream. Bad interceptor?");
          }
    
          request = followUp;
          priorResponse = response;
        }
      }
    

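    Summary:
    1. Create the StreamAllocation object, which sets up all the components needed for the network request and response
    2. Call the next link of the chain to perform the request: response = realChain.proceed(request, streamAllocation, null, null);
    3. Decide from the exception or the response whether the request must be retried or followed up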
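
The follow-up part of that loop can be sketched on its own. The example below is a simplified illustration, not OkHttp's implementation: the in-memory `SERVER` map, the `"-> "` redirect marker and the class name `FollowUpLoopDemo` are all assumptions made for the sketch, and recovery from exceptions is left out.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a follow-up loop; not OkHttp's actual classes. */
final class FollowUpLoopDemo {
    /** A fake server: maps a URL either to "-> otherUrl" (a redirect) or to a body. */
    static final Map<String, String> SERVER = new HashMap<>();

    static String fetch(String url, int maxFollowUps) {
        int followUpCount = 0;
        String current = url;
        while (true) {
            String response = SERVER.get(current);
            if (response == null) {
                throw new IllegalStateException("unreachable: " + current);
            }
            // followUp == null means the response is final and can be returned.
            String followUp = response.startsWith("-> ") ? response.substring(3) : null;
            if (followUp == null) {
                return response;
            }
            // Guard against redirect loops, mirroring the MAX_FOLLOW_UPS check above.
            if (++followUpCount > maxFollowUps) {
                throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
            }
            current = followUp;
        }
    }

    public static void main(String[] args) {
        SERVER.put("/a", "-> /b");
        SERVER.put("/b", "-> /c");
        SERVER.put("/c", "hello");
        System.out.println(fetch("/a", 20));
    }
}
```

The counter is what keeps a redirect cycle from looping forever; without it, two URLs that redirect to each other would never terminate.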

    3. BridgeInterceptor

    /**
     * Bridges from application code to network code. First it builds a network request from a user
     * request. Then it proceeds to call the network. Finally it builds a user response from the network
     * response.
     */
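    In short, it is the bridge between application code and network code: it turns the user request into a network request, proceeds to the network, and turns the network response back into a user response.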
    

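    BridgeInterceptor fills in required request headers that the caller left out, and converts between the application-level and network-level forms of the request and response.

    The intercept method is shown below.
    cookieJar is responsible for storing and loading cookies.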

     @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        // Create a new Request.Builder from the user request
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
    
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
        // Keep-Alive is the basis for connection reuse
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
       // Whether OkHttp added gzip itself and must therefore decompress transparently
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
        // Cookie handling
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
        // Perform the network request
        Response networkResponse = chain.proceed(requestBuilder.build());
        // Save the cookies from the response
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
    
        // gzip handling, delegated to Okio
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          // Wrap the body source in a GzipSource
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          // Let Okio handle the decompression
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
      }
    

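    1. Fill in header fields so that the request can actually be sent over the network
    2. The default connection mode is Keep-Alive, which keeps the TCP connection open for a while so it can be reused
    3. Post-process the returned Response, for example gzip decompression, to produce the Response the caller needs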
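
The header-filling rules from point 1 can be tried out without OkHttp. The sketch below mirrors the defaults seen in the intercept method above using a plain map. It is a simplification under stated assumptions: `BridgeHeadersDemo`, its parameters and the `demo-agent/1.0` value are made up for illustration, and header names are matched by exact case, which real HTTP handling does not do.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of default-header filling; not OkHttp's BridgeInterceptor. */
final class BridgeHeadersDemo {
    /**
     * Returns a copy of the user headers with missing defaults added.
     * contentLength == -1 means the body length is unknown.
     */
    static Map<String, String> toNetworkHeaders(
            Map<String, String> userHeaders, String host, boolean hasBody, long contentLength) {
        Map<String, String> headers = new LinkedHashMap<>(userHeaders);
        if (hasBody) {
            if (contentLength != -1) {
                // Known length: send Content-Length, never chunked.
                headers.put("Content-Length", Long.toString(contentLength));
                headers.remove("Transfer-Encoding");
            } else {
                // Unknown length: fall back to chunked transfer encoding.
                headers.put("Transfer-Encoding", "chunked");
                headers.remove("Content-Length");
            }
        }
        headers.putIfAbsent("Host", host);
        headers.putIfAbsent("Connection", "Keep-Alive");
        // Only ask for gzip when the caller did not choose an encoding or a byte range.
        if (!headers.containsKey("Accept-Encoding") && !headers.containsKey("Range")) {
            headers.put("Accept-Encoding", "gzip");
        }
        headers.putIfAbsent("User-Agent", "demo-agent/1.0"); // placeholder value
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(toNetworkHeaders(new LinkedHashMap<>(), "example.com", true, 12));
    }
}
```

Values the caller already set are left alone, which matches the `header(...) == null` checks in the original method.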

    4. CacheInterceptor

    /** Serves requests from the cache and writes responses to the cache. */
    In short, it answers requests from the cache when possible and writes network responses into the cache.
    

    How to use it:
    Add a cache when building the OkHttpClient:

     cache(new Cache(new File("cache"),10*1024*1024))
    

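    4.1 The Cache class

    Let's look at the Cache class first, starting with its documentation.

    Its job is to return a Response from the cache while the cached entry is still valid.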
     * <h3>Force a Network Response</h3>
     *
     * <p>In some situations, such as after a user clicks a 'refresh' button, it may be necessary to
     * skip the cache, and fetch data directly from the server. To force a full refresh, add the {@code
     * no-cache} directive: <pre>   {@code
     *    // Skip the cache and force fresh data from the server
     *   Request request = new Request.Builder()
     *       .cacheControl(new CacheControl.Builder().noCache().build())
     *       .url("http://publicobject.com/helloworld.txt")
     *       .build();
     * }</pre>
     *
     * If it is only necessary to force a cached response to be validated by the server, use the more
     * efficient {@code max-age=0} directive instead: <pre>   {@code
     *   // Only force the server to validate the cached response
     *   Request request = new Request.Builder()
     *       .cacheControl(new CacheControl.Builder()
     *           .maxAge(0, TimeUnit.SECONDS)
     *           .build())
     *       .url("http://publicobject.com/helloworld.txt")
     *       .build();
     * }</pre>
    
     *
     * <h3>Force a Cache Response</h3>
     *
     * <p>Sometimes you'll want to show resources if they are available immediately, but not otherwise.
     * This can be used so your application can show <i>something</i> while waiting for the latest data
     * to be downloaded. To restrict a request to locally-cached resources, add the {@code
     * only-if-cached} directive: <pre>   {@code
     *      // Read from the cache only
     *     Request request = new Request.Builder()
     *         .cacheControl(new CacheControl.Builder()
     *             .onlyIfCached()
     *             .build())
     *         .url("http://publicobject.com/helloworld.txt")
     *         .build();
     *     Response forceCacheResponse = client.newCall(request).execute();
     *     if (forceCacheResponse.code() != 504) {
     *       // The resource was cached! Show it.
     *     } else {
     *       // The resource was not cached.
     *     }
     * }</pre>
    

    The Cache class holds an implementation of the InternalCache interface; every one of its methods simply delegates to Cache itself.

     final InternalCache internalCache = new InternalCache() {
        @Override public Response get(Request request) throws IOException {
          return Cache.this.get(request);
        }
    
        @Override public CacheRequest put(Response response) throws IOException {
          return Cache.this.put(response);
        }
    
        @Override public void remove(Request request) throws IOException {
          Cache.this.remove(request);
        }
    
        @Override public void update(Response cached, Response network) {
          Cache.this.update(cached, network);
        }
    
        @Override public void trackConditionalCacheHit() {
          Cache.this.trackConditionalCacheHit();
        }
    
        @Override public void trackResponse(CacheStrategy cacheStrategy) {
          Cache.this.trackResponse(cacheStrategy);
        }
      };
    

    With that in mind, let's look at the put method in Cache.

     @Nullable CacheRequest put(Response response) {
        String requestMethod = response.request().method();
    
        if (HttpMethod.invalidatesCache(response.request().method())) {
          try {
            remove(response.request());
          } catch (IOException ignored) {
            // The cache cannot be written.
          }
          return null;
        }
       // Only GET responses are cached; other methods are not
        if (!requestMethod.equals("GET")) {
          // Don't cache non-GET responses. We're technically allowed to cache
          // HEAD requests and some POST requests, but the complexity of doing
          // so is high and the benefit is low.
          return null;
        }
    
        if (HttpHeaders.hasVaryAll(response)) {
          return null;
        }
          // The entity stored in the cache: an Entry that simply wraps the Response
        Entry entry = new Entry(response);
        // The cache is backed by DiskLruCache; OkHttp ships its own modified version
        DiskLruCache.Editor editor = null;
        try {
           // Derive the cache key from the URL by hashing it (a digest, not encryption)
          editor = cache.edit(key(response.request().url()));
          if (editor == null) {
            return null;
          }
           // Write to disk: the response headers and request headers, plus the handshake when HTTPS is used
          entry.writeTo(editor);
         // Exposed to the cache interceptor
          return new CacheRequestImpl(editor);
        } catch (IOException e) {
          abortQuietly(editor);
          return null;
        }
      }
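
The `key(...)` call in put turns a URL into a fixed-length string that is safe to use as a file name. As far as I know, the OkHttp 3.x sources compute it as the hex-encoded MD5 digest of the URL string; treat that as an assumption to check against the version you are reading. A minimal sketch of such a key function using only the JDK (the class name `CacheKeyDemo` is hypothetical):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of a URL-to-key function; assumes MD5 + lowercase hex. */
final class CacheKeyDemo {
    static String key(String url) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(url.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                // Mask to 0..255 so negative bytes format as two hex digits.
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new AssertionError("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(key("http://publicobject.com/helloworld.txt"));
    }
}
```

The digest is used here only to get a short, uniform key; it offers no secrecy, which is why "hashing" describes it better than "encryption".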
    
    

    The get method in Cache reads the cached Response for a request's URL.

    @Nullable Response get(Request request) {
        String key = key(request.url());
        DiskLruCache.Snapshot snapshot;
        Entry entry;
        try {
          snapshot = cache.get(key);
          if (snapshot == null) {
            return null;
          }
        } catch (IOException e) {
          // Give up because the cache cannot be read.
          return null;
        }
    
        try {
        // Build the Entry from the snapshot
          entry = new Entry(snapshot.getSource(ENTRY_METADATA));
        } catch (IOException e) {
          Util.closeQuietly(snapshot);
          return null;
        }
        // Assemble the Response and return it
        Response response = entry.response(snapshot);
    
        if (!entry.matches(request, response)) {
          Util.closeQuietly(response.body());
          return null;
        }
    
        return response;
      }
    
    

    The intercept method of the cache interceptor

     @Override public Response intercept(Chain chain) throws IOException {
      // Try to get a cached candidate
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
      // The cache strategy holds a request and a response and decides whether to use the network, the cache, or both (a conditional request)
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
    
        if (cache != null) {
        // Track statistics such as the cache hit rate
          cache.trackResponse(strategy);
        }
    
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
    
        // If we're forbidden from using the network and the cache is insufficient, fail.
        // Neither the network nor the cache can be used: build a 504 response
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(Util.EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
    
        // If we don't need the network, we're done.
    // A cached response exists and the network is not needed: return the cached result directly
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
        Response networkResponse = null;
        try {
          // Perform the network request
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // If we have a cache response too, then we're doing a conditional get.
        if (cacheResponse != null) {
         // The server answered 304 Not Modified, so the cached response is used
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
    
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    
        if (cache != null) {
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            // The response has a body and is cacheable, so write it to the cache
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
    
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
    
        return response;
      }
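
The branching in this method comes down to two questions: did the strategy produce a network request, and did it produce a usable cached response? The sketch below lays the four combinations out as a small decision function. The names `CacheDecisionDemo` and `Outcome` are hypothetical; only the branching follows the method above.

```java
/** Hypothetical summary of the CacheInterceptor branches as a decision table. */
final class CacheDecisionDemo {
    enum Outcome { GATEWAY_TIMEOUT_504, CACHE_ONLY, NETWORK_ONLY, CONDITIONAL_GET }

    /** Mirrors the null checks on strategy.networkRequest and strategy.cacheResponse. */
    static Outcome decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
        if (!hasNetworkRequest && !hasCacheResponse) {
            return Outcome.GATEWAY_TIMEOUT_504; // e.g. only-if-cached with nothing cached
        }
        if (!hasNetworkRequest) {
            return Outcome.CACHE_ONLY;          // the cached response is fresh enough
        }
        if (!hasCacheResponse) {
            return Outcome.NETWORK_ONLY;        // nothing usable in the cache
        }
        return Outcome.CONDITIONAL_GET;         // ask the server to validate the cached copy
    }

    /** For a conditional GET, 304 means the cached body is still valid. */
    static String resolveConditional(int networkCode) {
        return networkCode == 304 ? "cached body with merged headers" : "network body";
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true) + " -> " + resolveConditional(304));
    }
}
```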
    

    5. ConnectInterceptor

    /** Opens a connection to the target server and proceeds to the next interceptor. */
    In short, it opens a connection to the target server and then hands over to the next interceptor.
    

    The intercept method:

     @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
       // Get the StreamAllocation created in RetryAndFollowUpInterceptor
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
          // HttpCodec encodes the Request and decodes the Response
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
       // The RealConnection performs the actual network I/O; it is passed on to the next interceptor
        RealConnection connection = streamAllocation.connection();
    
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    

    streamAllocation.newStream(client, chain, doExtensiveHealthChecks) works in roughly two steps:

    public HttpCodec newStream(
          OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis();
        int readTimeout = chain.readTimeoutMillis();
        int writeTimeout = chain.writeTimeoutMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
    
        try {
         // Step 1: find a healthy, usable RealConnection
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
          // Step 2: create the HttpCodec from that RealConnection
          HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
      }
    
    Finding the RealConnection
      /**
       * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
       * until a healthy connection is found.
       */
      private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
          throws IOException {
        while (true) {
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              connectionRetryEnabled);
    
          // If this is a brand new connection, we can skip the extensive health checks.
          synchronized (connectionPool) {
            if (candidate.successCount == 0) {
              return candidate;
            }
          }
    
          // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
          // isn't, take it out of the pool and start again.
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            noNewStreams();
            continue;
          }
    
          return candidate;
        }
      }
    
    
    
    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          boolean connectionRetryEnabled) throws IOException {
    
    // Excerpt: try to reuse the already-allocated connection
     if (this.connection != null) {
            // We had an already-allocated connection and it's good.
            result = this.connection;
            releasedConnection = null;
          }
    // Excerpt: start connecting
        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(
            connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route());
    
    // Excerpt: put the new connection into the pool
          // Pool the connection.
          Internal.instance.put(connectionPool, result);
    }
    

    Summary:
    1. Create a RealConnection object, or reuse an existing one
    2. Choose how to connect: a plain socket or a tunnel
    3. Proceed to CallServerInterceptor

    OkHttp's connection pool: ConnectionPool

    /**
     * Manages reuse of HTTP and HTTP/2 connections for reduced network latency. HTTP requests that
     * share the same {@link Address} may share a {@link Connection}. This class implements the policy
     * of which connections to keep open for future use.
     */
    It manages RealConnection objects and allows a connection to be reused within the keep-alive window.
    

    The get() and put() operations

      /**
       * Returns a recycled connection to {@code address}, or null if no such connection exists. The
       * route is null if the address has not yet been routed.
       */
      @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
        // Check whether the connection can carry this address and route
          if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection, true);
            return connection;
          }
        }
        return null;
      }
    
      public void acquire(RealConnection connection, boolean reportedAcquired) {
        assert (Thread.holdsLock(connectionPool));
        if (this.connection != null) throw new IllegalStateException();
    
        this.connection = connection;
        this.reportedAcquired = reportedAcquired;
       // Record this allocation in the connection's list of allocations
        connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
      }
    
    
    The put method
      void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          // Run the cleanup asynchronously
          executor.execute(cleanupRunnable);
        }
     // Add to the collection of RealConnections
        connections.add(connection);
      }
    
    

    Cleanup on a thread pool

       // A thread pool with no core threads
      private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
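    // The actual cleanup work
    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          // Loop until cleanup reports that nothing is left to do
          while (true) {
             // Time until the next cleanup, in nanoseconds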
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  // Wait for waitMillis plus the remaining nanoseconds
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
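
The arithmetic in the runnable exists because `Object.wait(long, int)` takes the timeout as whole milliseconds plus a nanosecond remainder in the range 0 to 999999. A small sketch of the split, with a hypothetical class name:

```java
/** Hypothetical helper showing the millis/nanos split used before Object.wait. */
final class WaitSplitDemo {
    /** Returns {wholeMillis, remainingNanos} for a duration given in nanoseconds. */
    static long[] split(long waitNanos) {
        long millis = waitNanos / 1_000_000L;
        long nanos = waitNanos - millis * 1_000_000L; // always in 0..999_999 for waitNanos >= 0
        return new long[] {millis, nanos};
    }

    public static void main(String[] args) {
        long[] parts = split(2_500_000L);
        System.out.println(parts[0] + " ms + " + parts[1] + " ns"); // prints "2 ms + 500000 ns"
    }
}
```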
    
    cleanup() resembles the mark-and-sweep algorithm used by garbage collectors
    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
         // Find (mark) idle or leaked connections
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
    
            // If the connection is in use, keep searching.
           // A connection counts as idle when none of its weak allocation references is still alive (the referent is null)
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
    
            idleConnectionCount++;
    
            // If the connection is ready to be evicted, we're done.
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
    
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            // We've found a connection to evict. Remove it from the list, then close it below (outside
            // of the synchronized block).
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            // A connection will be ready to evict soon.
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
            return keepAliveDurationNs;
          } else {
            // No connections, idle or in use.
            cleanupRunning = false;
            return -1;
          }
        }
    
        closeQuietly(longestIdleConnection.socket());
    
        // Cleanup again immediately.
        return 0;
      }
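
The return value of cleanup drives the loop in cleanupRunnable: 0 means "evicted one, run again now", a positive value is the wait until the next eviction is due, and -1 means the pool is empty. The sketch below reproduces that contract over a plain list. It is a simplified model, not OkHttp's ConnectionPool: `IdleEvictionDemo`, `Conn` and the `activeStreams` counter are stand-ins invented for the example, there is no locking, and no sockets are closed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of idle-connection eviction; not OkHttp's ConnectionPool. */
final class IdleEvictionDemo {
    static final class Conn {
        final String name;
        final int activeStreams;  // > 0 means the connection is in use
        final long idleAtNanos;   // timestamp at which it became idle

        Conn(String name, int activeStreams, long idleAtNanos) {
            this.name = name;
            this.activeStreams = activeStreams;
            this.idleAtNanos = idleAtNanos;
        }
    }

    final List<Conn> connections = new ArrayList<>();
    final long keepAliveNanos;
    final int maxIdle;

    IdleEvictionDemo(long keepAliveNanos, int maxIdle) {
        this.keepAliveNanos = keepAliveNanos;
        this.maxIdle = maxIdle;
    }

    /** Returns 0 after an eviction, the nanos until the next one is due, or -1 if empty. */
    long cleanup(long now) {
        int inUse = 0;
        int idle = 0;
        Conn longestIdle = null;
        long longestIdleNanos = Long.MIN_VALUE;
        for (Conn c : connections) {
            if (c.activeStreams > 0) {
                inUse++;
                continue;
            }
            idle++;
            long idleNanos = now - c.idleAtNanos;
            if (idleNanos > longestIdleNanos) {
                longestIdleNanos = idleNanos;
                longestIdle = c;
            }
        }
        if (longestIdleNanos >= keepAliveNanos || idle > maxIdle) {
            connections.remove(longestIdle); // evict the connection idle the longest
            return 0;
        } else if (idle > 0) {
            return keepAliveNanos - longestIdleNanos;
        } else if (inUse > 0) {
            return keepAliveNanos;
        }
        return -1;
    }

    public static void main(String[] args) {
        IdleEvictionDemo pool = new IdleEvictionDemo(100, 5);
        pool.connections.add(new Conn("busy", 1, 0));
        pool.connections.add(new Conn("old", 0, 0));
        pool.connections.add(new Conn("recent", 0, 50));
        System.out.println(pool.cleanup(60));
        System.out.println(pool.cleanup(120));
    }
}
```

Evicting only one connection per pass keeps the critical section short; the caller simply runs the pass again when it gets 0 back.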
    

    6. CallServerInterceptor

    /** This is the last interceptor in the chain. It makes a network call to the server. */
    As the last interceptor, it performs the real network request to the server and receives the server's response.
    
     @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
    
        realChain.eventListener().requestHeadersStart(realChain.call());
        // Write the request headers to the socket
        httpCodec.writeRequestHeaders(request);
        realChain.eventListener().requestHeadersEnd(realChain.call(), request);
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return
          // what we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            realChain.eventListener().responseHeadersStart(realChain.call());
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            realChain.eventListener().requestBodyStart(realChain.call());
            long contentLength = request.body().contentLength();
            CountingSink requestBodyOut =
                new CountingSink(httpCodec.createRequestBody(request, contentLength));
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
             // Write the request body to the socket
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
            realChain.eventListener()
                .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
            // from being reused. Otherwise we're still obligated to transmit the request body to
            // leave the connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    
        httpCodec.finishRequest();
    
        if (responseBuilder == null) {
          realChain.eventListener().responseHeadersStart(realChain.call());
          // Read the response headers returned by the server
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
       // Build the Response
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);
    
        int code = response.code();
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
      }
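
To make "write the headers, then read the response headers" more concrete, here is a sketch of what that looks like on the wire for HTTP/1.1: a request line and header lines terminated by CRLF, and a status line coming back. It also repeats the 204/205 sanity check from the end of the method. This is an illustration with hypothetical names (`Http1WireDemo` and its methods), not OkHttp's HttpCodec, and it ignores HTTP/2 entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of HTTP/1.1 header framing; not OkHttp's HttpCodec. */
final class Http1WireDemo {
    /** Serializes the request line and headers, ending with the blank line. */
    static String writeRequestHeaders(String method, String path, Map<String, String> headers) {
        StringBuilder out = new StringBuilder();
        out.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            out.append(header.getKey()).append(": ").append(header.getValue()).append("\r\n");
        }
        return out.append("\r\n").toString();
    }

    /** Extracts the status code from a status line such as "HTTP/1.1 200 OK". */
    static int parseStatusCode(String statusLine) {
        String[] parts = statusLine.split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
            throw new IllegalArgumentException("Unexpected status line: " + statusLine);
        }
        return Integer.parseInt(parts[1]);
    }

    /** 204 and 205 responses must not carry a body. */
    static void checkNoContent(int code, long contentLength) {
        if ((code == 204 || code == 205) && contentLength > 0) {
            throw new IllegalStateException(
                "HTTP " + code + " had non-zero Content-Length: " + contentLength);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Host", "publicobject.com");
        headers.put("Connection", "Keep-Alive");
        System.out.print(writeRequestHeaders("GET", "/helloworld.txt", headers));
        System.out.println(parseStatusCode("HTTP/1.1 200 OK"));
    }
}
```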
    

          Article link: https://www.haomeiwen.com/subject/gjnmcftx.html