OkHttp工作流程

作者: leap_ | 来源:发表于2020-01-28 21:14 被阅读0次
  • 分发器
  • 线程池
  • 拦截器
  • 五大拦截器

分发器

Dispatcher类,这个类的作用用于分发提交的网络任务,高并发任务分发和线程池排队;

Dispatcher的工作流程:

首先介绍Dispatcher类的一些核心成员

  • ready队列和running队列:
  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
  • 最大并发数和单位域名最大并发:默认是64和5,可以自定义
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
分发器异步请求工作流程

dispatcher.enqueue()

  synchronized void enqueue(AsyncCall call) {
    //  比较
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      runningAsyncCalls.add(call);
      executorService().execute(call);
    } else {
      readyAsyncCalls.add(call);
    }
  }

promoteCalls()

  private void promoteCalls() {
    if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }

      if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
    }
  }

线程池

OKHttp内部的线程池:

executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  • 核心线程数:0;无核心线程数,不缓存线程
  • 阻塞队列:SynchronousQueue;SynchronousQueue的特点是没有容量

当提交了一些任务后,阻塞队列不能缓存任务,会直接新建线程去执行,从而完成了无等待,高并发的特点

拦截器

拦截器是OKHttp最核心的部分,当一个网络任务由分发器提交给线程池后,会交给拦截器处理;

拦截器工作流程

与拦截器相关的核心API:

Chain接口和Intercept接口
public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();

    Response proceed(Request request) throws IOException;

    /**
     * Returns the connection the request will be executed on. This is only available in the chains
     * of network interceptors; for application interceptors this is always null.
     */
    @Nullable Connection connection();

    Call call();

    int connectTimeoutMillis();

    Chain withConnectTimeout(int timeout, TimeUnit unit);

    int readTimeoutMillis();

    Chain withReadTimeout(int timeout, TimeUnit unit);

    int writeTimeoutMillis();

    Chain withWriteTimeout(int timeout, TimeUnit unit);
  }
}

intercept接口是拦截器实现的接口,自带五个默认的拦截器都实现了这个接口,chain接口是责任链模式的接口,拦截器的调用是使用责任链模式(单一责任原则)调用的

chain接口的实现类:RealInterceptorChain
  private final List<Interceptor> interceptors;
  private final int index;

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {

....

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

....

    return response;
  }

RealInterceptorChain(一下简称realChain)是拦截器责任链模式的主要实现类,他有两个重要的成员变量 interceptors & index集合用来存放所有需要执行的拦截器,index作为下标从0开始一个个的遍历集合;

getResponseWithInterceptorChain():
    //   获取网络请求的Response
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors. 添加拦截器到list中
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    // 通过list创建RealInterceptChain
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());
    //  调用RealChain的proceed方法
    return chain.proceed(originalRequest);
  }

我们从线程池的execute()开始分析拦截器的工作流程,首先调用getResponseWithInterceptorChain()添加自定义的拦截器和五个OKHTTP自带的拦截器,然后创建一个index为0的realChain,调用realChain(index=0)的proceed();
在proceed方法中获取当前index下标所指的intercept,创建一个新的realChain(index+1=2),执行拦截器的intercept方法,并传入新的realChain;

RetryAndFollowUpInterceptor.intercept(realChain(index = 1))
  @Override public Response intercept(Chain chain) throws IOException {
...
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } 
...
  }

在intercept中调用了责任链的proceed(),此时的index+1了,如此往返将责任链遍历下去,执行每一个拦截器;

五大拦截器

RetryAndFollowUpInterceptor:重试和重定向

  • retry重试
  • followUp重定向

当请求发生异常时,RetryAndFollowUpInterceptor会根据异常的条件判断是否需要重试;

RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor.intercept
  @Override public Response intercept(Chain chain) throws IOException {
...
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {   // 如果是路由异常/io异常就通过recover()判断是否需要重试
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {  // 如果是路由异常/io异常就通过recover()判断是否需要重试
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }
    }
...
  }

在while循环中catch RouteException & IOException 用recover()判断这个异常是否需要重试,如果需要就continue,不需要则抛出异常结束循环;

RetryAndFollowUpInterceptor.recover()
private boolean recover(IOException e, StreamAllocation streamAllocation,
      boolean requestSendStarted, Request userRequest) {
    streamAllocation.streamFailed(e);

    // The application layer has forbidden retries.      client配置是否允许重试()默认允许
    if (!client.retryOnConnectionFailure()) return false;

    // We can't send the request body again.        // http2.0才可能不通过
    if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;

    // This exception is fatal.      判断是否是重大异常
    if (!isRecoverable(e, requestSendStarted)) return false;

    // No more routes to attempt.  当前域名只有一个ip
    if (!streamAllocation.hasMoreRoutes()) return false;

    // For failure recovery, use the same route selector with a new connection.
    return true;
  }
重大异常的判断
 private boolean isRecoverable(IOException e, boolean requestSendStarted) {
    // If there was a protocol problem, don't recover.
    if (e instanceof ProtocolException) {      // 协议异常 ,不重试
      return false;
    }

    // If there was an interruption don't recover, but if there was a timeout connecting to a route
    // we should try the next route (if there is one).
    if (e instanceof InterruptedIOException) {    
      return e instanceof SocketTimeoutException && !requestSendStarted;     //  是Socket超时异常直接返回true重试,不是继续往下判断
    }

    // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
    // again with a different route.
    if (e instanceof SSLHandshakeException) {
      // If the problem was a CertificateException from the X509TrustManager,
      // do not retry.
      if (e.getCause() instanceof CertificateException) {        //  证书异常,不重试
        return false;
      }
    }
    if (e instanceof SSLPeerUnverifiedException) {
      // e.g. a certificate pinning error.
      return false;
    }

    // An example of one we might want to retry with a different route is a problem connecting to a
    // proxy and would manifest as a standard IOException. Unless it is one we know we should not
    // retry, we return true and try a new route.
    return true;
  }

总的来说,如果是服务端的问题(证书异常,协议异常)就不会重试,因为再重试也是一样结果,而如果是网络波动超时异常,就会触发拦截器重试(需要满足其他的要求);

当重试环节结束后(即while循环跳出后),会进入重定向阶段followUp,非重点,了解就好

重定向:

客户端向服务器发送一个请求,服务端返回一个url给客户端,客户端访问这个新的url,叫做重定向,这个url放在响应头的Location中;

    while (true) {
...
      try {
        //  循环执行新的重定向
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } 

      //  通过响应码判定是否有重定向
      Request followUp = followUpRequest(response, streamAllocation.route());
      // 如果没有直接返回response
      if (followUp == null) {
        if (!forWebSocket) {
          streamAllocation.release();
        }
        return response;
      }
      //  重定向次数限制
      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }
      //  重定向
      request = followUp;
      priorResponse = response;
...
    }

BridgeInterceptor:桥接

最简单的拦截器,主要的功能:

  • 补充请求头
  • 设置和读取Cookie
  • 设置gzip压缩和解压(Accept-Encoding/Content-Encoding)

CacheInterceptor:缓存

缓存拦截器内部有一个缓存策略类,他有两个重要的成员变量:

  /** The request to send on the network, or null if this call doesn't use the network. */
  public final @Nullable Request networkRequest;

  /** The cached response to return or validate; or null if this call doesn't use a cache. */
  public final @Nullable Response cacheResponse;

networkRequest用来表示这个请求的网络访问请求细节(Request),cacheResponse用来表示这个请求的缓存(如果没有为null);

1 cache == null && request == null

缓存为空,请求为空,fail,直接返回一个504空的response

2 cache == null && request != null

缓存为空,请求不空,访问服务器

3 cache != null && request == null

缓存不为空,请求为空,使用缓存

4 cache != null && request != null

缓存不为空,请求不为空,访问服务器,对比缓存(响应码304)

  @Override public Response intercept(Chain chain) throws IOException {
    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    long now = System.currentTimeMillis();
    
    //   通过 CacheStrategy  获取 cache 和 request
    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

    if (cache != null) {
      cache.trackResponse(strategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    //  第一种情况,缓存为空,请求为空
    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {
      return new Response.Builder()
          .request(chain.request())
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(Util.EMPTY_RESPONSE)
          .sentRequestAtMillis(-1L)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build();
    }

    //  request为空,直接使用缓存, (缓存为空则return null)
    // If we don't need the network, we're done.
    if (networkRequest == null) {
      return cacheResponse.newBuilder()
          .cacheResponse(stripBody(cacheResponse))
          .build();
    }

    //   request不为空, 使用责任链发起请求,获取networkResponse 
    Response networkResponse = null;
    try {
      networkResponse = chain.proceed(networkRequest);
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (networkResponse == null && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }
  
     //  第四种情况,request 不为空, 缓存也不为空, 对比缓存
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (networkResponse.code() == HTTP_NOT_MODIFIED) {
        Response response = cacheResponse.newBuilder()
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .sentRequestAtMillis(networkResponse.sentRequestAtMillis())
            .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        cache.trackConditionalCacheHit();
        cache.update(cacheResponse, response);
        return response;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }

    Response response = networkResponse.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (cache != null) {
      if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
        // Offer this request to the cache.
        CacheRequest cacheRequest = cache.put(response);
        return cacheWritingResponse(cacheRequest, response);
      }

      if (HttpMethod.invalidatesCache(networkRequest.method())) {
        try {
          cache.remove(networkRequest);
        } catch (IOException ignored) {
          // The cache cannot be written.
        }
      }
    }

    return response;
  }

缓存策略的实现:

我们先看一下缓存拦截器的缓存策略是如何使用;

    Response cacheCandidate = cache != null
        ? cache.get(chain.request())
        : null;

    CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
    Request networkRequest = strategy.networkRequest;
    Response cacheResponse = strategy.cacheResponse;

使用了内部类的get方法获取CacheStrategy 对象;

在缓存策略的内部有一个Factory内部类:在构造Factory时Factory的成员变量用来存储response的一些基本信息备用Date Expires Last-Modified ETag Age

  public static class Factory {
    final long nowMillis;
    final Request request;
    final Response cacheResponse;

    /** The server's time when the cached response was served, if known. */
    private Date servedDate;
    private String servedDateString;

    /** The last modified date of the cached response, if known. */
    private Date lastModified;
    private String lastModifiedString;

    /**
     * The expiration date of the cached response, if known. If both this field and the max age are
     * set, the max age is preferred.
     */
    private Date expires;

    /**
     * Extension header set by OkHttp specifying the timestamp when the cached HTTP request was
     * first initiated.
     */
    private long sentRequestMillis;

    /**
     * Extension header set by OkHttp specifying the timestamp when the cached HTTP response was
     * first received.
     */
    private long receivedResponseMillis;

    /** Etag of the cached response. */
    private String etag;

    /** Age of the cached response. */
    private int ageSeconds = -1;

    public Factory(long nowMillis, Request request, Response cacheResponse) {
      this.nowMillis = nowMillis;
      this.request = request;
      this.cacheResponse = cacheResponse;

      if (cacheResponse != null) {
        this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
        this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
        Headers headers = cacheResponse.headers();
        for (int i = 0, size = headers.size(); i < size; i++) {
          String fieldName = headers.name(i);
          String value = headers.value(i);
          if ("Date".equalsIgnoreCase(fieldName)) {
            servedDate = HttpDate.parse(value);
            servedDateString = value;
          } else if ("Expires".equalsIgnoreCase(fieldName)) {
            expires = HttpDate.parse(value);
          } else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
            lastModified = HttpDate.parse(value);
            lastModifiedString = value;
          } else if ("ETag".equalsIgnoreCase(fieldName)) {
            etag = value;
          } else if ("Age".equalsIgnoreCase(fieldName)) {
            ageSeconds = HttpHeaders.parseSeconds(value, -1);
          }
        }
      }
    }
  }

get():

    public CacheStrategy get() {
      CacheStrategy candidate = getCandidate();

      if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
        // We're forbidden from using the network and the cache is insufficient.
        return new CacheStrategy(null, null);
      }

      return candidate;
    }

getCandidate():

    private CacheStrategy getCandidate() {
      // No cached response.
      if (cacheResponse == null) {
        return new CacheStrategy(request, null);
      }

      // Drop the cached response if it's missing a required handshake.
      if (request.isHttps() && cacheResponse.handshake() == null) {
        return new CacheStrategy(request, null);
      }

      // If this response shouldn't have been stored, it should never be used
      // as a response source. This check should be redundant as long as the
      // persistence store is well-behaved and the rules are constant.
      if (!isCacheable(cacheResponse, request)) {
        return new CacheStrategy(request, null);
      }

      CacheControl requestCaching = request.cacheControl();
      if (requestCaching.noCache() || hasConditions(request)) {
        return new CacheStrategy(request, null);
      }

      CacheControl responseCaching = cacheResponse.cacheControl();
      if (responseCaching.immutable()) {
        return new CacheStrategy(null, cacheResponse);
      }

      long ageMillis = cacheResponseAge();
      long freshMillis = computeFreshnessLifetime();

      if (requestCaching.maxAgeSeconds() != -1) {
        freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
      }

      long minFreshMillis = 0;
      if (requestCaching.minFreshSeconds() != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
      }

      long maxStaleMillis = 0;
      if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
      }

      if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        Response.Builder builder = cacheResponse.newBuilder();
        if (ageMillis + minFreshMillis >= freshMillis) {
          builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
        }
        long oneDayMillis = 24 * 60 * 60 * 1000L;
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
          builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
        }
        return new CacheStrategy(null, builder.build());
      }

      // Find a condition to add to the request. If the condition is satisfied, the response body
      // will not be transmitted.
      String conditionName;
      String conditionValue;
      if (etag != null) {
        conditionName = "If-None-Match";
        conditionValue = etag;
      } else if (lastModified != null) {
        conditionName = "If-Modified-Since";
        conditionValue = lastModifiedString;
      } else if (servedDate != null) {
        conditionName = "If-Modified-Since";
        conditionValue = servedDateString;
      } else {
        return new CacheStrategy(request, null); // No condition! Make a regular request.
      }

      Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
      Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);

      Request conditionalRequest = request.newBuilder()
          .headers(conditionalRequestHeaders.build())
          .build();
      return new CacheStrategy(conditionalRequest, cacheResponse);
    }

getCandidate()是获取CacheStrategy 类的核心代码:

缓存策略的实现流程(设置`networkRequest`与`cacheResponse`的组合)

ConnectInterceptor 连接

连接拦截器的作用是建立Socket连接和Socket连接池的复用;

相关类 作用
StreamAllocation 协调请求、连接与数据流三者之间的关系,它负责为一次请求寻找连接,然后获得流来实现网络通信,简单来说就是维护连接,在重定向拦截器创建,连接拦截器使用
RealConnection 封装了Socket与一个Socket连接池,建立与服务器的Socket连接
ConnectionPool Socket连接池,复用Socket连接,关闭一定时间内空闲的连接
Socket连接的建立:
三种连接建立情况
  • 无代理:直接向Socket指定目标服务器
  • Socket代理:向Socket传入Proxy对象,Proxy指定代理服务器,Socket连接目标服务器
  • HTTP代理:Socket连接代理服务器,请求报文请求行的第二个参数加上目标服务器域名前置
  • HTTPS代理:Socket连接代理服务器,请求报文需要先请求CONNECT,服务器返回connect成功,再包装一层SSLSocket与服务器通信
连接拦截器Socket建立过程

连接拦截器类并没有多少代码,大部分的逻辑都被封装在了RealConnection和StreamAllocation中,从connect()开始,如果是HTTPS请求,会创建隧道代理,隧道代理的功能代理服务器可以发出身份质疑,关闭连接,保证HTTPS的安全性

HTTPS代理请求CONNECT

CallServerInterceptor 请求服务

将request拼成请求报文,将服务器的响应报文转化成response

一般出现于上传大容量请求体或者需要验证。代表了先询问服务器是否原因接收发送请求体数据,
OkHttp的做法:
如果服务器允许则返回100,客户端继续发送请求体;
如果服务器不允许则直接返回给用户。
同时服务器也可能会忽略此请求头,一直无法读取应答,此时抛出超时异常。

相关文章

网友评论

    本文标题:OkHttp工作流程

    本文链接:https://www.haomeiwen.com/subject/zkwuzctx.html