美文网首页
OKHttp源码解析

OKHttp源码解析

作者: changchengfeng | 来源:发表于2017-08-20 15:50 被阅读35次

    源码解析之前先看之前请求过程

     private  OkHttpClient mClient=new OkHttpClient();
    
        public static final String BASE_URL="http://greatfeng.top/app";
    
        public void run() throws Exception {
            Request request = new Request.Builder()
                    .url(BASE_URL+"/weather?cityname="+"上海")
                    .build();
    
            mClient.newCall(request).enqueue(new Callback() {
                @Override public void onFailure(Call call, IOException e) {
                    e.printStackTrace();
                }
    
                @Override public void onResponse(Call call, Response response) throws IOException {
                    if (!response.isSuccessful()) throw new IOException("Unexpected code " + response);
    
                    Headers responseHeaders = response.headers();
                    for (int i = 0, size = responseHeaders.size(); i < size; i++) {
                        System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
                    }
    
                    System.out.println(response.body().string());
                }
            });
        }
    

    1.0 初始化一个OkHttpClient对象

    new OkHttpClient();
    

    1.1. 新建一个对象,会先加载这个类,会初始化这个类的变量和静态代码块

    // 默认支持的http协议版本 http 2.0,http 1.1
      static final List<Protocol> DEFAULT_PROTOCOLS = Util.immutableList(
          Protocol.HTTP_2, Protocol.HTTP_1_1);
      // 默认支持的 https,http
      static final List<ConnectionSpec> DEFAULT_CONNECTION_SPECS = Util.immutableList(
          ConnectionSpec.MODERN_TLS, ConnectionSpec.CLEARTEXT);
    // //新建一个内部使用的辅助类,基本上就是调用第一个参数的,同名方法,后面的参数作为方法的参数
    static {
        Internal.instance = new Internal() {
          @Override public void addLenient(Headers.Builder builder, String line) {
            builder.addLenient(line);
          }
    ...
    ...
          @Override public Call newWebSocketCall(OkHttpClient client, Request originalRequest) {
            return new RealCall(client, originalRequest, true);
          }
        };
      }
    

    1.2 执行OkHttpClient构造函数

    public OkHttpClient() {
        this(new Builder());
      }
      
    

    1.3 初始化builder的成员变量,执行builder的构造函数

    /* 这两个拦截器主要是用来我们对请求过程进行拦截处理,添加一些我们直接的处理,
        如打印请求日志,为每个请求添加相同的请求头字段,如 apikey 等
        两个区别在于interceptors是在和服务器未连接的时候拦截处理,
        networkInterceptors是和服务器建立连接后添加的拦截处理
    */
    
        final List<Interceptor> interceptors = new ArrayList<>(); // 拦截器
        final List<Interceptor> networkInterceptors = new ArrayList<>(); // 网络拦截器
        @Nullable Cache cache; // 配置缓存的
         @Nullable Proxy proxy; //配置代理服务器
         SocketFactory socketFactory; // http 用于创建 Socket 对象 
         @Nullable SSLSocketFactory sslSocketFactory; // https 用于配置创建 SSLSocket 对象 
     public Builder() {
        // 新建一个 Dispatcher 对象,主要是用来构建线程池分发请求的
          dispatcher = new Dispatcher(); 
          protocols = DEFAULT_PROTOCOLS;  // 协议版本 http 2.0, http 1.1
          connectionSpecs = DEFAULT_CONNECTION_SPECS; // 协议类型 https,http
          /* 
          事件监听器 默认为NONE,如果要对网络请求过程(请求开始,连接 请求结束,
          发送请求头,发送请求体,dns解析)进行监听,可以进行自定义 EventListener
          */
          eventListenerFactory = EventListener.factory(EventListener.NONE); 
          // 设置默认代理选择器,是 sun.net.spi.DefaultProxySelector 
          proxySelector = ProxySelector.getDefault();
          // 处理 cookie 
          cookieJar = CookieJar.NO_COOKIES;
          socketFactory = SocketFactory.getDefault(); // 设置默认的 SocketFactory
          hostnameVerifier = OkHostnameVerifier.INSTANCE; // 配置 https 检测服务器返回的证书信息与域名匹配认证
          certificatePinner = CertificatePinner.DEFAULT; // 配置 https证书校验信息sha256
          // 当有设置代理服务器访问时,代理服务器需要账号密码认证,默认不启用代理
          proxyAuthenticator = Authenticator.NONE;
          // http 认证,有些服务器进行访问时需要账号密码的,如 basic认证 默认为空
          authenticator = Authenticator.NONE;
          // 缓存连接池
          connectionPool = new ConnectionPool();
          // DNS解析器,默认使用系统的,可以自己自定义的,也可以使用阿里的HttpDns
          dns = Dns.SYSTEM;
          // https连接是否重定向
          followSslRedirects = true;
          // http 连接是否重定向
          followRedirects = true;
          // 连接失败是否重试
          retryOnConnectionFailure = true;
          // 连接超时时间
          connectTimeout = 10_000; 
          // 读取服务器数据超时时间
          readTimeout = 10_000;
          // 发送给服务器数据超时时间
          writeTimeout = 10_000;
          // http 2.0 版本新增的功能(用于计算往返时间,执行“ 活性” 检活)。设置间隔多少毫秒执行一次,为 0 默认不启用
          pingInterval = 0; 
        }
    

    1.4. Dispatcher对象的初始化,里面有个线程池,使用懒加载的形式,只有发生第一次请求的时候才会初始化线程池,使用线程池进行管理发送的网络请求的Call

      // 线程池中最大同时请求个数,超过个数得进行排队 。可进行设置
      private int maxRequests = 64;
      // 为避免线程池中请求中被同一个host的请求占满了,同时也是减轻服务器压力,可设置每个host的最大请求数 ,若超过就会进入排队序列
      private int maxRequestsPerHost = 5;
      
      private @Nullable Runnable idleCallback;
    
      private @Nullable ExecutorService executorService;
    
     /* 准备执行的异常请求集合,由于设置了线程池中最大同时请求个数,每个host的最大请求数,
        如果超过就会存起来,当正在请求的 Call请求完成了,就会调度该集合里面的 Call 去请求
     */
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
    
    // 正在执行的异步请求 Call集合
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
    
     // 正在执行的同步请求 Call 集合,只做统计
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
      
      // 单例模式,懒加载,线程池
       public synchronized ExecutorService executorService() {
        if (executorService == null) {
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
      
    

    1.5. ConnectionPool对象的初始化

    
      private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
      /** The maximum number of idle connections for each address. */
      private final int maxIdleConnections;
      private final long keepAliveDurationNs;
      private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
    
      private final Deque<RealConnection> connections = new ArrayDeque<>();
      final RouteDatabase routeDatabase = new RouteDatabase();
    
    
    public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
      }
    
      public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
    
        // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
        if (keepAliveDuration <= 0) {
          throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
        }
      }
    

    1.6. 对应1.2中OKHttpClient构造函数调用,OkHttpClient对象已生成

    OkHttpClient(Builder builder) {
        this.dispatcher = builder.dispatcher;
        this.proxy = builder.proxy;
       .......
        this.pingInterval = builder.pingInterval;
      }
    

    2.0 创建Request对象,包含一个网络请求的信息,有请求头,请求体,请求网址等

            Request request = new Request.Builder()
                    .url(BASE_URL+"/weather?cityname="+"上海")
                    .build();
    

    3.0 进行网络请求

     mClient.newCall(request).enqueue(new Callback() {})
    

    3.1 创建RealCall对象

      @Override public Call newCall(Request request) {
        return new RealCall(this, request, false /* for web socket */);
      }
      
      
      RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        final EventListener.Factory eventListenerFactory = client.eventListenerFactory();
    
        this.client = client;
        this.originalRequest = originalRequest;
        this.forWebSocket = forWebSocket;
        this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    
        // TODO(jwilson): this is unsafe publication and not threadsafe.
        this.eventListener = eventListenerFactory.create(this);
      }
      
      
    

    3.2 把请求发送到Dispatcher的线程池队列

      @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
        //新建的RealCall的executed 变量为false,执行完为ture,这就是RealCall对象不能执行两次的原因
          if (executed) throw new IllegalStateException("Already Executed");
          executed = true;
        }
        captureCallStackTrace();
        // 创建AsyncCall对象加入到线程池
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
      }
    

    3.3 AsyncCall继承NamedRunnable 实现了Runnable接口

      final class AsyncCall extends NamedRunnable {
       
        }
        
        public abstract class NamedRunnable implements Runnable {
     
      @Override public final void run() {
          execute();
      }
    
      protected abstract void execute();
    }
    

    3.4 加入线程池后就会执行在子线程中执行AsyncCall 的execute的方法

      synchronized void enqueue(AsyncCall call) {
        // 1.4已说明
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
      }
    

    3.5 execute 调用getResponseWithInterceptorChain方法获取到Response对象,即服务器返回数据

     @Override protected void execute() {
            Response response = getResponseWithInterceptorChain();
            if (retryAndFollowUpInterceptor.isCanceled()) {
              signalledCallback = true;
              // 若请求失败会回调 3.0 中设置Callback onFailure方法
              responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
            } else {
              signalledCallback = true;
              // 若请求成功会回调 3.0 中设置 Callback onResponse方法
              responseCallback.onResponse(RealCall.this, response);
            }
    finally {
            client.dispatcher().finished(this);
          }
        
      }
      
    

    3.6 getResponseWithInterceptorChain

    // 把之前设置的自定义interceptors,RetryAndFollowUpInterceptor,BridgeInterceptor
    //,CacheInterceptor,ConnectInterceptor,自定义的networkInterceptors,CallServerInterceptor对象
    // 保存在interceptors里面作为参数进行构造RealInterceptorChain
      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        // addInterceptor
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        // addNetworkInterceptor
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
       
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
      
    
    

    3.7. interceptors 保存很多拦截器

    OkHttpClient Builder有两个添加Interceptor 两者处于不通的调用阶段,我们可以在网络请求的这两个阶段自定义拦截器,做不同的处理如上篇博文提到的HttpLoggingInterceptor HttpCacheInterceptor等

        
        public Builder addInterceptor(Interceptor interceptor) {
          interceptors.add(interceptor);
          return this;
        }
        
     public Builder addNetworkInterceptor(Interceptor interceptor) {
          networkInterceptors.add(interceptor);
          return this;
        }
    
    

    Interceptor 和 Chain 接口都比较简单

      public interface Interceptor {
      Response intercept(Chain chain) throws IOException;
    
      interface Chain {
        Request request();
    
        Response proceed(Request request) throws IOException;
    
        /**
         * Returns the connection the request will be executed on. This is only available in the chains
         * of network interceptors; for application interceptors this is always null.
         */
        @Nullable Connection connection();
      }
    }
    

    3.8 创建RealInterceptorChain

      public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
          HttpCodec httpCodec, RealConnection connection, int index, Request request) {
        this.interceptors = interceptors;
        this.connection = connection;
        this.streamAllocation = streamAllocation;
        this.httpCodec = httpCodec;
        this.index = index;
        this.request = request;
      }
    

    3.9 调用proceed方法

    • proceed方法先进行了数据的校验,数据如果不正确,抛出异常
    • proceed每次都会用之前构造的这个RealInterceptorChain 的interceptors, streamAllocation, httpCodec, connection, index + 1, request 或者传入单个或者多个新的值去替换部分原来的值 从而去构建新的RealInterceptorChain对象,然后获取到第index个Interceptor,调用Interceptor的intercept方法,并把新建的RealInterceptorChain对象作为参数传入
    • 然后Interceptor的intercept方法又会调用上部分传入的RealInterceptorChain对象的proceed方法 这就成了一个循环直到最后一个CallServerInterceptor,没有调用 RealInterceptorChain的proceed方法跳出循环返回服务器返回的Response,请求完成

    这个就使得,interceptors中的上一个Interceptor的intercept方法依赖下一个Interceptor的intercept的返回结果,依次调用完interceptors中的所有Interceptor的intercept方法。

    @Override public Response proceed(Request request) throws IOException {
        return proceed(request, streamAllocation, httpCodec, connection);
      }
    
      public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
          RealConnection connection) throws IOException {
    
        // Call the next interceptor in the chain.
        RealInterceptorChain next = new RealInterceptorChain(
            interceptors, streamAllocation, httpCodec, connection, index + 1, request);
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
    
        return response;
      }
    

    所以一个正常的请求会依次调用下列Interceptor的intercept方法得到返回Response

    4.0 RetryAndFollowUpInterceptor

    4.1 在RealCall构造函数中新建RetryAndFollowUpInterceptor 对象

     
    // 见 3.1
    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
    // 见 3.6
    interceptors.add(retryAndFollowUpInterceptor);
    

    4.2 若未添加自定义Intercept,这是第一个调用的Intercept的intercept

    ==followUpRequest方法和HTTP协议相关是对HTTP协议的实现==

     @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
    // 新建 StreamAllocation对象 
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(request.url()), callStackTrace);
    
        int followUpCount = 0;
        Response priorResponse = null;
        // 无限循环,可以多次重试请求,followUpCount请求次数大于MAX_FOLLOW_UPS(20)会抛出ProtocolException,跳出循环
        while (true) {
        //进行取消请求,跳出循环就是由这个标志位控制的
          if (canceled) {
            streamAllocation.release();
            throw new IOException("Canceled");
          }
    
          Response response = null;
          boolean releaseConnection = true;
        
          // 把streamAllocation对象传入下一个 BridgeInterceptor
          
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            releaseConnection = false;
    ...
          // Attach the prior response if it exists. Such responses never have a body.
          // 有些请求可能要多次和服务器进行通信才能完成,如代理,认证等,若之前有先前有返回响应体,存储到Response的priorResponse中
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
    
    // 根据服务器返回代码和HTTP协议格式,判断是否需要多次和服务器进行通信
          Request followUp = followUpRequest(response);
    // 若不需要,直接返回请求结果,循环结束
          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            return response;
          }
    // 关闭这个返回体的流,进行判断重试次数,及异常检查 继续进行下一次循环
    
    ...
          closeQuietly(response.body());
          
          if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
    
          request = followUp;
          priorResponse = response;
        }
      }
    

    4.3 新建Address对象

      private Address createAddress(HttpUrl url) {
        SSLSocketFactory sslSocketFactory = null;
        HostnameVerifier hostnameVerifier = null;
        CertificatePinner certificatePinner = null;
        if (url.isHttps()) {
          sslSocketFactory = client.sslSocketFactory();
          hostnameVerifier = client.hostnameVerifier();
          certificatePinner = client.certificatePinner();
        }
    
        return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
            sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
            client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
      }
    

    4.4 新建StreamAllocation对象

      public StreamAllocation(ConnectionPool connectionPool, Address address, Object callStackTrace) {
        this.connectionPool = connectionPool;
        this.address = address;
        this.routeSelector = new RouteSelector(address, routeDatabase());
        this.callStackTrace = callStackTrace;
      }
      
    
    

    4.5 构建RouteSelector

      
        public RouteSelector(Address address, RouteDatabase routeDatabase) {
        this.address = address;
        this.routeDatabase = routeDatabase;
    
        resetNextProxy(address.url(), address.proxy());
      }
      
      
        /** Prepares the proxy servers to try. */
      private void resetNextProxy(HttpUrl url, Proxy proxy) {
        if (proxy != null) {
          // If the user specifies a proxy, try that and only that.
          proxies = Collections.singletonList(proxy);
        } else {
          // Try each of the ProxySelector choices until one connection succeeds.
          List<Proxy> proxiesOrNull = address.proxySelector().select(url.uri());
          proxies = proxiesOrNull != null && !proxiesOrNull.isEmpty()
              ? Util.immutableList(proxiesOrNull)
              : Util.immutableList(Proxy.NO_PROXY);
        }
        nextProxyIndex = 0;
      }
    

    5.0 BridgeInterceptor

    5.1

         //  见3.6
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    
      public BridgeInterceptor(CookieJar cookieJar) {
        this.cookieJar = cookieJar;
      }
    

    5.2 构建完整的请求,把一些默认的请求头添加上去,默认 Keep-Alive ,对请求进行gzip压缩传输,用okio库中GzipSource进行解压

     @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
         ...
    
        if (userRequest.header("Connection") == null) {
          requestBuilder.header("Connection", "Keep-Alive");
        }
    
        // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
        // the transfer stream.
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
    
        List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
        if (!cookies.isEmpty()) {
          requestBuilder.header("Cookie", cookieHeader(cookies));
        }
    
        if (userRequest.header("User-Agent") == null) {
          requestBuilder.header("User-Agent", Version.userAgent());
        }
    
    
    // 把构建好的新的请求传入CacheInterceptor替换原来的Request对象
        Response networkResponse = chain.proceed(requestBuilder.build());
    
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
      }
    
    

    6.0 CacheInterceptor

    6.1 Cache的使用见 OkHttp使用介绍
    中Cache-Control 字段

    // 见3.6 
    interceptors.add(new CacheInterceptor(client.internalCache()));
    // 默认情况下 Okhttp是没有设置cache和internalCache的
      InternalCache internalCache() {
        return cache != null ? cache.internalCache : internalCache;
      }
      
      public CacheInterceptor(InternalCache cache) {
        this.cache = cache;
      }
    

    6.2 我们考虑设置了Cache的情况

    ==Cache的情况也是HTTP协议相关是对HTTP协议的实现==

    使用的是DiskLruCache作为缓存,磁盘缓存库

      @Override public Response intercept(Chain chain) throws IOException {
        Response cacheCandidate = cache != null
        // 6.3
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
    // 6.4
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest;
        Response cacheResponse = strategy.cacheResponse;
        
        // If we don't need the network, we're done.
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
    // ...
    
        Response networkResponse = null;
        try {
        
        
         // 若没有缓存,进入ConnectInterceptor的intercept方法
          networkResponse = chain.proceed(networkRequest);
          
        } finally {
          // If we're crashing on I/O or otherwise, don't leak the cache body.
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
      // ...
    
        return response;
      }
    

    6.3.1

        @Override public Response get(Request request) throws IOException {
          return Cache.this.get(request);
        }
    

    6.3.2

        @Nullable Response get(Request request) {
        String key = key(request.url());
        DiskLruCache.Snapshot snapshot;
        Entry entry;
        try {
          snapshot = cache.get(key);
          if (snapshot == null) {
            return null;
          }
        } catch (IOException e) {
          // Give up because the cache cannot be read.
          return null;
        }
    
        try {
          entry = new Entry(snapshot.getSource(ENTRY_METADATA));
        } catch (IOException e) {
          Util.closeQuietly(snapshot);
          return null;
        }
    
        Response response = entry.response(snapshot);
    
        if (!entry.matches(request, response)) {
          Util.closeQuietly(response.body());
          return null;
        }
    
        return response;
      }
    
    

    6.3.3

    public Response response(DiskLruCache.Snapshot snapshot) {
          String contentType = responseHeaders.get("Content-Type");
          String contentLength = responseHeaders.get("Content-Length");
          Request cacheRequest = new Request.Builder()
              .url(url)
              .method(requestMethod, null)
              .headers(varyHeaders)
              .build();
          return new Response.Builder()
              .request(cacheRequest)
              .protocol(protocol)
              .code(code)
              .message(message)
              .headers(responseHeaders)
              .body(new CacheResponseBody(snapshot, contentType, contentLength))
              .handshake(handshake)
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(receivedResponseMillis)
              .build();
        }
      }
    

    6.4.1

    public Factory(long nowMillis, Request request, Response cacheResponse) {
          this.nowMillis = nowMillis;
          this.request = request;
          this.cacheResponse = cacheResponse;
    
          if (cacheResponse != null) {
            this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
            this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
            Headers headers = cacheResponse.headers();
            for (int i = 0, size = headers.size(); i < size; i++) {
              String fieldName = headers.name(i);
              String value = headers.value(i);
              if ("Date".equalsIgnoreCase(fieldName)) {
                servedDate = HttpDate.parse(value);
                servedDateString = value;
              } else if ("Expires".equalsIgnoreCase(fieldName)) {
                expires = HttpDate.parse(value);
              } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
                lastModified = HttpDate.parse(value);
                lastModifiedString = value;
              } else if ("ETag".equalsIgnoreCase(fieldName)) {
                etag = value;
              } else if ("Age".equalsIgnoreCase(fieldName)) {
                ageSeconds = HttpHeaders.parseSeconds(value, -1);
              }
            }
          }
        }
        
    
    

    6.4.2

        public CacheStrategy get() {
          CacheStrategy candidate = getCandidate();
    
          if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
            // We're forbidden from using the network and the cache is insufficient.
            return new CacheStrategy(null, null);
          }
    
          return candidate;
        }
    
      private CacheStrategy getCandidate() {
          // No cached response.
          if (cacheResponse == null) {
            return new CacheStrategy(request, null);
          }
    
          // Drop the cached response if it's missing a required handshake.
          if (request.isHttps() && cacheResponse.handshake() == null) {
            return new CacheStrategy(request, null);
          }
    
          // If this response shouldn't have been stored, it should never be used
          // as a response source. This check should be redundant as long as the
          // persistence store is well-behaved and the rules are constant.
          if (!isCacheable(cacheResponse, request)) {
            return new CacheStrategy(request, null);
          }
    
          CacheControl requestCaching = request.cacheControl();
          if (requestCaching.noCache() || hasConditions(request)) {
            return new CacheStrategy(request, null);
          }
    
          long ageMillis = cacheResponseAge();
          long freshMillis = computeFreshnessLifetime();
    
          if (requestCaching.maxAgeSeconds() != -1) {
            freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
          }
    
          long minFreshMillis = 0;
          if (requestCaching.minFreshSeconds() != -1) {
            minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
          }
    
          long maxStaleMillis = 0;
          CacheControl responseCaching = cacheResponse.cacheControl();
          if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
            maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
          }
    
          if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
            Response.Builder builder = cacheResponse.newBuilder();
            if (ageMillis + minFreshMillis >= freshMillis) {
              builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
            }
            long oneDayMillis = 24 * 60 * 60 * 1000L;
            if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
              builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
            }
            return new CacheStrategy(null, builder.build());
          }
    
          // Find a condition to add to the request. If the condition is satisfied, the response body
          // will not be transmitted.
          String conditionName;
          String conditionValue;
          if (etag != null) {
            conditionName = "If-None-Match";
            conditionValue = etag;
          } else if (lastModified != null) {
            conditionName = "If-Modified-Since";
            conditionValue = lastModifiedString;
          } else if (servedDate != null) {
            conditionName = "If-Modified-Since";
            conditionValue = servedDateString;
          } else {
            return new CacheStrategy(request, null); // No condition! Make a regular request.
          }
    
          Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
          Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);
    
          Request conditionalRequest = request.newBuilder()
              .headers(conditionalRequestHeaders.build())
              .build();
          return new CacheStrategy(conditionalRequest, cacheResponse);
        }
    

    7.0 ConnectInterceptor

      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        // 见4.2 获取新建的StreamAllocation对象
        StreamAllocation streamAllocation = realChain.streamAllocation();
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        
      
        HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
        
        RealConnection connection = streamAllocation.connection();
    
        // 把request, streamAllocation, httpCodec, connection传入
        
        //进入CallServerInterceptor的intercept
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    
    public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
    // ...
    
    // 7.1.0
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
              
    // 7.2.0        
          HttpCodec resultCodec = resultConnection.newCodec(client, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
    
    

    7.1.1

      private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
          throws IOException {
        while (true) {
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              connectionRetryEnabled);
    
         .....
    
          return candidate;
        }
      }
    
    

    7.1.2

    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          boolean connectionRetryEnabled) throws IOException {
        Route selectedRoute;
        synchronized (connectionPool) {
      ...
    
          // Attempt to use an already-allocated connection.
          // 尝试使用自身存储的连接
          RealConnection allocatedConnection = this.connection;
          if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
            return allocatedConnection;
          }
    
          // Attempt to get a connection from the pool.
          
          //尝试从连接池中获取
          Internal.instance.get(connectionPool, address, this, null);
          if (connection != null) {
            return connection;
          }
    
          selectedRoute = route;
        }
    
        // If we need a route, make one. This is a blocking operation.
        if (selectedRoute == null) {
          selectedRoute = routeSelector.next();
        }
    
        RealConnection result;
        synchronized (connectionPool) {
          if (canceled) throw new IOException("Canceled");
    
          // Now that we have an IP address, make another attempt at getting a connection from the pool.
          // This could match due to connection coalescing.
          
          //有了selectedRoute后再次从连接池中获取连接
          Internal.instance.get(connectionPool, address, this, selectedRoute);
          if (connection != null) {
            route = selectedRoute;
            return connection;
          }
    
          // Create a connection and assign it to this allocation immediately. This makes it possible
          // for an asynchronous cancel() to interrupt the handshake we're about to do.
          route = selectedRoute;
          refusedStreamCount = 0;
          
        // 新建连接,并把这个RealConnection对象和StreamAllocation对象绑定
          
          result = new RealConnection(connectionPool, selectedRoute);
          acquire(result);
        }
    
    
        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
        
        Socket socket = null;
        synchronized (connectionPool) {
          // Pool the connection.
          // 把新建的RealConnection对象,放入到连接池
          Internal.instance.put(connectionPool, result);
    
          // If another multiplexed connection to the same address was created concurrently, then
          // release this connection and acquire that one.
          if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
          }
        }
        closeQuietly(socket);
    
        return result;
      }
    
    

    7.1.3

     public void connect(
          int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
       // ....
    
        while (true) {
          try {
          // 若为https
            if (route.requiresTunnel()) {
              connectTunnel(connectTimeout, readTimeout, writeTimeout);
            } else {
          //如果为http
          
          // 7.1.3.1.0
              connectSocket(connectTimeout, readTimeout);
            }
        // 7.1.3.2.0
            establishProtocol(connectionSpecSelector);
            break;
          }
    
     
    

    7.1.3.1.1

    ==和服务建立socket连接,source,sink是Okio库的IO读写类==,它补充了java.io和java.nio的不足,以便能够更加方便,快速的访问、存储和处理你的数据.

    private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
    
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket() //创建 Socket
            : new Socket(proxy);
    
        rawSocket.setSoTimeout(readTimeout);
        try {
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout); //和服务器建立连接 
        } catch (ConnectException e) {
     //   ...
        }
    // ...
          source = Okio.buffer(Okio.source(rawSocket));
          sink = Okio.buffer(Okio.sink(rawSocket));
     // ...  
      }
    

    7.1.3.1.2

    public Socket createSocket() {
            return new Socket();
        }
    
     public void connectSocket(Socket socket, InetSocketAddress address,
          int connectTimeout) throws IOException {
        socket.connect(address, connectTimeout);
      }
    

    7.1.3.2.2

     private void establishProtocol(ConnectionSpecSelector connectionSpecSelector) throws IOException {
        if (route.address().sslSocketFactory() == null) {
          protocol = Protocol.HTTP_1_1;
    // 普通http,socket连接
          socket = rawSocket;
          return;
        }
    
    // 构建https连接,暂不分析
        connectTls(connectionSpecSelector);
    
        if (protocol == Protocol.HTTP_2) {
          socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
          http2Connection = new Http2Connection.Builder(true)
              .socket(socket, route.address().url().host(), source, sink)
              .listener(this)
              .build();
          http2Connection.start();
        }
      }
    

    7.2 构建Http1Codec对象

      public HttpCodec newCodec(
          OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
     // 若为http,http2Connection为null
        if (http2Connection != null) {
          return new Http2Codec(client, streamAllocation, http2Connection);
        } else {
          socket.setSoTimeout(client.readTimeoutMillis());
          source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
          sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
          
          // 把source, sink 负责和服务器IO读写流传入
          return new Http1Codec(client, streamAllocation, source, sink);
        }
      }
    

    8.0 CallServerInterceptor

    @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
    // ...
    
    // 8.1
        httpCodec.writeRequestHeaders(request);
    
        Response.Builder responseBuilder = null;
    // 8.2  向服务器传输请求体
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
          // Continue" response before transmitting the request body. If we don't get that, return what
          // we did get (such as a 4xx response) without ever transmitting the request body.
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            // Write the request body if the "Expect: 100-continue" expectation was met.
            Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection from
            // being reused. Otherwise we're still obligated to transmit the request body to leave the
            // connection in a consistent state.
            streamAllocation.noNewStreams();
          }
        }
    // ...  flush()请求
        httpCodec.finishRequest();
    // 8.3
        if (responseBuilder == null) {
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
    // ...
    
        int code = response.code();
        if (forWebSocket && code == 101) {
          // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
    // 8.4
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
    
    //...
    
        return response;
      }
    

    ==HTTP请求格式==

    image

    8.1 向服务器传输请求头

      @Override public void writeRequestHeaders(Request request) throws IOException {
        String requestLine = RequestLine.get(
            request, streamAllocation.connection().route().proxy().type());
        writeRequest(request.headers(), requestLine);
      }
    
    public static String get(Request request, Proxy.Type proxyType) {
        StringBuilder result = new StringBuilder();
        result.append(request.method());
        result.append(' ');
    
        if (includeAuthorityInRequestLine(request, proxyType)) {
          result.append(request.url());
        } else {
          result.append(requestPath(request.url()));
        }
    
        result.append(" HTTP/1.1");
        return result.toString();
      }
      
      
    public void writeRequest(Headers headers, String requestLine) throws IOException {
        if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
        sink.writeUtf8(requestLine).writeUtf8("\r\n");
        for (int i = 0, size = headers.size(); i < size; i++) {
          sink.writeUtf8(headers.name(i))
              .writeUtf8(": ")
              .writeUtf8(headers.value(i))
              .writeUtf8("\r\n");
        }
        sink.writeUtf8("\r\n");
        state = STATE_OPEN_REQUEST_BODY;
      }
    

    8.3 获取响应头

    
    @Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
        if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
          throw new IllegalStateException("state: " + state);
        }
    
        try {
          StatusLine statusLine = StatusLine.parse(source.readUtf8LineStrict());
    
          Response.Builder responseBuilder = new Response.Builder()
              .protocol(statusLine.protocol)
              .code(statusLine.code)
              .message(statusLine.message)
              .headers(readHeaders());
    
          if (expectContinue && statusLine.code == HTTP_CONTINUE) {
            return null;
          }
    
          state = STATE_OPEN_RESPONSE_BODY;
          return responseBuilder;
        } catch (EOFException e) {
          // Provide more context if the server ends the stream before sending a response.
          IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
          exception.initCause(e);
          throw exception;
        }
      }
    
    

    8.4 获取响应体

      @Override public ResponseBody openResponseBody(Response response) throws IOException {
        Source source = getTransferStream(response);
        return new RealResponseBody(response.headers(), Okio.buffer(source));
      }
      
      private Source getTransferStream(Response response) throws IOException {
        if (!HttpHeaders.hasBody(response)) {
          return newFixedLengthSource(0);
        }
    
        if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
          return newChunkedSource(response.request().url());
        }
    
        long contentLength = HttpHeaders.contentLength(response);
        if (contentLength != -1) {
          return newFixedLengthSource(contentLength);
        }
    
        // Wrap the input stream from the connection (rather than just returning
        // "socketIn" directly here), so that we can control its use after the
        // reference escapes.
        return newUnknownLengthSource();
      }
    

    9. 整个网络请求过程已经完成

    见3.5 Responsed对象返回回调Callback

    // 不管请求成功还是失败都会调用finished方法
    finally {
            client.dispatcher().finished(this);
          }
      void finished(AsyncCall call) {
        finished(runningAsyncCalls, call, true);
      }
      private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
    ...
    ...
          if (promoteCalls) promoteCalls();
    }
    
    // 每次运行完一个AsyncCall如果运行的个数不超过限制,就添加一个请求进入线程池
      private void promoteCalls() {
        if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
        if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    
        for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
          AsyncCall call = i.next();
    
          if (runningCallsForHost(call) < maxRequestsPerHost) {
            i.remove();
            runningAsyncCalls.add(call);
            executorService().execute(call);
          }
    
          if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
        }
      }
    

    相关文章

      网友评论

          本文标题:OKHttp源码解析

          本文链接:https://www.haomeiwen.com/subject/zgljrxtx.html