OkHttp源码解析

作者: horseLai | 来源:发表于2019-02-09 21:55 被阅读4次

    一、引言

    在我们日常开发中,OkHttp可谓是最常用的开源库之一,目前就连Android API中的网络请求接口都是用的OkHttp,好吧,真的很强。

    在上学期间我也曾阅读和分析过OkHttp的源码,并记录在笔记中,不过现在再去翻看的时候发现当时很多地方并没有正确理解,因此也趁着这个过年假期重新阅读和整理一遍,感兴趣的童鞋可以看看。

    本文纯属基于个人理解,因此受限于知识水平,有些地方可能依然没有理解到位,还请发现问题的童鞋理性指出。

    温馨提示: 本文很长,源码基于 OKHttp-3.11.0

    二、从一个简单请求入手分析整体运作流程

    1. 先来点关于 OkHttpClient 的官方释义

    OkHttpClient作为Call的工厂类,用于发送HTTP请求并读取相应数据。

    OkHttpClient应当被共享.

    使用OkHttpClient的最佳使用方式是创建一个OkHttpClient单例,然后复用这个单例进行所有的HTTP请求。为啥呢?因为每个OkHttpClient自身都会持有一个连接池和线程池,所以符用连接和线程可以减少延迟、节约内存。相反地,如果给每个请求都创建一个OkHttpClient的话,那就是浪费闲置线程池的资源。

    可以如下使用new OkHttpClient()创建一个默认配置的共享OkHttpClient.

    public final OkHttpClient client = new OkHttpClient();
    

    或使用 new OkHttpClient.Builder()来创建一个自定义配置的共享实例:

    public final OkHttpClient client = new OkHttpClient.Builder()
        .addInterceptor(new HttpLoggingInterceptor())
        .cache(new Cache(cacheDir, cacheSize))
        .build();
    

    可以通过newBuilder()来自定义OkHttpClient,这样创建出来的OkHttpClient具有与原对象相同的连接池、线程池和配置。使用这个方法可以派生一个具有自己特殊配置的OkHttpClient以符合我们的特殊要求。

    如下示例演示的就是如何派生一个读超时为500毫秒的OkHttpClient:

    OkHttpClient eagerClient = client.newBuilder()
        .readTimeout(500, TimeUnit.MILLISECONDS)
        .build();
    Response response = eagerClient.newCall(request).execute();
    

    关闭(Shutdown)不是必须的

    线程和连接会一直被持有,直到当它们保持闲置时自动被释放。但是如果你编写的应用需要主动释放无用资源,那么你也可以主动去关闭。通过shutdown()方法关闭分发器dispatcher的执行服务,这将导致之后OkHttpClient收到的请求全部被拒绝掉。

    client.dispatcher().executorService().shutdown();
    

    清空连接池可以用evictAll(),不过连接池的守护线程可能不会马上退出。

    client.connectionPool().evictAll();
    

    如果相关比缓存,可以调用close(),注意如果缓存已经关闭了再创建call的话就会出现错误,并且会导致call崩溃。

    client.cache().close();
    

    OkHttp同样会为所有HTTP/2连接建立守护线程,并且再它们保持闲置状态时自动关闭掉它们。

    2. 常规使用

    上面的官方释义描述了OkHttpClient的最佳实践原则和清理操作,接下来我们根据一个简单的GET请求操作来引出我们要分析的问题:

    如下创建一个OkHttpClient实例,添加了Intercepter,并在工程目录下建了个名为cacheCache缓存:

    Interceptor logInterceptor = chain -> {
    
        Request request = chain.request();
        System.out.println(request.url());
        System.out.println(request.method());
        System.out.println(request.tag());
        System.out.println(request.headers());
    
        return chain.proceed(request);
    };
    
    okHttpClient = new OkHttpClient.Builder()
            .cache(new Cache(new File("cache/"), 10 * 1024 * 1024))
            .addInterceptor(logInterceptor)
            .build();
    

    然后一个普通的GET请求是这样的,这里以获取 玩Android 首页列表为例。

    public void getHomeList(int page){
        // 1. 建立HTTP请求
        Request request = new Request.Builder()
                .url(String.format("http://wanandroid.com/article/list/%d/json", page))
                .get()
                .build();
        // 2. 基于 Request创建 Call
        okhttp3.Call call = okHttpClient.newCall(request);
        // 3. 执行Call
        call.enqueue(new Callback() {
            @Override
            public void onFailure(okhttp3.Call call, IOException e) {
                e.printStackTrace();
            }
            @Override
            public void onResponse(okhttp3.Call call, Response response) throws IOException {
                System.out.println(response.message());
                System.out.println(response.code());
                System.out.println(response.headers());
    
                if (response.isSuccessful()){
                    ResponseBody body = response.body();
                    if (body == null) return;
    
                    System.out.println(body.string());
                    // 每个ResponseBody只能使用一次,使用后需要手动关闭
                    body.close();
                }
            }
        });
    }
    

    3. 执行流程分析

    注意到上面的okHttpClient.newCall(request),对应的源码如下,可知它创建的实际上是Call的实现类RealCall

    @Override public Call newCall(Request request) {
        return RealCall.newRealCall(this, request, false /* for web socket */);
    }
    
    static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
        RealCall call = new RealCall(client, originalRequest, forWebSocket);  
        call.eventListener = client.eventListenerFactory().create(call);
        return call;
    }
    

    Call提供请求任务的执行和取消和相关状态操作方法。类似于FutureTask,是任务执行单元,其核心的执行方法代码如下,包含同步执行(execute())和异步执行(enqueue())两种方式。对于同步方法而言,RealCall仅仅通过executed()方法将自身记录在Dispatcher(分发器)的同步请求队列中,这是为了在分发器中统计请求数量,在请求结束之后则通过finished()方法将自身从分发器中的同步请求队列中移除,而真正进行数据请求的是在拦截器Intercepter,如下源码:

    
    @Override public Response execute() throws IOException {
        // ...
        eventListener.callStart(this);
        try {
          // 1. 仅仅将这个 Call记录在分发器 ( Dispatcher )的同步执行队列中
          client.dispatcher().executed(this);
          // 2. 通过拦截器链获取响应数据,这里才会真正的执行请求
          Response result = getResponseWithInterceptorChain();
          if (result == null) throw new IOException("Canceled");
          return result;
        } catch (IOException e) {
          eventListener.callFailed(this, e);
          throw e;
        } finally {
          // 3.  拿到响应数据后从分发器的同步执行队列中移除当前请求
          client.dispatcher().finished(this);
        }
    }
    

    跟进至getResponseWithInterceptorChain(),可以注意到,除了我们在创建OkHttpClient时添加的拦截器外,每个HTTP请求都会默认添加几个固有的拦截器,如
    RetryAndFollowUpInterceptorBridgeInterceptorCacheInterceptorConnectInterceptorCallServerInterceptor

      Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
            originalRequest, this, eventListener, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
    
        return chain.proceed(originalRequest);
      }
    

    关于它们的源码实现会在后面的核心类解读中详细分析,这里先了解一个它们各自的作用:

    • RetryAndFollowUpInterceptor:用于失败时恢复以及在必要时进行重定向。
    • BridgeInterceptor:应用代码与网络代码的桥梁。首先根据用户请求建立网络请求,然后执行这个网络请求,最后根据网络请求的响应数据建立一个用户响应数据。
    • CacheInterceptor:用于从本地缓存中读取数据,以及将服务器数据写入到缓存。
    • ConnectInterceptor:用于打开一个到目标服务器的连接,并切换至下一个拦截器。
    • CallServerInterceptor:这是拦截器链的最后一环,至此将真正的进行服务器请求。

    请求时整个拦截器的调用链的执行次序如下

    拦截器执行链

    对于请求时拦截器的调用链你可能会有所疑惑,为什么它是按这个次序执行的呢?咱看看RealInterceptorChain#proceed(...)方法的主要源码,发现,虽然这里看起来只进行了一次调用,但是如果你结合这些拦截器一起分析的话,你就会发现,其实这里对拦截器集合进行了递归取值,因为每次执行proceed()方法时集合索引index+1, 并将index传入新建的RealInterceptorChain,而拦截器集合唯一,因此相当于每次proceed都是依次取得拦截器链中的下一个拦截器并使用这个新建的RealInterceptorChain,执行RealInterceptorChain#proceed方法,直到集合递归读取完成。

    public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
        // ...
        // 每次执行 proceed() 方法时 index+1, 然后传入新建的 RealInterceptorChain
        RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
            connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
            writeTimeout);
         // 但是 拦截器集合是相同的,因此相当于每次都是依次取得拦截器链中的下一个拦截器
        Interceptor interceptor = interceptors.get(index);
        Response response = interceptor.intercept(next);
        // ...
        return response;
    }
    

    递归? 是的,如果你观察的够仔细的话,你会发现,其实BridgeInterceptorRetryAndFollowUpInterceptorCacheInterceptorConnectInterceptor都会执行RealInterceptorChain#proceed方法,相当于这个方法在不断地调用自己,符合递归的执行特性,因此Response响应数据的返回次序刚好是与请求时相反的。BridgeInterceptor#intercept相应抽取的源码如下:

    public final class BridgeInterceptor implements Interceptor {
    
      @Override public Response intercept(Chain chain) throws IOException {
      // do something ...
        Response networkResponse = chain.proceed(requestBuilder.build());
      // do something ...
        return responseBuilder.build();
      }
    }
    

    因而拦截器链的响应数据返回次序如下:

    拦截器链的响应次序

    我靠,是不是觉得设计的非常巧妙,这也是我热衷于源码的重要原因之一,因为不看看别人的代码你就永远不知道别人有多骚。。

    根据上面的分析,我们已经知道了原来正真执行请求、处理响应数据是在拦截器,并且对于同步请求,分发器Dispatcher仅仅是记录下了同步请求的Call,用作请求数量统计用的,并没有参与到实际请求和执行中来。

    OK,来看看异步请求RealCall#enqueue()Dispatcher#enqueue(),毫无疑问,异步请求肯定是运行在线程池中了

    @Override public void enqueue(Callback responseCallback) {
        synchronized (this) {
            if (executed) throw new IllegalStateException("Already Executed");
            executed = true;
        }
        captureCallStackTrace();
        eventListener.callStart(this);
        client.dispatcher().enqueue(new AsyncCall(responseCallback));
    }
    

    Dispatcher#enqueue()

    synchronized void enqueue(AsyncCall call) {
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else {
          readyAsyncCalls.add(call);
        }
    }
    

    对于上面的AsyncCall,核心源码如下,注意到getResponseWithInterceptorChain(),是不是非常地熟悉了,在上面的同步请求那里已经详细解释过了,就不再累赘了。

      final class AsyncCall extends NamedRunnable {
         // ...
        @Override protected void execute() {
          boolean signalledCallback = false;
          try {
            Response response = getResponseWithInterceptorChain();
            // ...
          } catch (IOException e) {
            // ...
          } finally {
            client.dispatcher().finished(this);
          }
        }
      }
    

    至此,OkHttp的主体运作流程是不是已经清晰了,不过有没有感觉还少点什么,我们只是分析了运作流程,具体到怎么连接的问题还没有分析。

    好吧,既然是建立连接,那么极速定位到ConnectInterceptor,没毛病吧, 核心源码如下:

    public final class ConnectInterceptor implements Interceptor {
      public final OkHttpClient client;
      // ...
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        StreamAllocation streamAllocation = realChain.streamAllocation();
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        // 注意这里
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
        RealConnection connection = streamAllocation.connection();
        return realChain.proceed(request, streamAllocation, httpCodec, connection);
      }
    }
    

    注意到上面的streamAllocation.newStream(..),源码如下:

    public HttpCodec newStream( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
          // ... 
          // 1. 查找可用连接
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
          // 2. 建立 HTTP 或 HTTP2 连接
          HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
    }
    

    继续定位到 findHealthyConnection

    private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException {
        while (true) { // 完全阻塞式查找,找不到不罢休
          // 1. 查找已有连接
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              pingIntervalMillis, connectionRetryEnabled);
    
          // 2. 如果这是一条全新的连接,那么可以跳过大量的健康检查,直接返回
          synchronized (connectionPool) {
            if (candidate.successCount == 0) {
              return candidate;
            }
          }
          // 3. 做一个速度超慢的检查,以确保池中的连接仍然可用,
          //    如果不可用了就将其从池中剔除,然后继续查找
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            noNewStreams();
            continue;
          }
          return candidate;
        }
    }
    

    定位到StreamAllocation#findConnection,这里查找连接的规则是:先查看当前是否存在可用连接,如果不存在,再从连接池中查找,如果还没有,那就新建一个,用来承载新的数据流。 需要注意的一个细节就是,从连接池查找连接时会查询两次,第一次只是根据当前目标服务器地址去查,如果没有查到,则第二次会重新选择路由表,然后用该地址去匹配。最终如果存在已经创建好的连接,则直接返回使用,如果不存在,则新建一个连接,进行TCPTLS握手,完事之后将这个连接的路由信息记录在路由表中,并把这个连接保存到连接池。还需要注意的一点是:如果有个连接与当前建立的连接的地址相同,那么将释放掉当前建立好的连接,而使用后面创建的连接(保证连接是最新的)

    private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        synchronized (connectionPool) {
            // ... 
          // 1. 检查当前有没有可用连接,如果有,那么直接用当前连接
          releasedConnection = this.connection;
          toClose = releaseIfNoNewStreams();
          if (this.connection != null) { // 可用
            result = this.connection;
            releasedConnection = null;
          }
          // ...
          if (result == null) { 
            // 2. 不存在已有连接或者已有连接不可用,则尝试从连接池中获得可用连接
            Internal.instance.get(connectionPool, address, this, null);
            if (connection != null) {
              foundPooledConnection = true;
              result = connection;
            } else {
              selectedRoute = route;
            }
          }
        }
        closeQuietly(toClose);
        // ...
        if (result != null) { // 已有连接中找到了连接,完成任务
          return result;
        }
    
        boolean newRouteSelection = false;
        // 选择一条路由,这是个阻塞式操作
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
          newRouteSelection = true;
          routeSelection = routeSelector.next();
        }
    
        synchronized (connectionPool) {
          if (canceled) throw new IOException("Canceled");
          if (newRouteSelection) {
              // 路由已经选好了,此时再根据路由中的 IP集合去匹配连接池中的连接,
              // 这个可能因为连接合并的缘故而匹配到
            List<Route> routes = routeSelection.getAll();
            for (int i = 0, size = routes.size(); i < size; i++) {
              Route route = routes.get(i);
              Internal.instance.get(connectionPool, address, this, route);
              if (connection != null) {
                foundPooledConnection = true;
                result = connection;
                this.route = route;
                break;
              }
            }
          }
    
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              selectedRoute = routeSelection.next();
            }
    
            // 3. 最后实在没找到已有的连接,那么就只能重新建立连接了
            route = selectedRoute;
            refusedStreamCount = 0;
            result = new RealConnection(connectionPool, selectedRoute);
            acquire(result, false);
          }
        }
    
        // 根据路由匹配到了连接池中的连接
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
          return result;
        }
    
        // 进行 TCP + TLS 握手. 这是阻塞式操作
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
        routeDatabase().connected(result.route()); // 路由表中记录下这个连接的路由信息
    
        Socket socket = null;
        synchronized (connectionPool) {
          reportedAcquired = true;
          // 将这个连接记录到连接池
          Internal.instance.put(connectionPool, result);
          // 如果多个连接指向当前创建的连接的相同地址,那么释放掉当前连接,使用后面创建的连接
          if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
          }
        }
        closeQuietly(socket);
    
        eventListener.connectionAcquired(call, result);
        return result;
    }
    

    根据以上分析可得出以下主体执行流程:

    OkHttp主体执行流程.png

    当然这是同步请求的流程,而对于异步请求而言,也仅仅是把拦截器链放到了线程池执行器中执行而已。

    三、核心类解读

    至此,我们已经清楚了OkHttp的主干,当然,我们仅仅是把流程给走通了,在本节中,我们将根据源码具体分析OkHttp中各核心类的作用及其实现,内容很长,请做好心理准备。

    1. 拦截器(Intercepter

    1). RetryAndFollowUpInterceptor

    作用:用于失败时恢复以及在必要时进行重定向。

    作为核心方法,RetryAndFollowUpInterceptor#intercept体现了RetryAndFollowUpInterceptor的工作流程,源码如下,我们来分析分析它是怎么恢复和重定向的,具体实现流程还请看注释:

      @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Call call = realChain.call();
        EventListener eventListener = realChain.eventListener();
    
        // 仅仅创建流的承载对象,此时并没有建立流
        StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(request.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
    
        int followUpCount = 0; // 用于记录重定向和需要授权请求的数量
        Response priorResponse = null;
        while (true) {
          // 1. 如果此时请求被取消了,那么关闭连接,释放资源
          if (canceled) { 
            streamAllocation.release();
            throw new IOException("Canceled");
          }
    
          Response response;
          boolean releaseConnection = true;
          try {
            // 2. 推进执行拦截器链,请求并返回响应数据
            response = realChain.proceed(request, streamAllocation, null, null);
            releaseConnection = false;
          } catch (RouteException e) {
            // 3. 如果连接失败了,尝试使用失败的地址恢复一下,此时请求可能还没有发送
            if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
              throw e.getFirstConnectException();
            }
            releaseConnection = false;
            continue;
          } catch (IOException e) {
            // 4. 尝试重新与交流失败的服务器重新交流,这个时候请求可能已经发送了
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
            releaseConnection = false;
            continue;
          } finally {
            // 5. 如果是位置异常,那么释放掉所有资源
            if (releaseConnection) {
              streamAllocation.streamFailed(null);
              streamAllocation.release();
            }
          }
          // 6. 如果记录的上一个请求大的响应数据存在,那么将其响应体置空
          if (priorResponse != null) {
            response = response.newBuilder()
                .priorResponse(priorResponse.newBuilder()
                        .body(null)
                        .build())
                .build();
          }
    
          Request followUp;
          try {
            // 7. 处理请求的认证头部、重定向或请求超时问题,如果这些操作都不必要
            //    或者应用不了,那么返回 null
            followUp = followUpRequest(response, streamAllocation.route());
          } catch (IOException e) {
            streamAllocation.release();
            throw e;
          }
          //需要处理认证、重定向和超时问题,那么结束处理,返回响应数据
          if (followUp == null) {
            if (!forWebSocket) {
              streamAllocation.release();
            }
            return response;
          }
          // 否则,关闭当前响应,进行后续重定向等问题的处理
          closeQuietly(response.body());
          if (++followUpCount > MAX_FOLLOW_UPS) {
            streamAllocation.release();
            throw new ProtocolException("Too many follow-up requests: " + followUpCount);
          }
          // ...
          if (!sameConnection(response, followUp.url())) {
            streamAllocation.release();
            streamAllocation = new StreamAllocation(client.connectionPool(),
                createAddress(followUp.url()), call, eventListener, callStackTrace);
            this.streamAllocation = streamAllocation;
          } else if (streamAllocation.codec() != null) {
            throw new IllegalStateException("Closing the body of " + response
                + " didn't close its backing stream. Bad interceptor?");
          }
          request = followUp;
          priorResponse = response;
        }
      }
    
    

    2). BridgeInterceptor

    作用:应用代码与网络代码的桥梁。首先根据用户请求建立网络请求,然后执行这个网络请求,最后根据网络请求的响应数据建立一个用户响应数据。

    BridgeInterceptor#intercept源码如下,主要做了以下事情:

    • 用于请求: 这个是在推进请求拦截器链时进行的,也就是说此时尚未真正地进行网络请求。此时会补充缺失的请求头参数,如 Content-TypeTransfer-EncodingHostConnectionAccept-EncodingUser-AgentCookie。如果在请求时添加了gzip请求头参数,即开启了gzip压缩,那么在取得响应数据时需要对数据进行解压。
    • 用于响应: 这个实在取得网络响应数据后回退拦截器链时进行的,即已经取得了网络响应数据。此时会对相应头部进行处理,如果请求时开启了gzip压缩,那么此时会对响应数据进行解压。
      @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        Request.Builder requestBuilder = userRequest.newBuilder();
    
        // 这里处于请求之前
        // 1. 此时主要为请求添加缺失的请求头参数
        RequestBody body = userRequest.body();
        if (body != null) {
          MediaType contentType = body.contentType();
          if (contentType != null) {
            requestBuilder.header("Content-Type", contentType.toString());
          }
    
          long contentLength = body.contentLength();
          if (contentLength != -1) {
            requestBuilder.header("Content-Length", Long.toString(contentLength));
            requestBuilder.removeHeader("Transfer-Encoding");
          } else {
            requestBuilder.header("Transfer-Encoding", "chunked");
            requestBuilder.removeHeader("Content-Length");
          }
        }
        // ...
        // 如果启用了GZIP压缩,那么需要负责解压响应数据
        boolean transparentGzip = false;
        if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
          transparentGzip = true;
          requestBuilder.header("Accept-Encoding", "gzip");
        }
        //...
    
        // 2. 推进执行拦截器链,进行请求、返回数据
        Response networkResponse = chain.proceed(requestBuilder.build());
    
        // 取得网络响应数据后
        // 3. 处理响应头,如果请求时开启了GZIP压缩,那么这里需要将响应数据解压
        HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
    
        Response.Builder responseBuilder = networkResponse.newBuilder()
            .request(userRequest);
    
        if (transparentGzip
            && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
            && HttpHeaders.hasBody(networkResponse)) {
          GzipSource responseBody = new GzipSource(networkResponse.body().source());
          Headers strippedHeaders = networkResponse.headers().newBuilder()
              .removeAll("Content-Encoding")
              .removeAll("Content-Length")
              .build();
          responseBuilder.headers(strippedHeaders);
          String contentType = networkResponse.header("Content-Type");
          // 建立用户响应数据
          responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
        }
    
        return responseBuilder.build();
      }
    

    综上所述,BridgeInterceptor主要用于请求前对用户请求进行完善,补充缺失参数,然后推进请求拦截器链,并等待响应数据返回,取得响应数据后则是将其转换成用户响应数据,此时如果数据进行过gzip压缩,那么会在这里进行解压,然后重新封装成用户数据。

    3). CacheInterceptor

    作用:用于从本地缓存中读取数据,以及将服务器数据写入到缓存。

    CacheInterceptor#intercept源码如下,拦截器链执行到这一步主要做了如下事情:

    • 请求:

      如果开启了缓存,且请求策略是禁用网络仅读缓存的话,那么首先会根据当前请求去查找缓存,如果匹配到了缓存,则将缓存封装成响应数据返回,如果没有匹配到,那么返回一个504的响应,这将导致请求拦截器链执行终止,进而返回执行响应拦截器链。

      如果请求策略是网络加缓存,当那么然网络请求优先,所以就推进请求拦截器链执行请求,

    • 网络响应:

      在得到网络响应数据后,如果开启了缓存策略其匹配到了旧缓存,那么根据最新网络请求响应数据更新缓存,然后返回响应数据;如果没有匹配到缓存但是开启了缓存,那么将响应数据写入缓存后返回;而如果开启了缓存,但是并不使用缓存策略,那么根据响应数据移除缓存中对应的数据缓存。

      @Override public Response intercept(Chain chain) throws IOException {
        //  读取候选的旧缓存
        Response cacheCandidate = cache != null
            ? cache.get(chain.request())
            : null;
    
        long now = System.currentTimeMillis();
        // 解析请求和缓存策略
        CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
        Request networkRequest = strategy.networkRequest; // 如果仅读缓存,那么网络请求会为 null
        Response cacheResponse = strategy.cacheResponse;
        // ... 
        // 如果禁止使用网络而仅读取缓存的话,那么没匹配到缓存时返回 504
        if (networkRequest == null && cacheResponse == null) {
          return new Response.Builder()
              // ... 
              .code(504)
              .message("Unsatisfiable Request (only-if-cached)")
              // ...
              .body(Util.EMPTY_RESPONSE)
              .build();
        }
    
        // 如果禁止使用网络而仅读取缓存的话,那么匹配到缓存时将其返回
        if (networkRequest == null) {
          return cacheResponse.newBuilder()
              .cacheResponse(stripBody(cacheResponse))
              .build();
        }
    
        Response networkResponse = null;
        try {
          // 推进执行请求拦截器链
          networkResponse = chain.proceed(networkRequest);
        } finally {
          // 请求异常则关闭候选缓存实体
          if (networkResponse == null && cacheCandidate != null) {
            closeQuietly(cacheCandidate.body());
          }
        }
    
        // 根据最新网络请求响应数据更新缓存,然后返回响应数据
        if (cacheResponse != null) {
          if (networkResponse.code() == HTTP_NOT_MODIFIED) {
            Response response = cacheResponse.newBuilder()
                // ...
                .cacheResponse(stripBody(cacheResponse))
                .networkResponse(stripBody(networkResponse))
                .build();
            networkResponse.body().close();
            // ...
            cache.update(cacheResponse, response);
            return response;
          } else {
            closeQuietly(cacheResponse.body());
          }
        }
        // 无匹配缓存的情况
        Response response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
    
        if (cache != null) {
          if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
            // 使用缓存策略且无匹配缓存,则将响应数据写入缓存
            CacheRequest cacheRequest = cache.put(response);
            return cacheWritingResponse(cacheRequest, response);
          }
          if (HttpMethod.invalidatesCache(networkRequest.method())) {
            try { // 不使用缓存策略,则删除已有缓存
              cache.remove(networkRequest);
            } catch (IOException ignored) {
              // The cache cannot be written.
            }
          }
        }
        return response;
      }
    

    缓存时判断逻辑比较多,不过这里重点在于理解缓存策略,一般会有:仅网络仅缓存网络加缓存 三种请求策略。

    4). ConnectInterceptor

    作用:用于打开一个到目标服务器的连接,并切换至下一个拦截器

    因为在上一节末尾分析OkHttp如何建立连接的问题上已经分析过了,所以不做过多描述。

    这里回忆一下连接规则:建立连接时,首先会查看当前是否有可用连接,如果没有,那么会去连接池中查找,如果找到了,当然就使用这个连接,如果没有,那么就新建一个连接,进行TCPTLS握手以建立联系,接着把连接放入连接池以备后续复用,最后推进请求拦截器链执行,将打开的连接交给下一个拦截器去处理。

    5). CallServerInterceptor

    作用:这是拦截器链的最后一环,至此将真正的进行服务器请求

    CallServerInterceptor#intercept源码如下,作为拦截器链的最后一环,当然要真正地做点实事了,大致操作步骤是:

    发送请求头部 --> 读取一下GETHEAD之外的请求(如POST)的响应数据 --> 结束请求的发送动作 --> 读取响应头部 --> 读取响应数据 --> 封装后返回。

      @Override public Response intercept(Chain chain) throws IOException {
        // ...
        long sentRequestMillis = System.currentTimeMillis();
    
        // 1. 发送请求头部
        httpCodec.writeRequestHeaders(request);
    
        Response.Builder responseBuilder = null;
        // 2. 检查是否是 GET 或 HEAD 以外的请求方式、读取响应数据
        if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
          // 如果请求头部中包含"Expect: 100-continue",那么在转换请求体前等待"HTTP/1.1 100 Continue"
          // 响应。如果没有读取到"HTTP/1.1 100 Continue"的响应,那么就不转换请求体(request body)了,
          // 而直接将我们得到的响应返回(如 状态为 4XX 的响应 );
          if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
            httpCodec.flushRequest();
            // 读取、转换响应头部
            responseBuilder = httpCodec.readResponseHeaders(true);
          }
    
          if (responseBuilder == null) {
            // 如果存在"Expect: 100-continue",则写入请求体
            long contentLength = request.body().contentLength();
            CountingSink requestBodyOut =
                new CountingSink(httpCodec.createRequestBody(request, contentLength));
            BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
    
            request.body().writeTo(bufferedRequestBody);
            bufferedRequestBody.close();
          } else if (!connection.isMultiplexed()) {
            // 如果不存在"Expect: 100-continue",那么禁止 HTTP/1 连接复用。
            // 不过我们仍然必须转换请求体以使连接达到一个固定的状态。
            streamAllocation.noNewStreams();
          }
        }
        // 3. 结束请求的发送动作
        httpCodec.finishRequest();
    
    
        // 4. 读取响应头部
        if (responseBuilder == null) {
          responseBuilder = httpCodec.readResponseHeaders(false);
        }
    
        // 5. 构建响应数据
        Response response = responseBuilder
            .request(request)
            .handshake(streamAllocation.connection().handshake())
            .sentRequestAtMillis(sentRequestMillis)
            .receivedResponseAtMillis(System.currentTimeMillis())
            .build();
    
        int code = response.code();
        if (code == 100) {
          // 如果服务器返回 100-continue 响应即使我们并没有这么请求,则重新读取一遍响应数据;
          responseBuilder = httpCodec.readResponseHeaders(false);
    
          response = responseBuilder
                  .request(request) 
                  .handshake(streamAllocation.connection().handshake())
                  .sentRequestAtMillis(sentRequestMillis)
                  .receivedResponseAtMillis(System.currentTimeMillis())
                  .build();
    
          code = response.code();
        }
        // 6. 填充响应数据
        if (forWebSocket && code == 101) {
          // 连接正在更新,不过我们需要确保拦截器收到的 non-null 的响应体
          response = response.newBuilder()
              .body(Util.EMPTY_RESPONSE)
              .build();
        } else {
          // 这里将 http 输入流包装到响应体
          response = response.newBuilder()
              .body(httpCodec.openResponseBody(response))
              .build();
        }
        // 其他情况...
        return response;
      }
    

    综上,作为拦截器最后一环的CallServerInterceptor终于把请求给终结了,完成了与服务器的沟通交流,把需要的数据拿了回来。请求的时候每个拦截器都会插上一脚,响应的时候也一样,把数据转换的工作分给了各个拦截器处理。

    2. 分发器(Dispatcher

    为什么叫分发器呢?如果叫做执行器(Executor)可能会更好理解一些,因为它的工作就是执行异步请求,虽然会统计请求的数量....嗯~~好吧,换个角度,如果理解为它用于把异步任务分发给线程池执行,起到任务分发的作用,那就理解为啥叫分发器了。

    OK,先来观察一下Dispatcher的构成,部分源码如下,可以先看看注释:

    public final class Dispatcher {
      private int maxRequests = 64;  // 同时执行的最大异步请求数量,数量超过该值时,新增的请求会放入异步请求队列中
      private int maxRequestsPerHost = 5;  // 每个主机最多同时存在的请求数量
      private @Nullable Runnable idleCallback; 
     
      // 线程池执行器
      private @Nullable ExecutorService executorService;
      // 尚未执行的任务队列
      private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
      // 正在执行的任务队列
      private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
      // 同步执行的任务队列
      private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
    
      public Dispatcher(ExecutorService executorService) {
        this.executorService = executorService;
      }
      
      public synchronized ExecutorService executorService() {
        if (executorService == null) {
          // 初始化线程池执行器
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
              new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
        }
        return executorService;
      }
      // ...
    }
    

    简单描述一下:Dispatcher包含三个任务队列,分别用于记录尚未执行的异步请求、正在执行的异步请求、正在执行的同步请求。包含一个线程池,用于执行异步请求,这个线程池执行器的核心线程数量为 0, 最大线程数量不限(整型的最大值2^31-1,相当于不限),闲置线程的最大等待超时时间为60秒,线程池的任务队列使用非公平机制的SynchronousQueue。这就是Dispatcher的主要配置。

    我们来看看它是如何限制每个主机的请求数量的,直接看注释好了。

    synchronized void enqueue(AsyncCall call) {
        // 正在执行的异步请求数量小于限定值,且同一主机的正在执行的异步请求数量小于限定值时
        // 添加到正在执行的异步请求队列中,并执行。
        if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
          runningAsyncCalls.add(call);
          executorService().execute(call);
        } else { // 否则就添加到等待队列中
          readyAsyncCalls.add(call);
        }
      }
    

    runningCallsForHost用于计算当前正在执行的连接到相同主机上异步请求的数量

     private int runningCallsForHost(AsyncCall call) {
        int result = 0;
        for (AsyncCall c : runningAsyncCalls) {
          if (c.get().forWebSocket) continue;
          if (c.host().equals(call.host())) result++;
        }
        return result;
      }
    

    3. 连接池(ConnetionPool

    作用:用于管理HTTPHTTP/2连接的复用以减少网络延迟,因为使用相同地址的请求可能共享一个连接。所以ConnetionPool实现了维护已打开连接已被后续使用的机制。

    线程池啥的就不多说了,这里主要分析一下ConnetionPool如何维护已打开的连接。从ConnetionPool#put着手:

      void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) { // 如果当前不在执行清理任务,那么现在执行
          cleanupRunning = true;
          executor.execute(cleanupRunnable); // 线程池的作用就是执行清理任务
        }
        connections.add(connection); // 同时添加到连接队列中
      }
    

    cleanupRunnable源码如下,根据ConnetionPool#put可知每当我们往连接池中添加一个连接时,如果当前不在执行清理任务(cleanupRunnable),那么立马会执行cleanupRunnable,而cleanupRunnable中会循环执行cleanup,直到所有连接都因闲置超时而被清理掉,具体还请先看注释。

    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime()); // 执行清理
            if (waitNanos == -1) return; // cleanup中的情况 4)
            if (waitNanos > 0) {  // cleanup中的情况 2) 和 3)  
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos); // 等待超时
                } catch (InterruptedException ignored) {
                }
              }
            }
            // 至此情况 1), 2), 3) 都会导致 `cleanup`被循环执行
          }
        }
      };
    

    cleanup源码如下,它的作用是查找、清理超过了keep-alive时间限制或者闲置超时闲置的连接。具体还请看注释。

    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
          // 1. 查找超时连接
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
            // 1). 如果正在使用,那么跳过,继续查找
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
    
            idleConnectionCount++; // 记录闲置连接的数量
    
            // 2). 如果闲置时间超过了最大允许闲置时间,则记录下来在后面清除
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
    
          // 2. 查找完成了,将在这里对闲置连接进行处理
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            // 1). 确定已经超时了,那么从连接池中清除,关闭动作会在同步块外面进行
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            // 2). 存在闲置连接,但是尚未超时
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // 3). 如果所有连接都正在使用,那么最多保持个`keep-alive`超时时间就又会重新执行清理动作
            return keepAliveDurationNs;
          } else {
            // 4). 压根没有连接,那不管了,标记为非清理状态,并返回-1
            cleanupRunning = false;
            return -1;
          }
        }
        // 3. 关闭上面查找到的处于情况1)的闲置超时连接
        closeQuietly(longestIdleConnection.socket());
        // 返回 0 ,表示马上会重新回来执行清理操作
        return 0;
      }
    

    综上,ConnectionPool 会设定keepAliveDurationNslongestIdleDurationNs两个超时时间,而每次往连接池中添加一个新连接时,如果当前处于非清理装填,都会导致线程池执行器开个线程执行清理动作,而对于清理动作而言,会遍历连接池,查找闲置超时的连接,并记录闲置连接的数量,而遍历完成后,将根据情况 2. 1)、2)、3)、4) 进行相应的处理,而如果是情况 2. 4), 则会当即结束清理循环,意味着连接池中已经没有连接了,此时线程会执行完成而退出,其他几种情况都不会中断循环,因此实际上这个线程池最多只会存在一个连接池维护线程。

    四、总结

    一般来说,当使用OKHttp通过URL请求时,它做了以下事情:

    • 使用URL并且配置OKHttpClient来创建地址(Address),这个地址指定了我们连接web服务器的方式。
    • 尝试从连接池(ConnectionPool)中取出与该地址相同的连接。
    • 如果在连接池中没有对应该地址的连接,那么它会选择一条新路线(route)去尝试,这通常意味着将进行DNS请求以获取对应服务器的IP地址,然后如果需要的话还会选择TLS版本和代理服务器。
    • 如果这是一条新路线,它会通过Socket连、TLS隧道(HTTP代理的HTTPS)或者TLS连接,然后根据需要进行TCPTLS握手。
    • 发送HTTP请求,然后读取响应数据。

    如果连接出现问题,OKHttp会选择另一条路线再次尝试,这使得OKHttp在服务器地址子集无法访问时能够恢复,而当从连接池中拿到的连接已经过期,或者TLS版本不支持的情况下,这种方式同样很有用。一旦接收到响应数据,该连接就会返回到连接池中以备后续请求使用,而连接池中的连接也会在一定时间的不活动状态后被清除掉。

    对于整体框架而言,本文已经详细分析了OkHttp的整体工作流程,相关细节还请回到文中去,这里就不再累赘了。

    相关文章

      网友评论

        本文标题:OkHttp源码解析

        本文链接:https://www.haomeiwen.com/subject/hbqisqtx.html