美文网首页
关于OkHttp(二)

关于OkHttp(二)

作者: MY1112 | 来源:发表于2019-02-23 17:21 被阅读0次

前文 我们主要对OkHttp涉及到基础类进行了介绍,这里我们来聊聊OkHttp最核心的,拦截器机制,也就是关于OkHttp(一)中提到的核心方法。Dispatcher分发网络请求进入队列,然后进入线程池,AsyncCall最后执行到 execute()方法,最后就会执行getResponseWithInterceptorChain() 方法。

//核心代码 开始真正的执行网络请求
  Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    //责任链
    List<Interceptor> interceptors = new ArrayList<>();
    //在配置okhttpClient 时设置的intercept 由用户自己设置
    interceptors.addAll(client.interceptors());
    //负责处理失败后的重试与重定向
    interceptors.add(retryAndFollowUpInterceptor);
    //负责把用户构造的请求转换为发送到服务器的请求 、把服务器返回的响应转换为用户友好的响应 处理 配置请求头等信息
    //从应用程序代码到网络代码的桥梁。首先,它根据用户请求构建网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应。
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
    //设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
    //可配置用户自己设置的缓存拦截器
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //连接服务器 负责和服务器建立连接 这里才是真正的请求网络
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      //配置okhttpClient 时设置的networkInterceptors
      //返回观察单个网络请求和响应的不可变拦截器列表。
      interceptors.addAll(client.networkInterceptors());
    }
    //执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
    //进行http请求报文的封装与请求报文的解析
    interceptors.add(new CallServerInterceptor(forWebSocket));

    //创建责任链
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    //执行责任链
    return chain.proceed(originalRequest);
  }

在List<Interceptor> interceptors中连续add多个Interceptor,最后把这个 interceptor list作为参数传入到RealInterceptorChain的一个对象,另外还要特别注意一下第4个参数0,然后让chain执行proceed()方法,我们看一下这个proceed()方法

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

其中RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout);
这个next的Chain的对象是在构造函数中把上个Chain传进来的interceptors再传到下个Chain里,只是第4个参数由刚才的index(第一次是0)变成了index+1(当前是1),当前的interceptor依然是 interceptors.get(0),next Chain的interceptor就是 interceptors.get(1),当前interceptor执行intercept(next)方法,进入下个Chain,如此反复,这样就形成了一个递归调用,直到递归结束,返回Response。

那么什么时候会结束呢? 理论上当然是执行到最后一个interceptor了,但是每个interceptor都会有相关的业务处理,如果在某个interceptor里触发了某个具体的条件,让interceptor不再执行proceed()方法,这个递归也会结束,然后返回Response。这个流程的代码设计就是责任链模式,如果上一级不处理该业务,就直接转到下一级,直到有处理为止。

我们再回顾一下上文中add interceptor的代码,发现最先add的interceptor是client.interceptors(),这个client.interceptors()是我们开发者调用OkHttp时自定义的interceptor(比如比较常用的自定义LogInterceptor),而最后add的是CallServerInterceptor这个interceptor,所以CallServerInterceptor里肯定是没有执行proceed()方法的,大家也可以看源码。

下面我们来具体分析一下各个Interceptor:

  • RetryAndFollowUpInterceptor,负责重试和重定向;
  • BridgeInterceptor,首先将应用层的数据类型转换为网络调用层的数据类型,然后将网络层返回的数据类型转换为应用层的数据类型;
  • CacheInterceptor,负责读取缓存,更新缓存;
  • ConnectInterceptor,负责和服务器建立起链接;
  • NetworkInterceptors,client设置的networkInterceptor;
  • CallServerInterceptor,负责向服务器发送请求数据、从服务器读取响应数据;

分析Interceptor的源码主要就是分析里面intercept(Chain chain)这个方法。

CallServerInterceptor

我们先来看看CallServerInterceptor,还是比较简单的:

@Override 
public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    HttpCodec httpCodec = realChain.httpStream();
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request();

    long sentRequestMillis = System.currentTimeMillis();
    //添加头部信息
    realChain.eventListener().requestHeadersStart(realChain.call());
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      //写入请求体
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();
    //读取响应头
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();

    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      responseBuilder = httpCodec.readResponseHeaders(false);

      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);
    //读取响应体
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }

    return response;
  }

写入请求头:

httpCodec.writeRequestHeaders(request);

最后会进入Http1Codec的

  /** Returns bytes of a request header for sending on an HTTP transport. */
  public void writeRequest(Headers headers, String requestLine) throws IOException {
    if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
    sink.writeUtf8(requestLine).writeUtf8("\r\n");
    for (int i = 0, size = headers.size(); i < size; i++) {
      sink.writeUtf8(headers.name(i))
          .writeUtf8(": ")
          .writeUtf8(headers.value(i))
          .writeUtf8("\r\n");
    }
    sink.writeUtf8("\r\n");
    state = STATE_OPEN_REQUEST_BODY;
  }

这个方法里先写入请求行,然后循环写入请求头。Sink是Okio里面的内容,可以理解为outputstream。

写入请求体:

...
if (responseBuilder == null) {
  Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
  BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
  request.body().writeTo(bufferedRequestBody);
  bufferedRequestBody.close();
}
...

这个比较简单,就是把body写入流。

读响应头:

@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
    if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
      throw new IllegalStateException("state: " + state);
    }

    try {
      StatusLine statusLine = StatusLine.parse(readHeaderLine());

      Response.Builder responseBuilder = new Response.Builder()
          .protocol(statusLine.protocol)
          .code(statusLine.code)
          .message(statusLine.message)
          .headers(readHeaders());

      if (expectContinue && statusLine.code == HTTP_CONTINUE) {
        return null;
      } else if (statusLine.code == HTTP_CONTINUE) {
        state = STATE_READ_RESPONSE_HEADERS;
        return responseBuilder;
      }

      state = STATE_OPEN_RESPONSE_BODY;
      return responseBuilder;
    } catch (EOFException e) {
      // Provide more context if the server ends the stream before sending a response.
      IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
      exception.initCause(e);
      throw exception;
    }
  }

解析相应头:

/** Reads headers or trailers. */
  public Headers readHeaders() throws IOException {
    Headers.Builder headers = new Headers.Builder();
    // parse the result headers until the first blank line
    for (String line; (line = readHeaderLine()).length() != 0; ) {
      Internal.instance.addLenient(headers, line);
    }
    return headers.build();
  }

先创建一个Header.Builder,然后一行行地读出来,再放入一个list里,最后build一下转成Headers。

读取响应体:

解析完头信息后,根据状态信息(code)来判断是否响应成功,然后打开响应body

@Override 
public ResponseBody openResponseBody(Response response) throws IOException {
    streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
    String contentType = response.header("Content-Type");

    if (!HttpHeaders.hasBody(response)) {
      Source source = newFixedLengthSource(0);
      return new RealResponseBody(contentType, 0, Okio.buffer(source));
    }

    if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
      Source source = newChunkedSource(response.request().url());
      return new RealResponseBody(contentType, -1L, Okio.buffer(source));
    }

    long contentLength = HttpHeaders.contentLength(response);
    if (contentLength != -1) {
      Source source = newFixedLengthSource(contentLength);
      return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
    }

    return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
  }

注意:这里并没有真正把数据解析出来,只是关联起了okio的Source而已。

ConnectInterceptor

@Override
 public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();

    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }

这个类的作用是负责和服务器建立连接,代码比较简单,但是很多比较重要的逻辑都封装在StreamAllocation,ConnectionPool和RealConnection等类里了。

  1. StreamAllocation对象的初始化是在RetryAndFollowUpInterceptor里完成的;
  2. 真正建立连接是在streamAllocation.newStream(…)方法里:
public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
  1. 再进入findHealthyConnection(...)方法:
/**
   * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
   * until a healthy connection is found.
   */
  private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

这个方法就是想得到一个Connection对象,里面有个while(true){}循环,不断地从findConnection方法中获取一个候选的Connection对象,然后进行判断这个Connection是否是全新的连接,如果是就直接返回,否则要做isHealthy(...)检查,这个Healthy检查主要是查看当前Connection的(1)Socket是否close,(2)Socket的Input或者Output是否shutdown,(3)BufferedSource是否exhausted,如果Connection isHeathy则返回,否则继续执行循环。

  1. 再进入findConnection(...)方法:
 /**
   * Returns a connection to host a new stream. This prefers the existing connection if it exists,
   * then the pool, finally building a new connection.
   */
  private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection. We need to be careful here because our
      // already-allocated connection may have been restricted from creating new streams.
      //先尝试用已经分配的connection,但是我们需要做一些检查,因为有的connection在刚create的时候会被
      //限制(restricted )
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // We had an already-allocated connection and it's good.
        // 已分配的connection通过了初步检查,是合法的。
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        releasedConnection = null;
      }

      //如果上面的检查导致已分配的connection不能用,下面就会尝试从connectionPool里获取一个connection
      if (result == null) {
        // Attempt to get a connection from the pool.
        //下面这行代码看似简单,其实意义也简单,就是根据address的信息从connectionPool里获取一个可复
        //用connection,但是代码的执行流程我们一定要梳理透彻。Internal.instance是OkHttpClient里的static
        //代码块,Internal.instance的acquire方法(传入的3个参数意义重大,尤其是第三个参数this)会走到 
        //ConnectionPool的acquire方法,最后又回到了当前 StreamAllocation的acquire方法,这个过程会获取 
        //一个可复用的connection
        Internal.instance.acquire(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      // 如果截止这里有可复用的connection,就返回这个connection。
      return result;
    }

    // If we need a route selection, make one. This is a blocking operation.
    // 这里是route选择,是对多ip的支持
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        // 遍历多个ip的路由,根据route,address设置connection到connectionPool里,然后从connectionPool
        // 里拿到可复用的connection
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.acquire(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 如果以上还没有找到可复用的connection,那就新建一个Connection,进行 TCP + TLS握手,这是个阻  
    // 塞操作
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // Pool the connection.
      // 把新建的connection放入到connectionPool里
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      // 如果是HTTP2,与此同时该address的另外一个多路连接同时被创建,那就释放这个连接,用那个连  
      // 接。
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }

这个方法很重要,也很长,上文我们也进行了简单的注释,下面再把大致流程说明一下:
(1). 判断当前StreamAllocation对象是否已经有一个Connection对象了,如果有就可以把这个对象给下面的代码用了;
(2). 如果(1)中没有获取到Connection,那就尝试从ConnectionPool获取一个可复用Connection;
(3). 如果(2)中没有获取到Connection,那就对路由列表进行遍历,试图从路由列表里找到多IP的支持,然后再尝试从ConnectionPool获取一个可复用Connection;
(4). 如果(3)中没有获取到Connection,那就new一个Connection对象,进行tcp+tsl握手连接;
(5). 将(4)中new出来的Connection对象放入到ConnectionPool;
(6). 如果是http2的连接,发现有新的多路连接被同时创建就用新创建的那个connection,把刚才放入ConnectionPool的connection 释放掉(获取到其socket,然后close掉);
(7). 最后返回这个connection。

在这个过程中我们还有几个方法要简单阐述:

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
      EventListener eventListener) {
    if (protocol != null) throw new IllegalStateException("already connected");

    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    } else {
      if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
        throw new RouteException(new UnknownServiceException(
            "H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
      }
    }
    
    //开始连接
    while (true) {
      try {
        if (route.requiresTunnel()) {
          //通道连接,一般不会走这里,这个Tunnel主要是指SSL且有HTTP proxy
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break;
          }
        } else {
          //一般都是Socket连接
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        // https建立
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

    if (route.requiresTunnel() && rawSocket == null) {
      ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
          + MAX_TUNNEL_ATTEMPTS);
      throw new RouteException(exception);
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }

这个就是新建一个connection,进行连接的方法,一般都是进入connectSockt()方法

/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
     在java原Socket上构建完整的HTTP或者HTTPS连接
 */
  private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();

    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);

    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
    // More details:
    // https://github.com/square/okhttp/issues/3245
    // https://android-review.googlesource.com/#/c/271775/
    try {
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }

然后会进入 Platform.get().connectSocket(...)方法,最后会进行Socket连接。

//okhttp3.internal.platform.Platform
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
      throws IOException {
    socket.connect(address, connectTimeout);
  }

//java.net.Socket
public void connect(SocketAddress endpoint, int timeout) throws IOException {
        if (endpoint == null)
            throw new IllegalArgumentException("connect: The address can't be null");

        if (timeout < 0)
          throw new IllegalArgumentException("connect: timeout can't be negative");

        if (isClosed())
            throw new SocketException("Socket is closed");

        if (!oldImpl && isConnected())
            throw new SocketException("already connected");

        if (!(endpoint instanceof InetSocketAddress))
            throw new IllegalArgumentException("Unsupported address type");

        InetSocketAddress epoint = (InetSocketAddress) endpoint;
        InetAddress addr = epoint.getAddress ();
        int port = epoint.getPort();
        checkAddress(addr, "connect");

        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            if (epoint.isUnresolved())
                security.checkConnect(epoint.getHostName(), port);
            else
                security.checkConnect(addr.getHostAddress(), port);
        }
        if (!created)
            createImpl(true);
        if (!oldImpl)
            impl.connect(epoint, timeout);
        else if (timeout == 0) {
            if (epoint.isUnresolved())
                impl.connect(addr.getHostName(), port);
            else
                impl.connect(addr, port);
        } else
            throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
        connected = true;
        /*
         * If the socket was not bound before the connect, it is now because
         * the kernel will have picked an ephemeral port & a local address
         */
        bound = true;
    }
/**
   * Acquires a recycled connection to {@code address} for {@code streamAllocation}. If non-null
   * {@code route} is the resolved route for a connection.
   */
  void acquire(Address address, StreamAllocation streamAllocation, @Nullable Route route) {
    assert (Thread.holdsLock(this));
    for (RealConnection connection : connections) {
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return;
      }
    }
  }

这个方法是根据Address和Route这2个参数从Connection队列(connections)里找出合格的connection,然后把这个connection传递到StreamAllocation里。

/**
   * Returns true if this connection can carry a stream allocation to {@code address}. If non-null
   * {@code route} is the resolved route for a connection.
   */
  public boolean isEligible(Address address, @Nullable Route route) {
    // If this connection is not accepting new streams, we're done.
    if (allocations.size() >= allocationLimit || noNewStreams) return false;

    // If the non-host fields of the address don't overlap, we're done.
    if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;

    // If the host exactly matches, we're done: this connection can carry the address.
    if (address.url().host().equals(this.route().address().url().host())) {
      return true; // This connection is a perfect match.
    }

    // At this point we don't have a hostname match. But we still be able to carry the request if
    // our connection coalescing requirements are met. See also:
    // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
    // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

    // 1. This connection must be HTTP/2.
    if (http2Connection == null) return false;

    // 2. The routes must share an IP address. This requires us to have a DNS address for both
    // hosts, which only happens after route planning. We can't coalesce connections that use a
    // proxy, since proxies don't tell us the origin server's IP address.
    if (route == null) return false;
    if (route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
    if (!this.route.socketAddress().equals(route.socketAddress())) return false;

    // 3. This connection's server certificate's must cover the new host.
    if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
    if (!supportsUrl(address.url())) return false;

    // 4. Certificate pinning must match the host.
    try {
      address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
    } catch (SSLPeerUnverifiedException e) {
      return false;
    }

    return true; // The caller's address can be carried by this connection.
  }

这个方法就是如何根据Address和Route判断connection是否合格的,其实主要是通过Host匹配,后面有一些关于HTTP2和关于代理的特殊处理,就不做细说了。

 void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

这个方法就是把一个Connection对象放入到connectionPool里,其实没什么要说的,但是这里没会涉及到一个属性cleanupRunnable,这个属性是用于回收连接池中的无效连接,所以这里又涉及到回收无效链接的方法。

/**
   * Performs maintenance on this pool, evicting the connection that has been idle the longest if
   * either it has exceeded the keep alive limit or the idle connections limit.
   *
   * <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
   * -1 if no further cleanups are required.
   */
  long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    //通过遍历找到要被驱逐(出栈)的connection,或者找到将要被过期驱逐的时间。
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // 如果当前这个connection正在被使用,那就对for循环continue,找下个connection进行计算
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        // 计算空闲时间
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          // 如果空闲时间比longestIdleDurationNs还大,那就把这个idleDurationNs的值赋值到        
          // longestIdleDurationNs 
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      
      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        //最长空闲时间比保活时间大或者空闲connection数目已经达到最大了,就把当前这个connection移除。
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        // 如果所有connections都在使用,那我们只能进行下一次循环了,
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        // 如果没有连接池,那就返回-1
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

这个就是从ConnectionPool里回收无效连接的方法。方法内使用了类似GC算法中的标记-擦除算法,标记处最不活跃的连接进行清除。步骤如下:
(1). 遍历connections队列,如果当前的connection对象正在被使用,continue,inUseConnectionCount+1
(2). 否则idleConnectionCount+1,同时判断当前的connection的空闲时间是否比已知的长,是的话把它记录下来
(3). 如果空闲时间超过了保活时间,或者当前空闲连接数超过了最大空闲连接数,说明这个connection对象需要被回收,将它从connections队列中移除,在代码最后两行进行关闭操作,同时返回0通知cleanupRunnable的线程马上继续执行cleanup方法
(4). 如果3不满足,判断idleConnectionCount是否大于0,是的话返回保活时间与空闲时间差keepAliveDurationNs - longestIdleDurationNs,cleanupRunnable的线程等待时间keepAliveDurationNs - longestIdleDurationNs后继续执行cleanup方法
(5). 如果4不满足,说明没有空闲连接,继续判断有没有正在使用的连接,有的话返回保活时间keepAliveDurationNs,提醒cleanupRunnable的线程至少等待时间keepAliveDurationNs后才需要继续执行cleanup方法
(6). 如果5也不满足,当前说明连接池中没有连接,返回-1,告诉cleanupRunnable的线程不需要再执行了。等待下次调用put方法时再执行。

更多的Interceptor请看https://www.jianshu.com/p/efec03010a38

相关文章

网友评论

      本文标题:关于OkHttp(二)

      本文链接:https://www.haomeiwen.com/subject/vtvpyqtx.html