Exploring how OkHttpClient works (5---ConnectI


Author: 零星瓢虫 | Published: 2021-01-19 21:19

    ConnectInterceptor

    ConnectInterceptor is the connection interceptor: it establishes the connection to the server that the Request will be sent over.

    Here is the intercept method of ConnectInterceptor:

      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request(); // The Request being sent
        StreamAllocation streamAllocation = realChain.streamAllocation(); // The StreamAllocation instance created earlier in RetryAndFollowUpInterceptor
    
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET"); // Extensive health checks for every method except GET
        HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks); // Obtain an HttpCodec instance
        RealConnection connection = streamAllocation.connection(); // The RealConnection that newStream found or created
    
        return realChain.proceed(request, streamAllocation, httpCodec, connection); // Call the next interceptor
      }
    

    ConnectInterceptor takes the StreamAllocation instance created in RetryAndFollowUpInterceptor, uses it to obtain the HttpCodec and RealConnection instances, and then passes these objects on to the next interceptor.
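
    The hand-off described above can be illustrated with a minimal chain-of-responsibility sketch. It does not use OkHttp's classes: SketchInterceptor, SketchChain and ChainSketchDemo are hypothetical names, and plain strings stand in for the request, the connection and the response.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified interceptor: takes a chain and returns a response string.
interface SketchInterceptor {
  String intercept(SketchChain chain);
}

// Hypothetical, simplified chain: carries the request and, once known, the connection.
final class SketchChain {
  private final List<SketchInterceptor> interceptors;
  private final int index;
  final String request;
  final String connection; // null until the connect step supplies one

  SketchChain(List<SketchInterceptor> interceptors, int index, String request, String connection) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
    this.connection = connection;
  }

  // Hands the request, plus whatever has been attached so far, to the next interceptor.
  String proceed(String request, String connection) {
    if (index >= interceptors.size()) throw new IllegalStateException("no more interceptors");
    SketchChain next = new SketchChain(interceptors, index + 1, request, connection);
    return interceptors.get(index).intercept(next);
  }
}

final class ChainSketchDemo {
  static String run(String host) {
    // Plays the role of ConnectInterceptor: obtain a connection, then proceed.
    SketchInterceptor connect =
        chain -> chain.proceed(chain.request, "connection-to-" + chain.request);
    // Plays the role of the next interceptor, which uses that connection.
    SketchInterceptor call = chain -> "response via " + chain.connection;
    List<SketchInterceptor> interceptors = Arrays.asList(connect, call);
    return new SketchChain(interceptors, 0, host, null).proceed(host, null);
  }

  public static void main(String[] args) {
    System.out.println(run("example.com")); // prints "response via connection-to-example.com"
  }
}
```

    The connect step does no request or response processing of its own; its only job in this sketch is to make a connection available to whatever runs next.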

    The method that obtains the HttpCodec instance is StreamAllocation's newStream:

      public HttpCodec newStream(
          OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis(); // Connect timeout
        int readTimeout = chain.readTimeoutMillis(); // Read timeout
        int writeTimeout = chain.writeTimeoutMillis(); // Write timeout
        int pingIntervalMillis = client.pingIntervalMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure(); // Whether to retry after a connection failure
    
        try {
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
          HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
    
          synchronized (connectionPool) {
            codec = resultCodec;
            return resultCodec;
          }
        } catch (IOException e) {
          throw new RouteException(e);
        }
      }
    
    

    newStream obtains a RealConnection instance through findHealthyConnection and then creates the HttpCodec instance from it:

      private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
          boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              pingIntervalMillis, connectionRetryEnabled); // Find a RealConnection
    
          // If this is a brand new connection, we can skip the extensive health checks.
          synchronized (connectionPool) {
            if (candidate.successCount == 0) {
              return candidate;
            }
          }
    
          // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
          // isn't, take it out of the pool and start again.
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            noNewStreams();
            continue;
          }
    
          return candidate;
        }
      }
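
    The loop above keeps asking for candidates until it gets one that is either brand new or still healthy. A minimal generic sketch of that pattern follows; HealthyFinderSketch and HealthyFinderDemo are hypothetical names, and the three functional parameters stand in for findConnection, the successCount == 0 test and isHealthy.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class HealthyFinderSketch {
  // Keeps asking finder for candidates until one is brand new or passes the health check.
  static <C> C findHealthy(Supplier<C> finder, Predicate<C> isBrandNew, Predicate<C> isHealthy) {
    while (true) {
      C candidate = finder.get();
      if (isBrandNew.test(candidate)) return candidate; // new connections skip the check
      if (isHealthy.test(candidate)) return candidate;  // pooled and still usable
      // Unhealthy: drop this candidate and look again.
    }
  }
}

final class HealthyFinderDemo {
  static String run() {
    // Two pooled candidates; the first one has gone stale.
    Iterator<String> candidates = Arrays.asList("stale-pooled", "healthy-pooled").iterator();
    return HealthyFinderSketch.<String>findHealthy(
        candidates::next,
        c -> false,                      // neither candidate is brand new
        c -> c.startsWith("healthy"));   // stand-in for a real health check
  }

  public static void main(String[] args) {
    System.out.println(run()); // prints "healthy-pooled"
  }
}
```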
    
    
    

    findHealthyConnection in turn calls findConnection to obtain the RealConnection instance:

      /**
       * Returns a connection to host a new stream. This prefers the existing connection if it exists,
       * then the pool, finally building a new connection.
       */
    
      private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException { 
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        Connection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
          if (released) throw new IllegalStateException("released");
          if (codec != null) throw new IllegalStateException("codec != null");
          if (canceled) throw new IOException("Canceled");
    
          // Attempt to use an already-allocated connection. We need to be careful here because our
          // already-allocated connection may have been restricted from creating new streams.
          // Try to reuse the connection already allocated to this StreamAllocation.
          releasedConnection = this.connection;
          toClose = releaseIfNoNewStreams();
          if (this.connection != null) { // Still set after the release check, so reuse it
            // We had an already-allocated connection and it's good.
            result = this.connection;
            releasedConnection = null;
          }
          if (!reportedAcquired) { 
            // If the connection was never reported acquired, don't report it as released!
            releasedConnection = null;
          }
    
          if (result == null) { // No allocated connection to reuse
            // Attempt to get a connection from the pool.
            Internal.instance.get(connectionPool, address, this, null);
            if (connection != null) {
              foundPooledConnection = true;
              result = connection;
            } else {
              selectedRoute = route;
            }
          }
        }
        closeQuietly(toClose);
    
        if (releasedConnection != null) { // Notify the event listener
          eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) { // Notify the event listener
          eventListener.connectionAcquired(call, result);
        }
        if (result != null) { // Reusing the already-allocated connection or one from the connectionPool
          // If we found an already-allocated or pooled connection, we're done.
          return result;
        }
    
        // If we need a route selection, make one. This is a blocking operation.
        // No reusable connection yet: select routes, then try the pool again with them.
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
          newRouteSelection = true;
          routeSelection = routeSelector.next();
        }
    
        synchronized (connectionPool) {
          if (canceled) throw new IOException("Canceled");
    
          if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection from
            // the pool. This could match due to connection coalescing.
            List<Route> routes = routeSelection.getAll();
            for (int i = 0, size = routes.size(); i < size; i++) {
              Route route = routes.get(i);
              Internal.instance.get(connectionPool, address, this, route);
              if (connection != null) {
                foundPooledConnection = true;
                result = connection;
                this.route = route;
                break;
              }
            }
          }
    
          if (!foundPooledConnection) { // Nothing found in the connectionPool
            if (selectedRoute == null) {
              selectedRoute = routeSelection.next();
            }
    
            // Create a connection and assign it to this allocation immediately. This makes it possible
            // for an asynchronous cancel() to interrupt the handshake we're about to do.
            route = selectedRoute;
            refusedStreamCount = 0;
            result = new RealConnection(connectionPool, selectedRoute);
            acquire(result, false);
          }
        }
    
        // If we found a pooled connection on the 2nd time around, we're done.
        if (foundPooledConnection) { // Got a pooled connection
          eventListener.connectionAcquired(call, result);
          return result;
        }
    
        // Do TCP + TLS handshakes. This is a blocking operation.
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener); // Perform the connect
        routeDatabase().connected(result.route()); // Record that this route connected successfully
    
        Socket socket = null;
        synchronized (connectionPool) {
          reportedAcquired = true;
    
          // Pool the connection.
          Internal.instance.put(connectionPool, result); // Store the connection in the connectionPool
    
          // If another multiplexed connection to the same address was created concurrently, then
          // release this connection and acquire that one.
          if (result.isMultiplexed()) {
            socket = Internal.instance.deduplicate(connectionPool, address, this);
            result = connection;
          }
        }
        closeQuietly(socket);
    
        eventListener.connectionAcquired(call, result);
        return result;
      }
    
    

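    findConnection mainly does the following:

    1 Reuse the connection already allocated to this StreamAllocation, if there is one.
    2 Otherwise, look for an eligible connection in the connectionPool.
    3 If none is found, select routes and search the pool again with them.
    4 If that also fails, create a new RealConnection for the selected route, connect it (TCP + TLS handshakes), and put it into the connectionPool.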
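
    The lookup order can be sketched in a few lines. This is a simplified illustration, not OkHttp's implementation: LookupConnection and LookupAllocation are hypothetical stand-ins for RealConnection and StreamAllocation, matching is done on a host string only, and route selection and the handshakes are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for RealConnection; only the host matters in this sketch.
final class LookupConnection {
  final String host;

  LookupConnection(String host) {
    this.host = host;
  }
}

// Hypothetical stand-in for StreamAllocation, bound to one host.
final class LookupAllocation {
  private final Deque<LookupConnection> pool; // shared; plays the role of ConnectionPool
  private final String host;
  private LookupConnection connection;        // the connection already allocated, if any
  String lastSource;                          // "allocated", "pool" or "new"; for illustration only

  LookupAllocation(Deque<LookupConnection> pool, String host) {
    this.pool = pool;
    this.host = host;
  }

  LookupConnection findConnection() {
    synchronized (pool) {
      // 1. Prefer the connection this allocation already holds.
      if (connection != null) {
        lastSource = "allocated";
        return connection;
      }
      // 2. Otherwise look for a pooled connection to the same host.
      for (LookupConnection pooled : pool) {
        if (pooled.host.equals(host)) {
          connection = pooled;
          lastSource = "pool";
          return pooled;
        }
      }
      // 3. Otherwise create a new connection and pool it.
      connection = new LookupConnection(host);
      pool.add(connection);
      lastSource = "new";
      return connection;
    }
  }
}

final class LookupDemo {
  public static void main(String[] args) {
    Deque<LookupConnection> pool = new ArrayDeque<>();
    LookupAllocation first = new LookupAllocation(pool, "example.com");
    first.findConnection();
    System.out.println(first.lastSource);  // prints "new"
    LookupAllocation second = new LookupAllocation(pool, "example.com");
    second.findConnection();
    System.out.println(second.lastSource); // prints "pool"
  }
}
```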

    ConnectionPool

    public final class ConnectionPool {
    
      private final Deque<RealConnection> connections = new ArrayDeque<>(); // Deque of RealConnections
      final RouteDatabase routeDatabase = new RouteDatabase();
      boolean cleanupRunning;
    
      private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
      void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable); // Start the background cleanup task
        }
        connections.add(connection);
      }
    
      @Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) { // Look for an eligible connection
          if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection, true);
            return connection;
          }
        }
        return null;
      }
    
    

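    ConnectionPool maintains a Deque (double-ended queue) of RealConnections. When a RealConnection is added and no cleanup is running yet, the pool starts a cleanup task on its executor thread to clean up the connectionPool.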
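
    The cleanupRunnable itself is not shown in this article, so the following is only a sketch of what such a cleanup pass might look like, assuming connections are evicted once they have been idle longer than a keep-alive duration. IdleEntry, IdlePoolSketch and the numeric values are hypothetical; the current time is passed in as a parameter so the behaviour is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical pooled entry: a name plus the time at which it became idle.
final class IdleEntry {
  final String name;
  final long idleSince;

  IdleEntry(String name, long idleSince) {
    this.name = name;
    this.idleSince = idleSince;
  }
}

// Hypothetical pool that evicts entries idle for at least keepAlive time units.
final class IdlePoolSketch {
  private final Deque<IdleEntry> entries = new ArrayDeque<>();
  private final long keepAlive;

  IdlePoolSketch(long keepAlive) {
    this.keepAlive = keepAlive;
  }

  synchronized void put(IdleEntry entry) {
    entries.add(entry);
  }

  // One cleanup pass: removes every entry idle for keepAlive or longer, returns how many.
  synchronized int evictIdle(long now) {
    int evicted = 0;
    for (Iterator<IdleEntry> it = entries.iterator(); it.hasNext(); ) {
      IdleEntry entry = it.next();
      if (now - entry.idleSince >= keepAlive) {
        it.remove();
        evicted++;
      }
    }
    return evicted;
  }

  synchronized int size() {
    return entries.size();
  }
}

final class IdlePoolDemo {
  public static void main(String[] args) {
    IdlePoolSketch pool = new IdlePoolSketch(100);
    pool.put(new IdleEntry("a", 0));
    pool.put(new IdleEntry("b", 50));
    System.out.println(pool.evictIdle(120)); // prints 1: only "a" has been idle long enough
    System.out.println(pool.size());         // prints 1
  }
}
```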

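    ConnectionPool's get method checks each pooled connection with isEligible, which compares the Host of the requested address. On a match it returns that connection; in other words, a connection is reused when the Host is the same: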

      public boolean isEligible(Address address, @Nullable Route route) {
        // If this connection is not accepting new streams, we're done.
        if (allocations.size() >= allocationLimit || noNewStreams) return false;
    
        // If the non-host fields of the address don't overlap, we're done.
        if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
    
        // If the host exactly matches, we're done: this connection can carry the address.
        if (address.url().host().equals(this.route().address().url().host())) { // The two hosts are equal
          return true; // This connection is a perfect match.
        }
        // (The rest of the method is omitted in this excerpt.)
    
    

    StreamAllocation's acquire method then assigns the connection field:

      public void acquire(RealConnection connection, boolean reportedAcquired) {
        assert (Thread.holdsLock(connectionPool));
        if (this.connection != null) throw new IllegalStateException();
    
        this.connection = connection;
        this.reportedAcquired = reportedAcquired;
        connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
      }
    

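    At this point we know that ConnectInterceptor uses the Host of the Request's URL to look in the ConnectionPool for an existing connection, and reuses it if one is found. The reuse rule shown here is that the request has the same Host (and, per isEligible, that the connection still accepts new streams and the non-host fields of the address match), which avoids creating a new connection.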
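
    A minimal sketch of these eligibility checks follows. It is not OkHttp's isEligible: EligibilitySketch is a hypothetical class, and a single port number stands in for all the non-host fields of the address.

```java
// Hypothetical stand-in for a pooled connection and its eligibility test.
final class EligibilitySketch {
  final String host;
  final int port;            // stand-in for the non-host fields of the address
  final int allocationLimit; // how many streams this connection may carry at once
  int allocations;           // how many streams it currently carries
  boolean noNewStreams;      // set once the connection must not be reused

  EligibilitySketch(String host, int port, int allocationLimit) {
    this.host = host;
    this.port = port;
    this.allocationLimit = allocationLimit;
  }

  boolean isEligible(String requestHost, int requestPort) {
    // Not accepting new streams: cannot be reused.
    if (allocations >= allocationLimit || noNewStreams) return false;
    // The non-host part of the address must match.
    if (port != requestPort) return false;
    // Same host: this connection can carry the request.
    return host.equals(requestHost);
  }
}

final class EligibilityDemo {
  public static void main(String[] args) {
    EligibilitySketch pooled = new EligibilitySketch("example.com", 443, 1);
    System.out.println(pooled.isEligible("example.com", 443)); // prints true
    System.out.println(pooled.isEligible("other.com", 443));   // prints false
    pooled.allocations = 1; // the single allowed stream is now in use
    System.out.println(pooled.isEligible("example.com", 443)); // prints false
  }
}
```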

    When findConnection has to create a new RealConnection, it then calls connect on it:

    RealConnection

      public void connect(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
          EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");
    
        RouteException routeException = null;
        List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
        ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    
        if (route.address().sslSocketFactory() == null) { // No SSL (plain HTTP)
          if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
            throw new RouteException(new UnknownServiceException(
                "CLEARTEXT communication not enabled for client"));
          }
          String host = route.address().url().host();
          if (!Platform.get().isCleartextTrafficPermitted(host)) { 
            throw new RouteException(new UnknownServiceException( // Cleartext request not permitted
                "CLEARTEXT communication to " + host + " not permitted by network security policy"));
          }
        }
    
        while (true) {
          try {
            if (route.requiresTunnel()) { // Connect through a tunnel
              connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
              if (rawSocket == null) {
                // We were unable to connect the tunnel but properly closed down our resources.
                break;
              }
            } else { // Open a plain socket connection; this is the usual path
              connectSocket(connectTimeout, readTimeout, call, eventListener);
            }
            establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener); // Establish the protocol (TLS handshake for HTTPS)
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
            break;
          } catch (IOException e) {
            closeQuietly(socket);
            closeQuietly(rawSocket);
            socket = null;
            rawSocket = null;
            source = null;
            sink = null;
            handshake = null;
            protocol = null;
            http2Connection = null;
    
            eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e); // Connection failed
    
            if (routeException == null) {
              routeException = new RouteException(e);
            } else {
              routeException.addConnectException(e);
            }
    
            if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
              throw routeException;
            }
          }
        }
    
        if (route.requiresTunnel() && rawSocket == null) {
          ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
              + MAX_TUNNEL_ATTEMPTS);
          throw new RouteException(exception);
        }
    
        if (http2Connection != null) { // Set the allocation limit for HTTP/2
          synchronized (connectionPool) { 
            allocationLimit = http2Connection.maxConcurrentStreams();
          }
        }
      }
    

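    connect mainly performs the following steps:
    1 Get the connection specs (ConnectionSpec) from the route and check that they fit the protocol: for a plain HTTP request the specs must include CLEARTEXT, which stands for cleartext transmission, and the platform's own security policy (on Android) must permit cleartext requests to that host.
    2 Enter a loop that keeps trying to connect until a connection succeeds, then leave the loop; a failure that cannot be retried throws a RouteException.
    3 Inside the loop, first check from the route whether a tunnel is required, and establish either a tunnel connection or a plain socket connection.
    4 Establish the protocol, which for HTTPS means performing the TLS handshake.
    5 For HTTP/2, set the connection's allocation limit, that is, the maximum number of requests that can exist concurrently on one connection.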
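
    The retry loop in steps 2 and 3 can be sketched as follows. This is an illustration under stated assumptions rather than OkHttp's code: ConnectAttempt, SketchRouteException and ConnectLoopSketch are hypothetical, and a simple fallbackCount stands in for the decision that connectionSpecSelector.connectionFailed(e) makes in the real method.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical single connect attempt; may fail with an IOException.
interface ConnectAttempt {
  void run(int attemptIndex) throws IOException;
}

// Hypothetical unchecked exception that collects every failed attempt.
final class SketchRouteException extends RuntimeException {
  final List<IOException> failures;

  SketchRouteException(List<IOException> failures) {
    super("connect failed after " + failures.size() + " attempt(s)",
        failures.get(failures.size() - 1));
    this.failures = failures;
  }
}

final class ConnectLoopSketch {
  // Tries to connect until an attempt succeeds; returns how many attempts failed first.
  static int connect(ConnectAttempt attempt, boolean retryEnabled, int fallbackCount) {
    List<IOException> failures = new ArrayList<>();
    int attemptIndex = 0;
    while (true) {
      try {
        attempt.run(attemptIndex);
        return failures.size(); // connected: leave the loop
      } catch (IOException e) {
        failures.add(e);
        boolean hasFallback = attemptIndex < fallbackCount; // assumption: fixed number of fallbacks
        if (!retryEnabled || !hasFallback) {
          throw new SketchRouteException(failures);
        }
        attemptIndex++; // try again with the next fallback
      }
    }
  }
}

final class ConnectLoopDemo {
  public static void main(String[] args) {
    // The first attempt fails, the second succeeds.
    int failed = ConnectLoopSketch.connect(i -> {
      if (i == 0) throw new IOException("first attempt rejected");
    }, true, 1);
    System.out.println(failed); // prints 1
  }
}
```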

      private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
          int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
        if (route.address().sslSocketFactory() == null) { // Plain HTTP
          protocol = Protocol.HTTP_1_1;
          socket = rawSocket;
          return;
        }
    
        eventListener.secureConnectStart(call);
        connectTls(connectionSpecSelector); // Perform the TLS handshake
        eventListener.secureConnectEnd(call, handshake);
    
        if (protocol == Protocol.HTTP_2) { // HTTP/2 was negotiated
          socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
          http2Connection = new Http2Connection.Builder(true)
              .socket(socket, route.address().url().host(), source, sink)
              .listener(this)
              .pingIntervalMillis(pingIntervalMillis)
              .build();
          http2Connection.start();
        }
      }
    

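    For plain HTTP there is no protocol to establish: the TCP handshake is already complete and communication with the server can begin on this connection. For HTTPS (including HTTP/2 over TLS), the TLS protocol still has to be established: complete the TLS handshake, verify the server certificate, and negotiate the encryption algorithm and the keys used for transmission.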
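
    The decision can be reduced to a small sketch. SketchProtocol and ProtocolSketch are hypothetical names, the handshake itself is not performed, and falling back to HTTP/1.1 when nothing was negotiated is an assumption of this sketch rather than something shown in the article.

```java
// Hypothetical stand-in for okhttp3.Protocol, limited to the two cases discussed here.
enum SketchProtocol { HTTP_1_1, HTTP_2 }

final class ProtocolSketch {
  // tlsConfigured: whether the address has TLS configured (an sslSocketFactory in the article's code).
  // negotiated: the protocol agreed on during the TLS handshake, or null if none was negotiated.
  static SketchProtocol establish(boolean tlsConfigured, SketchProtocol negotiated) {
    if (!tlsConfigured) {
      // Plain HTTP: nothing to establish, use HTTP/1.1 on the raw socket.
      return SketchProtocol.HTTP_1_1;
    }
    // The TLS handshake would happen here; afterwards use what was negotiated.
    return negotiated != null ? negotiated : SketchProtocol.HTTP_1_1; // assumption: default to HTTP/1.1
  }
}

final class ProtocolDemo {
  public static void main(String[] args) {
    System.out.println(ProtocolSketch.establish(false, null));                  // prints HTTP_1_1
    System.out.println(ProtocolSketch.establish(true, SketchProtocol.HTTP_2)); // prints HTTP_2
  }
}
```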

    That completes the analysis of connect, and the RealConnection is returned.

    With the RealConnection in hand, the HttpCodec is created:

      public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
          StreamAllocation streamAllocation) throws SocketException {
        if (http2Connection != null) {
          return new Http2Codec(client, chain, streamAllocation, http2Connection);
        } else {
          socket.setSoTimeout(chain.readTimeoutMillis());
          source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
          sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
          return new Http1Codec(client, streamAllocation, source, sink);
        }
      }
    

    HttpCodec

      public Http2Codec(OkHttpClient client, Interceptor.Chain chain, StreamAllocation streamAllocation,
          Http2Connection connection) {
        this.client = client;
        this.chain = chain;
        this.streamAllocation = streamAllocation;
        this.connection = connection;
      }
    

    Http2Codec wraps the parameters related to the request.

    The RealConnection and the HttpCodec (here an Http2Codec) are passed as arguments to the next interceptor.


    Article link: https://www.haomeiwen.com/subject/pziaaktx.html