美文网首页网络
Okhttp源码分析之责任链模式

Okhttp源码分析之责任链模式

作者: 小菜鸟程序媛 | 来源:发表于2019-07-15 19:47 被阅读9次

    创建Okhttp对象

    // 使用默认的设置创建OkHttpClient
    public final OkHttpClient client = new OkHttpClient();
    
    //自定义设置
    public final OkHttpClient client = new OkHttpClient.Builder()
            //添加日志拦截器
           .addInterceptor(new HttpLoggingInterceptor())
           .cache(new Cache(cacheDir, cacheSize))
           .build();
    

    接着看构造函数中是如何实现对象的创建的。

      public OkHttpClient() {
        this(new Builder());
      }
      
     OkHttpClient(Builder builder) {
        this.dispatcher = builder.dispatcher;  //异步请求分发器
        this.proxy = builder.proxy; //代理设置
        this.protocols = builder.protocols; //Okhttp实现的协议
        this.connectionSpecs = builder.connectionSpecs; //连接配置
        this.interceptors = Util.immutableList(builder.interceptors); // 拦截器
        this.networkInterceptors = Util.immutableList(builder.networkInterceptors); //网络拦截器
        this.eventListenerFactory = builder.eventListenerFactory; //事件监听器工厂类,用于创建事件监听器以监听Http请求的数量、大小和持续时间。
        this.proxySelector = builder.proxySelector; //代理选择器
        this.cookieJar = builder.cookieJar; //提供Cookie的策略和持久化
        this.cache = builder.cache; //缓存Http和Https响应到文件系统,以便重用他们
        this.internalCache = builder.internalCache; 
        this.socketFactory = builder.socketFactory; //创建Socket
    
        boolean isTLS = false;
        for (ConnectionSpec spec : connectionSpecs) {
          isTLS = isTLS || spec.isTls();
        }
    
        if (builder.sslSocketFactory != null || !isTLS) {
          this.sslSocketFactory = builder.sslSocketFactory;
          this.certificateChainCleaner = builder.certificateChainCleaner;
        } else {
          X509TrustManager trustManager = systemDefaultTrustManager();
          this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
          this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
        }
    
        this.hostnameVerifier = builder.hostnameVerifier;
        this.certificatePinner = builder.certificatePinner.withCertificateChainCleaner(
            certificateChainCleaner);
        this.proxyAuthenticator = builder.proxyAuthenticator;
        this.authenticator = builder.authenticator;
        this.connectionPool = builder.connectionPool; //连接池,用于连接的复用
        this.dns = builder.dns;
        this.followSslRedirects = builder.followSslRedirects;
        this.followRedirects = builder.followRedirects;
        this.retryOnConnectionFailure = builder.retryOnConnectionFailure;
        this.connectTimeout = builder.connectTimeout; //连接超时时间
        this.readTimeout = builder.readTimeout; //读取超时时间
        this.writeTimeout = builder.writeTimeout; // 写入超时时间
        this.pingInterval = builder.pingInterval;
    
        if (interceptors.contains(null)) {
          throw new IllegalStateException("Null interceptor: " + interceptors);
        }
        if (networkInterceptors.contains(null)) {
          throw new IllegalStateException("Null network interceptor: " + networkInterceptors);
        }
      }
    

    创建完OkhttpClient,接下来就开始通过client对象创建请求

     @Override public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
     }
    

    实际上调用的是RealCall.newRealCall方法,接着看该方法做了什么

    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        // Safely publish the Call instance to the EventListener.
        // 创建RealCall对象
        RealCall call = new RealCall(client, originalRequest, forWebSocket);
        call.eventListener = client.eventListenerFactory().create(call);
        return call;
    }
    

    创建完请求之后,就是执行请求,这里分为同步和异步两种方法,我们先来看同步

      @Override public Response execute() throws IOException {
        //使用同步代码块检测是否已经执行
        synchronized (this) {
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        //开始执行
        eventListener.callStart(this);
        try {
          //使用OkHttpClient的Dispatcher将该请求添加到runningSyncCalls队列中
          client.dispatcher().executed(this);
          //构建一个拦截器链
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          client.dispatcher().finished(this);
        }
      }
    

    接着重点看拦截器链这一部分

      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors()); //此处是自定义的拦截器,例如日志拦截器
        interceptors.add(retryAndFollowUpInterceptor); //失败重试和重定向的拦截器
        interceptors.add(new BridgeInterceptor(client.cookieJar())); //用来构建网络请求
        interceptors.add(new CacheInterceptor(client.internalCache())); //缓存拦截器
        interceptors.add(new ConnectInterceptor(client)); //连接拦截器,打开与网络服务器的连接
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors()); //网络拦截器,用于观察单个网络请求和响应
        }
        interceptors.add(new CallServerInterceptor(forWebSocket)); //链中的最后一个拦截器,用于向服务器进行网络调用
    
        //创建一个拦截器链,然后按顺序的执行拦上面的所有拦截器
        //参数0 代表 当前执行的是拦截器的索引
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    
        return chain.proceed(originalRequest);
      }
    

    这里执行拦截器使用的是责任链模式,不熟悉的可以查找看一下。接着开始看proceed方法:

    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
        //首先判断当前拦截器索引是否大于等于拦截器列表的长度
        if (index >= interceptors.size()) throw new AssertionError();
        
        calls++;
    
        //以下两个方法用来确定
        // If we already have a stream, confirm that the incoming request will use it.
        if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must retain the same host and port");
        }
    
        // If we already have a stream, confirm that this is the only call to chain.proceed().
        if (this.httpCodec != null && calls > 1) {
          throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
              + " must call proceed() exactly once");
        }
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
        //此处开始指定索引为index的拦截器,index从0开始
        //同时会创建索引为index+1的新的拦截器链传递给将要执行的拦截器,方便在该拦截器执行的合适时候去执行下一个拦截器
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
        // Confirm that the next interceptor made its required call to chain.proceed().
        if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
          throw new IllegalStateException("network interceptor " + interceptor
              + " must call proceed() exactly once");
        }
    
        // Confirm that the intercepted response isn't null.
        if (response == null) {
          throw new NullPointerException("interceptor " + interceptor + " returned null");
        }
    
        if (response.body() == null) {
          throw new IllegalStateException(
              "interceptor " + interceptor + " returned a response with no body");
        }
    
        return response;
      }
    

    此处借用Okhttp源码解析的一幅画,来说明一下拦截器的具体执行顺序。

    image.png

    BridgeInterceptor

    现在我们先放弃我们的自定义拦截器,然后从BridgeInterceptor拦截器开始,先看一下它的Process方法。

    @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            //如果存在body,设置Content-Length header
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            //移除传输编码
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            //如果没有设置body,那么就添加传输编码
            requestBuilder.header("Transfer-Encoding", "chunked");
            //移除内容编码
            requestBuilder.removeHeader("Content-Length");
          }
        }
    
        //设置主机
        if (userRequest.header("Host") == null) {
          requestBuilder.header("Host", hostHeader(userRequest.url(), false));
        }
    
        //设置连接类型
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        //设置接受的编码列表
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
    
        // 如果存在Cookies,就设置Cookies到header中
        //但是在初始化OkhttpClient的时候,我们使用了NO_COOKIES,所以此处cookies是空的
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
        //设置身份标识字符串
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
    
        // 此处开始执行第二个拦截器,也就是CacheInterceptor
        Response networkResponse = chain.proceed(requestBuilder.build());
    
        
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
        //如果请求头中有gzip,那么就压缩数据,然后返回构建响应头,并返回响应信息
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
    }
    

    BridgeInterceptor拦截器相当于是给请求添加了一些header,然后执行下一个拦截器,最后等到从所有拦截器执行完再次回到这里的时候对响应数据进行处理。

    CacheInterceptor 缓存拦截器

    接着分析一下CacheInterceptor,直接从proceed方法开始。

      @Override public Response intercept(Chain chain) throws IOException {
        //判断是否设置了缓存, 如果设置就从缓存中通过request拿response信息
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
        //根据请求和缓存数据创建缓存策略
        //如果缓存存在,给缓存更新一下响应头
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
        
        if (cache != null) {
          //此处根据缓存策略来更新:请求数、网络请求数、缓存命中数
          cache.trackResponse(strategy);
        }
    
        //缓存不适用的话,就关闭该缓存
        if (cacheCandidate != null && cacheResponse == null) {
          closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
        }
    
        // If we're forbidden from using the network and the cache is insufficient, fail.
        // 如果禁止使用网络并且缓存不足的时候,就直接返回失败的请求
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              .request(chain.request())
              .protocol(Protocol.HTTP_1_1)
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              .body(Util.EMPTY_RESPONSE)
              .sentRequestAtMillis(-1L)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();
        }
    
        // If we don't need the network, we're done.
        // 如果我们不需要网络的话,到这就结束了,直接返回缓存的响应信息
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
        Response networkResponse = null;
        try {
          //执行下一个拦截器,传入网络请求,获取网络响应信息,下一个拦截器为ConnectInterceptor
          //此处的response为从下个拦截器中返回回来的响应信息
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            //如果缓存存在,不要忘记关闭
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // If we have a cache response too, then we're doing a conditional get.
        //如果我们既存在缓存,又获取到了网络响应信息
        if (cacheResponse != null) {
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                .headers(combine(cacheResponse.headers(), networkResponse.headers()))
                .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
                .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
    
            // Update the cache after combining headers but before stripping the
            // Content-Encoding header (as performed by initContentStream()).
            cache.trackConditionalCacheHit();
            //更新缓存
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
    
        //如果没有网络信息,就构建网络响应信息
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    
        if (cache != null) {
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // Offer this request to the cache.
            //添加响应到缓存
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
    
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try {
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
    
        return response;
      }
    

    从文件系统中获取缓存的方法:

      @Nullable Response get(Request request) {
        //根据url生成一个缓存的key
        //ByteString.encodeUtf8(url.toString()).md5().hex();
        String key = key(request.url());
        DiskLruCache.Snapshot snapshot;
        Entry entry;
        try {
          //根据key返回该条目的快照,如果不存在返回null,如果存在,则返回,并且将该条目移到LRU队列的头部
          //下面的都是从快照拿到数据的一些方法,待会分析
          snapshot = cache.get(key);
          if (snapshot == null) {
            return null;
          }
        } catch (IOException e) {
          // Give up because the cache cannot be read.
          return null;
        }
    
        try {
          entry = new Entry(snapshot.getSource(ENTRY_METADATA));
        } catch (IOException e) {
          Util.closeQuietly(snapshot);
          return null;
        }
    
        Response response = entry.response(snapshot);
    
        if (!entry.matches(request, response)) {
          Util.closeQuietly(response.body());
          return null;
        }
    
        return response;
      }
    

    ConnectInterceptor 连接拦截器

    接着分析ConnectInterceptor

      @Override public Response intercept(Chain chain) throws IOException {
        //获取拦截器链
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        //从拦截器中获取请求信息
        Request request = realChain.request();
        //StreamAllocation是用来协调Connections、Streams、Calls三者之间的关系
        // Connections: 到远程服务器的物理连接,建立比较慢,需要可以取消
        // Streams: 定义了连接可以携带多少并发流,Http/1.x是一个,Http/2是2个
        // Calls: 流的逻辑序列
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        //此处返回一个HttpCodec,他是用来对请求进行编码,并且对响应进行解码的一个类。
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
        
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    

    CallServerInterceptor

    这是最后一个拦截器。

      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        HttpCodec httpCodec = realChain.httpStream();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        RealConnection connection = (RealConnection) realChain.connection();
        Request request = realChain.request();
    
        long sentRequestMillis = System.currentTimeMillis();
    
        realChain.eventListener().requestHeadersStart(realChain.call());
        //整理请求头并写入
        httpCodec.writeRequestHeaders(request);
        realChain.eventListener().requestHeadersEnd(realChain.call(), request);
    
        Response.Builder responseBuilder = null;
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return
          // what we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            realChain.eventListener().responseHeadersStart(realChain.call());
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          //写入请求体
          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            realChain.eventListener().requestBodyStart(realChain.call());
            long contentLength = request.body().contentLength();
            CountingSink requestBodyOut =
                new CountingSink(httpCodec.createRequestBody(request, contentLength));
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
            realChain.eventListener()
                .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
            // from being reused. Otherwise we're still obligated to transmit the request body to
            // leave the connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    
        httpCodec.finishRequest();
        //得到响应头
        if (responseBuilder == null) {
          realChain.eventListener().responseHeadersStart(realChain.call());
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
        //构造response
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (code == 100) {
          // server sent a 100-continue even though we did not request one.
          // try again to read the actual response
          //如果code为100,需要继续读取真正的响应
          responseBuilder = httpCodec.readResponseHeaders(false);
    
          response = responseBuilder
                  .request(request)
                  .handshake(streamAllocation.connection().handshake())
                  .sentRequestAtMillis(sentRequestMillis)
                  .receivedResponseAtMillis(System.currentTimeMillis())
                  .build();
    
          code = response.code();
        }
    
        realChain.eventListener()
                .responseHeadersEnd(realChain.call(), response);
    
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
        //读取body
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
        if ("close".equalsIgnoreCase(response.request().header("Connection"))
            || "close".equalsIgnoreCase(response.header("Connection"))) {
          streamAllocation.noNewStreams();
        }
    
        if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
          throw new ProtocolException(
              "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
        }
    
        return response;
      }
    

    异步请求

    同步请求使用的是execute方法,异步请求使用的是enqueue,接下来我们就看一下RealCall中的异步请求方法

      @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
          //这里也是用来判断是否执行过
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        //此处将回调函数封装成了一个AsyncCall,并且将该请求添加到了一个runningAsyncCalls队列中
        //AsyncCall实际上就是一个Runnable
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

    往runningAsyncCalls队列中添加的逻辑如下

      private int maxRequests = 64;
      private int maxRequestsPerHost = 5;
      //如果队列中的请求数小于最大请求数或者对于每个host的请求数小于5才会被添加进去
      //否则将被添加到准备队列中
      synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          //此处使用了线程池来进行操作
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    

    接着我们看一下executorService方法

      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          //此处创建了一个线程池
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
    

    接着看一下AsyncCallexecute方法,会发现跟同步请求是一样的

        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
           //此处就是我们在上面分析的责任链模式
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              responseCallback.onResponse(RealCall.this, response);
            }
          } catch (IOException e) {
            if (signalledCallback) {
              // Do not signal the callback twice!
              Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
            } else {
              eventListener.callFailed(RealCall.this, e);
              responseCallback.onFailure(RealCall.this, e);
            }
          } finally {
            client.dispatcher().finished(this);
          }
        }
    

    总结

    画了一个简易流程图,然后配合上面的文字说明看会更好理解一点,接下来也会分模块的进行分析。

    image.png

    相关文章

      网友评论

        本文标题:Okhttp源码分析之责任链模式

        本文链接:https://www.haomeiwen.com/subject/xfpokctx.html