OkHttp源码解析

作者: horseLai | 来源:发表于2019-02-09 21:55 被阅读4次

一、引言

在我们日常开发中,OkHttp可谓是最常用的开源库之一,目前就连Android API中的网络请求接口都是用的OkHttp,好吧,真的很强。

在上学期间我也曾阅读和分析过OkHttp的源码,并记录在笔记中,不过现在再去翻看的时候发现当时很多地方并没有正确理解,因此也趁着这个过年假期重新阅读和整理一遍,感兴趣的童鞋可以看看。

本文纯属基于个人理解,因此受限于知识水平,有些地方可能依然没有理解到位,还请发现问题的童鞋理性指出。

温馨提示: 本文很长,源码基于 OKHttp-3.11.0

二、从一个简单请求入手分析整体运作流程

1. 先来点关于 OkHttpClient 的官方释义

OkHttpClient作为Call的工厂类,用于发送HTTP请求并读取相应数据。

OkHttpClient应当被共享.

使用OkHttpClient的最佳使用方式是创建一个OkHttpClient单例,然后复用这个单例进行所有的HTTP请求。为啥呢?因为每个OkHttpClient自身都会持有一个连接池和线程池,所以符用连接和线程可以减少延迟、节约内存。相反地,如果给每个请求都创建一个OkHttpClient的话,那就是浪费闲置线程池的资源。

可以如下使用new OkHttpClient()创建一个默认配置的共享OkHttpClient.

public final OkHttpClient client = new OkHttpClient();

或使用 new OkHttpClient.Builder()来创建一个自定义配置的共享实例:

public final OkHttpClient client = new OkHttpClient.Builder()
    .addInterceptor(new HttpLoggingInterceptor())
    .cache(new Cache(cacheDir, cacheSize))
    .build();

可以通过newBuilder()来自定义OkHttpClient,这样创建出来的OkHttpClient具有与原对象相同的连接池、线程池和配置。使用这个方法可以派生一个具有自己特殊配置的OkHttpClient以符合我们的特殊要求。

如下示例演示的就是如何派生一个读超时为500毫秒的OkHttpClient:

OkHttpClient eagerClient = client.newBuilder()
    .readTimeout(500, TimeUnit.MILLISECONDS)
    .build();
Response response = eagerClient.newCall(request).execute();

关闭(Shutdown)不是必须的

线程和连接会一直被持有,直到当它们保持闲置时自动被释放。但是如果你编写的应用需要主动释放无用资源,那么你也可以主动去关闭。通过shutdown()方法关闭分发器dispatcher的执行服务,这将导致之后OkHttpClient收到的请求全部被拒绝掉。

client.dispatcher().executorService().shutdown();

清空连接池可以用evictAll(),不过连接池的守护线程可能不会马上退出。

client.connectionPool().evictAll();

如果相关比缓存,可以调用close(),注意如果缓存已经关闭了再创建call的话就会出现错误,并且会导致call崩溃。

client.cache().close();

OkHttp同样会为所有HTTP/2连接建立守护线程,并且再它们保持闲置状态时自动关闭掉它们。

2. 常规使用

上面的官方释义描述了OkHttpClient的最佳实践原则和清理操作,接下来我们根据一个简单的GET请求操作来引出我们要分析的问题:

如下创建一个OkHttpClient实例,添加了Intercepter,并在工程目录下建了个名为cacheCache缓存:

Interceptor logInterceptor = chain -> {

    Request request = chain.request();
    System.out.println(request.url());
    System.out.println(request.method());
    System.out.println(request.tag());
    System.out.println(request.headers());

    return chain.proceed(request);
};

okHttpClient = new OkHttpClient.Builder()
        .cache(new Cache(new File("cache/"), 10 * 1024 * 1024))
        .addInterceptor(logInterceptor)
        .build();

然后一个普通的GET请求是这样的,这里以获取 玩Android 首页列表为例。

public void getHomeList(int page){
    // 1. 建立HTTP请求
    Request request = new Request.Builder()
            .url(String.format("http://wanandroid.com/article/list/%d/json", page))
            .get()
            .build();
    // 2. 基于 Request创建 Call
    okhttp3.Call call = okHttpClient.newCall(request);
    // 3. 执行Call
    call.enqueue(new Callback() {
        @Override
        public void onFailure(okhttp3.Call call, IOException e) {
            e.printStackTrace();
        }
        @Override
        public void onResponse(okhttp3.Call call, Response response) throws IOException {
            System.out.println(response.message());
            System.out.println(response.code());
            System.out.println(response.headers());

            if (response.isSuccessful()){
                ResponseBody body = response.body();
                if (body == null) return;

                System.out.println(body.string());
                // 每个ResponseBody只能使用一次,使用后需要手动关闭
                body.close();
            }
        }
    });
}

3. 执行流程分析

注意到上面的okHttpClient.newCall(request),对应的源码如下,可知它创建的实际上是Call的实现类RealCall

@Override public Call newCall(Request request) {
    return RealCall.newRealCall(this, request, false /* for web socket */);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
    RealCall call = new RealCall(client, originalRequest, forWebSocket);  
    call.eventListener = client.eventListenerFactory().create(call);
    return call;
}

Call提供请求任务的执行和取消和相关状态操作方法。类似于FutureTask,是任务执行单元,其核心的执行方法代码如下,包含同步执行(execute())和异步执行(enqueue())两种方式。对于同步方法而言,RealCall仅仅通过executed()方法将自身记录在Dispatcher(分发器)的同步请求队列中,这是为了在分发器中统计请求数量,在请求结束之后则通过finished()方法将自身从分发器中的同步请求队列中移除,而真正进行数据请求的是在拦截器Intercepter,如下源码:


@Override public Response execute() throws IOException {
    // ...
    eventListener.callStart(this);
    try {
      // 1. 仅仅将这个 Call记录在分发器 ( Dispatcher )的同步执行队列中
      client.dispatcher().executed(this);
      // 2. 通过拦截器链获取响应数据,这里才会真正的执行请求
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      // 3.  拿到响应数据后从分发器的同步执行队列中移除当前请求
      client.dispatcher().finished(this);
    }
}

跟进至getResponseWithInterceptorChain(),可以注意到,除了我们在创建OkHttpClient时添加的拦截器外,每个HTTP请求都会默认添加几个固有的拦截器,如
RetryAndFollowUpInterceptorBridgeInterceptorCacheInterceptorConnectInterceptorCallServerInterceptor

  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

关于它们的源码实现会在后面的核心类解读中详细分析,这里先了解一个它们各自的作用:

  • RetryAndFollowUpInterceptor:用于失败时恢复以及在必要时进行重定向。
  • BridgeInterceptor:应用代码与网络代码的桥梁。首先根据用户请求建立网络请求,然后执行这个网络请求,最后根据网络请求的响应数据建立一个用户响应数据。
  • CacheInterceptor:用于从本地缓存中读取数据,以及将服务器数据写入到缓存。
  • ConnectInterceptor:用于打开一个到目标服务器的连接,并切换至下一个拦截器。
  • CallServerInterceptor:这是拦截器链的最后一环,至此将真正的进行服务器请求。

请求时整个拦截器的调用链的执行次序如下

拦截器执行链

对于请求时拦截器的调用链你可能会有所疑惑,为什么它是按这个次序执行的呢?咱看看RealInterceptorChain#proceed(...)方法的主要源码,发现,虽然这里看起来只进行了一次调用,但是如果你结合这些拦截器一起分析的话,你就会发现,其实这里对拦截器集合进行了递归取值,因为每次执行proceed()方法时集合索引index+1, 并将index传入新建的RealInterceptorChain,而拦截器集合唯一,因此相当于每次proceed都是依次取得拦截器链中的下一个拦截器并使用这个新建的RealInterceptorChain,执行RealInterceptorChain#proceed方法,直到集合递归读取完成。

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
    // ...
    // 每次执行 proceed() 方法时 index+1, 然后传入新建的 RealInterceptorChain
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
     // 但是 拦截器集合是相同的,因此相当于每次都是依次取得拦截器链中的下一个拦截器
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);
    // ...
    return response;
}

递归? 是的,如果你观察的够仔细的话,你会发现,其实BridgeInterceptorRetryAndFollowUpInterceptorCacheInterceptorConnectInterceptor都会执行RealInterceptorChain#proceed方法,相当于这个方法在不断地调用自己,符合递归的执行特性,因此Response响应数据的返回次序刚好是与请求时相反的。BridgeInterceptor#intercept相应抽取的源码如下:

public final class BridgeInterceptor implements Interceptor {

  @Override public Response intercept(Chain chain) throws IOException {
  // do something ...
    Response networkResponse = chain.proceed(requestBuilder.build());
  // do something ...
    return responseBuilder.build();
  }
}

因而拦截器链的响应数据返回次序如下:

拦截器链的响应次序

我靠,是不是觉得设计的非常巧妙,这也是我热衷于源码的重要原因之一,因为不看看别人的代码你就永远不知道别人有多骚。。

根据上面的分析,我们已经知道了原来正真执行请求、处理响应数据是在拦截器,并且对于同步请求,分发器Dispatcher仅仅是记录下了同步请求的Call,用作请求数量统计用的,并没有参与到实际请求和执行中来。

OK,来看看异步请求RealCall#enqueue()Dispatcher#enqueue(),毫无疑问,异步请求肯定是运行在线程池中了

@Override public void enqueue(Callback responseCallback) {
    synchronized (this) {
        if (executed) throw new IllegalStateException("Already Executed");
        executed = true;
    }
    captureCallStackTrace();
    eventListener.callStart(this);
    client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

Dispatcher#enqueue()

synchronized void enqueue(AsyncCall call) {
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
}

对于上面的AsyncCall,核心源码如下,注意到getResponseWithInterceptorChain(),是不是非常地熟悉了,在上面的同步请求那里已经详细解释过了,就不再累赘了。

  final class AsyncCall extends NamedRunnable {
     // ...
    @Override protected void execute() {
      boolean signalledCallback = false;
      try {
        Response response = getResponseWithInterceptorChain();
        // ...
      } catch (IOException e) {
        // ...
      } finally {
        client.dispatcher().finished(this);
      }
    }
  }

至此,OkHttp的主体运作流程是不是已经清晰了,不过有没有感觉还少点什么,我们只是分析了运作流程,具体到怎么连接的问题还没有分析。

好吧,既然是建立连接,那么极速定位到ConnectInterceptor,没毛病吧, 核心源码如下:

public final class ConnectInterceptor implements Interceptor {
  public final OkHttpClient client;
  // ...
  @Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    // 注意这里
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
}

注意到上面的streamAllocation.newStream(..),源码如下:

public HttpCodec newStream( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
      // ... 
      // 1. 查找可用连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      // 2. 建立 HTTP 或 HTTP2 连接
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
}

继续定位到 findHealthyConnection

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException {
    while (true) { // 完全阻塞式查找,找不到不罢休
      // 1. 查找已有连接
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // 2. 如果这是一条全新的连接,那么可以跳过大量的健康检查,直接返回
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }
      // 3. 做一个速度超慢的检查,以确保池中的连接仍然可用,
      //    如果不可用了就将其从池中剔除,然后继续查找
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }
      return candidate;
    }
}

定位到StreamAllocation#findConnection,这里查找连接的规则是:先查看当前是否存在可用连接,如果不存在,再从连接池中查找,如果还没有,那就新建一个,用来承载新的数据流。 需要注意的一个细节就是,从连接池查找连接时会查询两次,第一次只是根据当前目标服务器地址去查,如果没有查到,则第二次会重新选择路由表,然后用该地址去匹配。最终如果存在已经创建好的连接,则直接返回使用,如果不存在,则新建一个连接,进行TCPTLS握手,完事之后将这个连接的路由信息记录在路由表中,并把这个连接保存到连接池。还需要注意的一点是:如果有个连接与当前建立的连接的地址相同,那么将释放掉当前建立好的连接,而使用后面创建的连接(保证连接是最新的)

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    synchronized (connectionPool) {
        // ... 
      // 1. 检查当前有没有可用连接,如果有,那么直接用当前连接
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) { // 可用
        result = this.connection;
        releasedConnection = null;
      }
      // ...
      if (result == null) { 
        // 2. 不存在已有连接或者已有连接不可用,则尝试从连接池中获得可用连接
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);
    // ...
    if (result != null) { // 已有连接中找到了连接,完成任务
      return result;
    }

    boolean newRouteSelection = false;
    // 选择一条路由,这是个阻塞式操作
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");
      if (newRouteSelection) {
          // 路由已经选好了,此时再根据路由中的 IP集合去匹配连接池中的连接,
          // 这个可能因为连接合并的缘故而匹配到
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // 3. 最后实在没找到已有的连接,那么就只能重新建立连接了
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // 根据路由匹配到了连接池中的连接
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // 进行 TCP + TLS 握手. 这是阻塞式操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route()); // 路由表中记录下这个连接的路由信息

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;
      // 将这个连接记录到连接池
      Internal.instance.put(connectionPool, result);
      // 如果多个连接指向当前创建的连接的相同地址,那么释放掉当前连接,使用后面创建的连接
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
}

根据以上分析可得出以下主体执行流程:

OkHttp主体执行流程.png

当然这是同步请求的流程,而对于异步请求而言,也仅仅是把拦截器链放到了线程池执行器中执行而已。

三、核心类解读

至此,我们已经清楚了OkHttp的主干,当然,我们仅仅是把流程给走通了,在本节中,我们将根据源码具体分析OkHttp中各核心类的作用及其实现,内容很长,请做好心理准备。

1. 拦截器(Intercepter

1). RetryAndFollowUpInterceptor

作用:用于失败时恢复以及在必要时进行重定向。

作为核心方法,RetryAndFollowUpInterceptor#intercept体现了RetryAndFollowUpInterceptor的工作流程,源码如下,我们来分析分析它是怎么恢复和重定向的,具体实现流程还请看注释:

  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    // 仅仅创建流的承载对象,此时并没有建立流
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0; // 用于记录重定向和需要授权请求的数量
    Response priorResponse = null;
    while (true) {
      // 1. 如果此时请求被取消了,那么关闭连接,释放资源
      if (canceled) { 
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        // 2. 推进执行拦截器链,请求并返回响应数据
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // 3. 如果连接失败了,尝试使用失败的地址恢复一下,此时请求可能还没有发送
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getFirstConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // 4. 尝试重新与交流失败的服务器重新交流,这个时候请求可能已经发送了
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // 5. 如果是位置异常,那么释放掉所有资源
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
      // 6. 如果记录的上一个请求大的响应数据存在,那么将其响应体置空
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp;
      try {
        // 7. 处理请求的认证头部、重定向或请求超时问题,如果这些操作都不必要
        //    或者应用不了,那么返回 null
        followUp = followUpRequest(response, streamAllocation.route());
      } catch (IOException e) {
        streamAllocation.release();
        throw e;
      }
      //需要处理认证、重定向和超时问题,那么结束处理,返回响应数据
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }
      // 否则,关闭当前响应,进行后续重定向等问题的处理
      closeQuietly(response.body());
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
      // ...
      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }
      request = followUp;
      priorResponse = response;
    }
  }

2). BridgeInterceptor

作用:应用代码与网络代码的桥梁。首先根据用户请求建立网络请求,然后执行这个网络请求,最后根据网络请求的响应数据建立一个用户响应数据。

BridgeInterceptor#intercept源码如下,主要做了以下事情:

  • 用于请求: 这个是在推进请求拦截器链时进行的,也就是说此时尚未真正地进行网络请求。此时会补充缺失的请求头参数,如 Content-TypeTransfer-EncodingHostConnectionAccept-EncodingUser-AgentCookie。如果在请求时添加了gzip请求头参数,即开启了gzip压缩,那么在取得响应数据时需要对数据进行解压。
  • 用于响应: 这个实在取得网络响应数据后回退拦截器链时进行的,即已经取得了网络响应数据。此时会对相应头部进行处理,如果请求时开启了gzip压缩,那么此时会对响应数据进行解压。
  @Override public Response intercept(Chain chain) throws IOException {
    Request userRequest = chain.request();
    Request.Builder requestBuilder = userRequest.newBuilder();

    // 这里处于请求之前
    // 1. 此时主要为请求添加缺失的请求头参数
    RequestBody body = userRequest.body();
    if (body != null) {
      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }
    }
    // ...
    // 如果启用了GZIP压缩,那么需要负责解压响应数据
    boolean transparentGzip = false;
    if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
      transparentGzip = true;
      requestBuilder.header("Accept-Encoding", "gzip");
    }
    //...

    // 2. 推进执行拦截器链,进行请求、返回数据
    Response networkResponse = chain.proceed(requestBuilder.build());

    // 取得网络响应数据后
    // 3. 处理响应头,如果请求时开启了GZIP压缩,那么这里需要将响应数据解压
    HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

    Response.Builder responseBuilder = networkResponse.newBuilder()
        .request(userRequest);

    if (transparentGzip
        && "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
        && HttpHeaders.hasBody(networkResponse)) {
      GzipSource responseBody = new GzipSource(networkResponse.body().source());
      Headers strippedHeaders = networkResponse.headers().newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build();
      responseBuilder.headers(strippedHeaders);
      String contentType = networkResponse.header("Content-Type");
      // 建立用户响应数据
      responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
    }

    return responseBuilder.build();
  }

综上所述,BridgeInterceptor主要用于请求前对用户请求进行完善,补充缺失参数,然后推进请求拦截器链,并等待响应数据返回,取得响应数据后则是将其转换成用户响应数据,此时如果数据进行过gzip压缩,那么会在这里进行解压,然后重新封装成用户数据。

3). CacheInterceptor

作用:用于从本地缓存中读取数据,以及将服务器数据写入到缓存。

CacheInterceptor#intercept源码如下,拦截器链执行到这一步主要做了如下事情:

  • 请求:

    如果开启了缓存,且请求策略是禁用网络仅读缓存的话,那么首先会根据当前请求去查找缓存,如果匹配到了缓存,则将缓存封装成响应数据返回,如果没有匹配到,那么返回一个504的响应,这将导致请求拦截器链执行终止,进而返回执行响应拦截器链。

    如果请求策略是网络加缓存,当那么然网络请求优先,所以就推进请求拦截器链执行请求,

  • 网络响应:

    在得到网络响应数据后,如果开启了缓存策略其匹配到了旧缓存,那么根据最新网络请求响应数据更新缓存,然后返回响应数据;如果没有匹配到缓存但是开启了缓存,那么将响应数据写入缓存后返回;而如果开启了缓存,但是并不使用缓存策略,那么根据响应数据移除缓存中对应的数据缓存。

  @Override public Response intercept(Chain chain) throws IOException {
    //  读取候选的旧缓存
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    // 解析请求和缓存策略
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest; // 如果仅读缓存,那么网络请求会为 null
    Response cacheResponse = strategy.cacheResponse;
    // ... 
    // 如果禁止使用网络而仅读取缓存的话,那么没匹配到缓存时返回 504
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          // ... 
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          // ...
          .body(Util.EMPTY_RESPONSE)
          .build();
    }

    // 如果禁止使用网络而仅读取缓存的话,那么匹配到缓存时将其返回
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    Response networkResponse = null;
    try {
      // 推进执行请求拦截器链
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // 请求异常则关闭候选缓存实体
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }

    // 根据最新网络请求响应数据更新缓存,然后返回响应数据
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            // ...
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
        // ...
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    // 无匹配缓存的情况
    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // 使用缓存策略且无匹配缓存,则将响应数据写入缓存
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }
      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try { // 不使用缓存策略,则删除已有缓存
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }
    return response;
  }

缓存时判断逻辑比较多,不过这里重点在于理解缓存策略,一般会有:仅网络仅缓存网络加缓存 三种请求策略。

4). ConnectInterceptor

作用:用于打开一个到目标服务器的连接,并切换至下一个拦截器

因为在上一节末尾分析OkHttp如何建立连接的问题上已经分析过了,所以不做过多描述。

这里回忆一下连接规则:建立连接时,首先会查看当前是否有可用连接,如果没有,那么会去连接池中查找,如果找到了,当然就使用这个连接,如果没有,那么就新建一个连接,进行TCPTLS握手以建立联系,接着把连接放入连接池以备后续复用,最后推进请求拦截器链执行,将打开的连接交给下一个拦截器去处理。

5). CallServerInterceptor

作用:这是拦截器链的最后一环,至此将真正的进行服务器请求

CallServerInterceptor#intercept源码如下,作为拦截器链的最后一环,当然要真正地做点实事了,大致操作步骤是:

发送请求头部 --> 读取一下GETHEAD之外的请求(如POST)的响应数据 --> 结束请求的发送动作 --> 读取响应头部 --> 读取响应数据 --> 封装后返回。

  @Override public Response intercept(Chain chain) throws IOException {
    // ...
    long sentRequestMillis = System.currentTimeMillis();

    // 1. 发送请求头部
    httpCodec.writeRequestHeaders(request);

    Response.Builder responseBuilder = null;
    // 2. 检查是否是 GET 或 HEAD 以外的请求方式、读取响应数据
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // 如果请求头部中包含"Expect: 100-continue",那么在转换请求体前等待"HTTP/1.1 100 Continue"
      // 响应。如果没有读取到"HTTP/1.1 100 Continue"的响应,那么就不转换请求体(request body)了,
      // 而直接将我们得到的响应返回(如 状态为 4XX 的响应 );
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        // 读取、转换响应头部
        responseBuilder = httpCodec.readResponseHeaders(true);
      }

      if (responseBuilder == null) {
        // 如果存在"Expect: 100-continue",则写入请求体
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
      } else if (!connection.isMultiplexed()) {
        // 如果不存在"Expect: 100-continue",那么禁止 HTTP/1 连接复用。
        // 不过我们仍然必须转换请求体以使连接达到一个固定的状态。
        streamAllocation.noNewStreams();
      }
    }
    // 3. 结束请求的发送动作
    httpCodec.finishRequest();


    // 4. 读取响应头部
    if (responseBuilder == null) {
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    // 5. 构建响应数据
    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // 如果服务器返回 100-continue 响应即使我们并没有这么请求,则重新读取一遍响应数据;
      responseBuilder = httpCodec.readResponseHeaders(false);

      response = responseBuilder
              .request(request) 
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }
    // 6. 填充响应数据
    if (forWebSocket && code == 101) {
      // 连接正在更新,不过我们需要确保拦截器收到的 non-null 的响应体
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      // 这里将 http 输入流包装到响应体
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }
    // 其他情况...
    return response;
  }

综上,作为拦截器最后一环的CallServerInterceptor终于把请求给终结了,完成了与服务器的沟通交流,把需要的数据拿了回来。请求的时候每个拦截器都会插上一脚,响应的时候也一样,把数据转换的工作分给了各个拦截器处理。

2. 分发器(Dispatcher

为什么叫分发器呢?如果叫做执行器(Executor)可能会更好理解一些,因为它的工作就是执行异步请求,虽然会统计请求的数量....嗯~~好吧,换个角度,如果理解为它用于把异步任务分发给线程池执行,起到任务分发的作用,那就理解为啥叫分发器了。

OK,先来观察一下Dispatcher的构成,部分源码如下,可以先看看注释:

public final class Dispatcher {
  private int maxRequests = 64;  // 同时执行的最大异步请求数量,数量超过该值时,新增的请求会放入异步请求队列中
  private int maxRequestsPerHost = 5;  // 每个主机最多同时存在的请求数量
  private @Nullable Runnable idleCallback; 
 
  // 线程池执行器
  private @Nullable ExecutorService executorService;
  // 尚未执行的任务队列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 正在执行的任务队列
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  // 同步执行的任务队列
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
    this.executorService = executorService;
  }
  
  public synchronized ExecutorService executorService() {
    if (executorService == null) {
      // 初始化线程池执行器
      executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
    }
    return executorService;
  }
  // ...
}

简单描述一下:Dispatcher包含三个任务队列,分别用于记录尚未执行的异步请求、正在执行的异步请求、正在执行的同步请求。包含一个线程池,用于执行异步请求,这个线程池执行器的核心线程数量为 0, 最大线程数量不限(整型的最大值2^31-1,相当于不限),闲置线程的最大等待超时时间为60秒,线程池的任务队列使用非公平机制的SynchronousQueue。这就是Dispatcher的主要配置。

我们来看看它是如何限制每个主机的请求数量的,直接看注释好了。

synchronized void enqueue(AsyncCall call) {
    // 正在执行的异步请求数量小于限定值,且同一主机的正在执行的异步请求数量小于限定值时
    // 添加到正在执行的异步请求队列中,并执行。
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else { // 否则就添加到等待队列中
      readyAsyncCalls.add(call);
    }
  }

runningCallsForHost用于计算当前正在执行的连接到相同主机上异步请求的数量

 private int runningCallsForHost(AsyncCall call) {
    int result = 0;
    for (AsyncCall c : runningAsyncCalls) {
      if (c.get().forWebSocket) continue;
      if (c.host().equals(call.host())) result++;
    }
    return result;
  }

3. 连接池(ConnetionPool

作用:用于管理HTTPHTTP/2连接的复用以减少网络延迟,因为使用相同地址的请求可能共享一个连接。所以ConnetionPool实现了维护已打开连接已被后续使用的机制。

线程池啥的就不多说了,这里主要分析一下ConnetionPool如何维护已打开的连接。从ConnetionPool#put着手:

  void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) { // 如果当前不在执行清理任务,那么现在执行
      cleanupRunning = true;
      executor.execute(cleanupRunnable); // 线程池的作用就是执行清理任务
    }
    connections.add(connection); // 同时添加到连接队列中
  }

cleanupRunnable源码如下,根据ConnetionPool#put可知每当我们往连接池中添加一个连接时,如果当前不在执行清理任务(cleanupRunnable),那么立马会执行cleanupRunnable,而cleanupRunnable中会循环执行cleanup,直到所有连接都因闲置超时而被清理掉,具体还请先看注释。

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime()); // 执行清理
        if (waitNanos == -1) return; // cleanup中的情况 4)
        if (waitNanos > 0) {  // cleanup中的情况 2) 和 3)  
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos); // 等待超时
            } catch (InterruptedException ignored) {
            }
          }
        }
        // 至此情况 1), 2), 3) 都会导致 `cleanup`被循环执行
      }
    }
  };

cleanup源码如下,它的作用是查找、清理超过了keep-alive时间限制或者闲置超时闲置的连接。具体还请看注释。

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      // 1. 查找超时连接
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();
        // 1). 如果正在使用,那么跳过,继续查找
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++; // 记录闲置连接的数量

        // 2). 如果闲置时间超过了最大允许闲置时间,则记录下来在后面清除
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      // 2. 查找完成了,将在这里对闲置连接进行处理
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // 1). 确定已经超时了,那么从连接池中清除,关闭动作会在同步块外面进行
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // 2). 存在闲置连接,但是尚未超时
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // 3). 如果所有连接都正在使用,那么最多保持个`keep-alive`超时时间就又会重新执行清理动作
        return keepAliveDurationNs;
      } else {
        // 4). 压根没有连接,那不管了,标记为非清理状态,并返回-1
        cleanupRunning = false;
        return -1;
      }
    }
    // 3. 关闭上面查找到的处于情况1)的闲置超时连接
    closeQuietly(longestIdleConnection.socket());
    // 返回 0 ,表示马上会重新回来执行清理操作
    return 0;
  }

综上,ConnectionPool 会设定keepAliveDurationNslongestIdleDurationNs两个超时时间,而每次往连接池中添加一个新连接时,如果当前处于非清理装填,都会导致线程池执行器开个线程执行清理动作,而对于清理动作而言,会遍历连接池,查找闲置超时的连接,并记录闲置连接的数量,而遍历完成后,将根据情况 2. 1)、2)、3)、4) 进行相应的处理,而如果是情况 2. 4), 则会当即结束清理循环,意味着连接池中已经没有连接了,此时线程会执行完成而退出,其他几种情况都不会中断循环,因此实际上这个线程池最多只会存在一个连接池维护线程。

四、总结

一般来说,当使用OKHttp通过URL请求时,它做了以下事情:

  • 使用URL并且配置OKHttpClient来创建地址(Address),这个地址指定了我们连接web服务器的方式。
  • 尝试从连接池(ConnectionPool)中取出与该地址相同的连接。
  • 如果在连接池中没有对应该地址的连接,那么它会选择一条新路线(route)去尝试,这通常意味着将进行DNS请求以获取对应服务器的IP地址,然后如果需要的话还会选择TLS版本和代理服务器。
  • 如果这是一条新路线,它会通过Socket连、TLS隧道(HTTP代理的HTTPS)或者TLS连接,然后根据需要进行TCPTLS握手。
  • 发送HTTP请求,然后读取响应数据。

如果连接出现问题,OKHttp会选择另一条路线再次尝试,这使得OKHttp在服务器地址子集无法访问时能够恢复,而当从连接池中拿到的连接已经过期,或者TLS版本不支持的情况下,这种方式同样很有用。一旦接收到响应数据,该连接就会返回到连接池中以备后续请求使用,而连接池中的连接也会在一定时间的不活动状态后被清除掉。

对于整体框架而言,本文已经详细分析了OkHttp的整体工作流程,相关细节还请回到文中去,这里就不再累赘了。

相关文章

网友评论

    本文标题:OkHttp源码解析

    本文链接:https://www.haomeiwen.com/subject/hbqisqtx.html