美文网首页Android进阶之路Android开发Android技术知识
OKhttp网络框架基础学习(http三次握手底层实现)

OKhttp网络框架基础学习(http三次握手底层实现)

作者: 谁动了我的代码 | 来源:发表于2022-08-28 21:16 被阅读0次

    tcp的三次握手及四次挥手相信很多人都比较熟悉,而在Okhttp中,也有对于连接的管理部分。本文主要分析的是Okhttp是如何管理TCP三次握手四次挥手的,或者说Okhttp的连接管理。

    本文基于okhttp 3.14.9

    gradle依赖:implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.14.9'

    Okhttp的拦截器链(责任链)

    Okhttp最为经典的就是以责任链模式设计的拦截器链。Okhttp内部是有线程池触发每个RealCall对象继而触发网络请求的,如果您是异步调用的网络请求RealCall.enqueue(),在AsyncCall.execute()方法中就会调用个以下方法。即组装责任链链路interceptors集合,并发起网络请求。值得一题的是在整个过程中都是通过chain.proceed(originalRequest)的形式将责任链遍历。而本文的主角ConnectInterceptor就由此诞生,该interceptor就是负责http连接的

    // RealCall.java 210行
    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(new RetryAndFollowUpInterceptor(client));
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
     
        Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
            originalRequest, this, client.connectTimeoutMillis(),
            client.readTimeoutMillis(), client.writeTimeoutMillis());
     
        boolean calledNoMoreExchanges = false;
        try {
          Response response = chain.proceed(originalRequest);
          if (transmitter.isCanceled()) {
            closeQuietly(response);
            throw new IOException("Canceled");
          }
          return response;
        } catch (IOException e) {
          calledNoMoreExchanges = true;
          throw transmitter.noMoreExchanges(e);
        } finally {
          if (!calledNoMoreExchanges) {
            transmitter.noMoreExchanges(null);
          }
        }
      }
    

    Okhttp的连接管理

    ConnectInterceptor

    // ConnectInterceptor.java
    /** Opens a connection to the target server and proceeds to the next interceptor. */
    public final class ConnectInterceptor implements Interceptor {
      public final OkHttpClient client;
     
      public ConnectInterceptor(OkHttpClient client) {
        this.client = client;
      }
     
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        Transmitter transmitter = realChain.transmitter();
     
        // We need the network to satisfy this request. Possibly for validating a conditional GET.
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
     
        return realChain.proceed(request, transmitter, exchange);
      }
    }
    

    通过源码可知,ConnectInterceptor的代码比较简单,简单可理解为:新建一个Exchange对象,然后往下传递,这时客户端与服务器的连接已经建立,接下来将会是真正的数据传递。所以这里的重点就在transmitter.newExchange(chain, doExtensiveHealthChecks)里面了。

    简单介绍一下Transmitter和Exchange

    • Transmitter:OkHttp的应用层和网络层之间的桥梁。里面包含着OkHttpClient对象、RealConnectionPool连接池、该次请求的Call对象、事件监听的EventListener对象、AsyncTimeout对象用于管理超时。Transmitter对象可以理解为一个管理类,关联着整个网络请求的应用层网络层的对接
    • Exchange:管理网络请求的requestresponse,可以理解为网络层,Transmitter中也包含这此对象。实际上,真正处理网络请求的request和response的是其里面的ExchangeCodec对象

    从结果来看,一个Exchange对象就代表着一个与服务器的连接。带着这个结论我们继续往下看。

    Transmitter.newExchange

    // Transmitter.java 158行
    /** Returns a new exchange to carry a new request and response. */
    Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        synchronized (connectionPool) {
          if (noMoreExchanges) {
            throw new IllegalStateException("released");
          }
          if (exchange != null) {
            throw new IllegalStateException("cannot make a new request because the previous response "
                + "is still open: please call response.close()");
          }
        }
     
        ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
        Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
     
        synchronized (connectionPool) {
          this.exchange = result;
          this.exchangeRequestDone = false;
          this.exchangeResponseDone = false;
          return result;
        }
     }
    

    newExchange方法的核心就是调用exchangeFinder.find()方法找到一个合适的连接,即找到一个合适的ExchangeCodec对象继而生成一个Exchange对象返回

    ExchangeFinder.find

      // ExchangeFinder.java 79行
      public ExchangeCodec find(
          OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
        int connectTimeout = chain.connectTimeoutMillis();
        int readTimeout = chain.readTimeoutMillis();
        int writeTimeout = chain.writeTimeoutMillis();
        int pingIntervalMillis = client.pingIntervalMillis();
        boolean connectionRetryEnabled = client.retryOnConnectionFailure();
     
        try {
          RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
              writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
          return resultConnection.newCodec(client, chain);
        } catch (RouteException e) {
          trackFailure();
          throw e;
        } catch (IOException e) {
          trackFailure();
          throw new RouteException(e);
        }
      }
    

    重点关注的是 RealConnection resultConnection = findHealthyConnection();,这里可以理解为通过findHealthyConnection()获取可用的RealConnection对象,再借助RealConnection新建一个ExchangeCodec对象用于网络请求的交互。

    RealConnection 是一次连接的抽象。

    Exchange or ExchangeCodec 为负责http请求的request和reponse的交互。

    所以,要想了解Okhttp如何管理连接的,就需要看RealConnection的查找过程

    查找RealConnection

    在Okhttp中是有利用连接池复用连接的设计的。RealConnectionPool就是担任着这一角色,其内部维护了一个双端队列Deque connections用来缓存可复用的连接。通过maxIdleConnections最大空闲连接数以及keepAliveDurationNs连接保活时长这俩参数来控制连接的释放。RealConnectionPool内部维护一个线程池,定期释放无用的RealConnectionPool连接。 回到查找可用RealConnection的逻辑当中,之前讲到的findHealthyConnection方法只是对查找过程findConnection的一次封装,它会死循环,直到获取到可用的RealConnection为止

    /**
       * Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
       * until a healthy connection is found.
       */
      private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
          int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
          boolean doExtensiveHealthChecks) throws IOException {
        while (true) {
          RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
              pingIntervalMillis, connectionRetryEnabled);
     
          // If this is a brand new connection, we can skip the extensive health checks.
          synchronized (connectionPool) {
            if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
              return candidate;
            }
          }
     
          // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
          // isn't, take it out of the pool and start again.
          if (!candidate.isHealthy(doExtensiveHealthChecks)) {
            candidate.noNewExchanges();
            continue;
          }
     
          return candidate;
        }
      }
    

    然后就正式进入查找阶段了,findConnection方法:

      private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        // 查找RealConnection结果
        RealConnection result = null;
        Route selectedRoute = null;
        RealConnection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
          if (transmitter.isCanceled()) throw new IOException("Canceled");
          hasStreamFailure = false; // This is a fresh attempt.
     
          。。。
          
          // [1] transmitter对象本身记录的RealConnection可复用
          if (transmitter.connection != null) {
            // We had an already-allocated connection and it's good.
            result = transmitter.connection;
            releasedConnection = null;
          }
          
          if (result == null) {
            // Attempt to get a connection from the pool.
            // [2] 通过当前请求地址在连接池connectionPool中寻找到可复用的RealConnection
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
              foundPooledConnection = true;
              result = transmitter.connection;
            } else if (nextRouteToTry != null) {
              selectedRoute = nextRouteToTry;
              nextRouteToTry = null;
            } else if (retryCurrentRoute()) {
              selectedRoute = transmitter.connection.route();
            }
          }
        }
        
        。。。
        
        if (result != null) {
          // If we found an already-allocated or pooled connection, we're done.
          return result;
        }
     
        // If we need a route selection, make one. This is a blocking operation.
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
          newRouteSelection = true;
          routeSelection = routeSelector.next();
        }
     
        List<Route> routes = null;
        synchronized (connectionPool) {
          if (transmitter.isCanceled()) throw new IOException("Canceled");
     
          if (newRouteSelection) {
            // Now that we have a set of IP addresses, make another attempt at getting a connection from
            // the pool. This could match due to connection coalescing.
            routes = routeSelection.getAll();
            // [3] 遍历全局的路由路径,在连接池connectionPool中寻找到可复用的RealConnection
            if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
              foundPooledConnection = true;
              result = transmitter.connection;
            }
          }
     
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              selectedRoute = routeSelection.next();
            }
     
            // Create a connection and assign it to this allocation immediately. This makes it possible
            // for an asynchronous cancel() to interrupt the handshake we're about to do.
            // [4] 若前面[1][2][3]都未找到时会新建一个RealConnection对象,后续请求服务器连接。
            result = new RealConnection(connectionPool, selectedRoute);
            connectingConnection = result;
          }
        }
     
        // If we found a pooled connection on the 2nd time around, we're done.
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
          return result;
        }
     
        // Do TCP + TLS handshakes. This is a blocking operation.
        // [4] 前面[1][2][3]都未找到时,当前的result为全新RealConnection对象,
        // 这里的connect方法就会进行TCP的3次握手,和SSL/TLS。
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
        connectionPool.routeDatabase.connected(result.route());
     
        Socket socket = null;
        synchronized (connectionPool) {
          connectingConnection = null;
          // Last attempt at connection coalescing, which only occurs if we attempted multiple
          // concurrent connections to the same host.
          if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            // We lost the race! Close the connection we created and return the pooled connection.
            result.noNewExchanges = true;
            socket = result.socket();
            result = transmitter.connection;
     
            // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
            // that case we will retry the route we just successfully connected with.
            nextRouteToTry = selectedRoute;
          } else {
            connectionPool.put(result);
            transmitter.acquireConnectionNoEvents(result);
          }
        }
        closeQuietly(socket);
     
        eventListener.connectionAcquired(call, result);
        return result;
      }
    

    findConnection方法较长,可以寻找注释中的[1]、[2]、[3]、[4],即为每种获取到可用RealConnection对象情况。

    • [1] transmitter对象本身记录的RealConnection可复用。
    • [2] 通过当前请求地址在连接池connectionPool中寻找到可复用的RealConnection。
    • [3] 遍历全局的路由路径,在连接池connectionPool中寻找到可复用的RealConnection。ps:这里与[2]不同的是,会遍历到其他的请求地址,判断的依据可以是相同的host等。
    • [4] 如果前3步都找不到时,就会新建一个连接进行TCP+TLS了。

    值得一提的是,每次查找连接池中可复用的连接都会调用connectionPool.transmitterAcquirePooledConnection()方法(只是传的参数不同),在此方法内部是会将查找到的RealConnection对象备份一个引用到transmitter内。而如果是新建连接的话也会通过transmitter.acquireConnectionNoEvents(result)将新的RealConnection对象备份。这样做的目的是为了重试时更快的找到可复用的连接,也对应会上述的[1]中查找情况

    执行三次握手

    根据上述findConnection方法中的result.connect,跟踪代码会看到,最终走到RealConnection.connectSocket()

    // RealConnection.java
    private void connectSocket(int connectTimeout, int readTimeout, Call call,
          EventListener eventListener) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
     
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
     
        eventListener.connectStart(call, route.socketAddress(), proxy);
        rawSocket.setSoTimeout(readTimeout);
        try {
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
          ce.initCause(e);
          throw ce;
        }
     
        // The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
        // More details:
        // https://github.com/square/okhttp/issues/3245
        // https://android-review.googlesource.com/#/c/271775/
        try {
          source = Okio.buffer(Okio.source(rawSocket));
          sink = Okio.buffer(Okio.sink(rawSocket));
        } catch (NullPointerException npe) {
          if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
            throw new IOException(npe);
          }
        }
      }
    

    到了这一步,RealConnection内部就会新建一个Socket对象,用于后续的tcp连接通信。

    Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);就是真正执行三次握手的地方,这里的Platform.get()是Okhttp根据平台(ps:可能不是Android平台或者Android版本)的适配逻辑。

    // Platform.java
    public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
          throws IOException {
        socket.connect(address, connectTimeout);
      }
    

    显然,Okhttp内部的tcp三次握手,依赖的是传统的socket连接

    ps:上述代码中会利用socket对象创建一个source和一个sink对象,这个是Okio的东西,如果请求是Http1协议的话就会用到。后续request和reponse就是利用它们来实现。

    最后还需要补充的是,上述代码中在找到可用的RealConnection对象后,会利用它新建一个ExchangeCodec对象:resultConnection.newCodec(client, chain);。其实ExchangeCodec是一个接口,会根据所用协议为Http1或者Http2实例出对应的策略类。

    // RealConnection.java
    ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
        if (http2Connection != null) {
          return new Http2ExchangeCodec(client, this, chain, http2Connection);
        } else {
          socket.setSoTimeout(chain.readTimeoutMillis());
          source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
          sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
          return new Http1ExchangeCodec(client, this, source, sink);
        }
      }
    

    这里如果是Http1的话,上述利用socket对象创建sourcesink对象就会被引用到Http1ExchangeCodec当中。

    Okhttp的连接释放

    说完了三次握手的管理,再来看看四次挥手,即连接释放。在源码中会看到多处地方调用一个closeQuietly方法,该方法是用于释放连接的。

    // Utils.java
    public static void closeQuietly(Socket socket) {
        if (socket != null) {
          try {
            socket.close();
          } catch (AssertionError e) {
            if (!isAndroidGetsocknameError(e)) throw e;
          } catch (RuntimeException rethrown) {
            throw rethrown;
          } catch (Exception ignored) {
          }
        }
      }
    

    socket.close();就是针对这一次tcp连接的关闭(释放)。这里其实就是真正的连接释放了。而至于常规触发这一逻辑的地方就是前面提到的RealConnectionPool定时回收机制

    最后

    最后在总结一下文中设计的类的职能吧!

    • ConnectInterceptor:Okhttp用于与服务器建立连接的拦截器,连接发起的地方。
    • Transmitter:Okhttp应用层与网络层的中间层。这里利用其来获取到Exchange对象。
    • ExchangeFinder:用于查找可用连接。查找可用RealConnection对象,实例出ExchangeCodec对象,以及返回Exchange对象都会在这里完成。
    • Exchange:封装http的request和response的事件触发,中间还会涉及事件监听的回调。内部拥有该次连接的ExchangeCodec对象。
    • ExchangeCodec:真正用于http的request和response的事件。内部使用的是Okio。
    • RealConnection:一次连接的抽象,内部用于本次tcp连接的socket对象。可在Transmitter中、RealConnectionPool连接池或者新建获取,此步骤发生在ExchangeFinder中。
    • RealConnectionPool:用于缓存可复用的RealConnection对象。

    本文转载于

    作者:Cy13er

    原文地址:https://juejin.cn/post/6959882254076084237

    相关文章

      网友评论

        本文标题:OKhttp网络框架基础学习(http三次握手底层实现)

        本文链接:https://www.haomeiwen.com/subject/cqvcnrtx.html