美文网首页
OkHttp 原理解析 (三)

OkHttp 原理解析 (三)

作者: 莫库施勒 | 来源:发表于2019-06-10 16:19 被阅读0次
    OkHttp.jpg

    ConnectInterceptor

    public final class ConnectInterceptor implements Interceptor {
      public final OkHttpClient client;
    
      public ConnectInterceptor(OkHttpClient client) {
        this.client = client;
      }
    
      @Override public Response intercept(Chain chain) throws IOException {
        RealInterceptorChain realChain = (RealInterceptorChain) chain;
        Request request = realChain.request();
        Transmitter transmitter = realChain.transmitter();
    
        boolean doExtensiveHealthChecks = !request.method().equals("GET");
        Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
    
        return realChain.proceed(request, transmitter, exchange);
      }
    }
    

    之前我们在RetryAndFollowUpInterceptor 已经 prepareToConnect() 做过准备了,然后就是在 BridgeIntercepter 中添加一些请求头和相应头,接着是CacheIntercepter 看是否可以直接使用缓存,如果有缓存的话也不会走到这里,如果没有缓存就需要 ConnectIntercepter 借用 Transmitter来桥接应用层和网络层,通过 ExchangeFinder 中的 finHealthyConnection()connectionPool 中找到一个可用的连接,这个连接可能是复用的,并 connect(),从而得到 输入/输出 流 (source/sink) ,返回一个 ExchangeCallServerIntercepter , 通过这个 Exchange 就可以添加请求头和请求体,并读取响应头和响应体,来交给上面的 Intercepter,层层向上传递。

    // ExchangeFinder.java
      private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
        boolean foundPooledConnection = false;
        RealConnection result = null;
        Route selectedRoute = null;
        RealConnection releasedConnection;
        Socket toClose;
        synchronized (connectionPool) {
          if (transmitter.isCanceled()) throw new IOException("Canceled");
          hasStreamFailure = false; // This is a fresh attempt.
    
          // 尝试复用已分配 Connection
          releasedConnection = transmitter.connection;
          toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
              ? transmitter.releaseConnectionNoEvents()
              : null;
    
          if (transmitter.connection != null) {
            // 得到了已分配的connection
            result = transmitter.connection;
            releasedConnection = null;
          }
    
          if (result == null) {
            // 尝试获取已回收的connection
            if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
              foundPooledConnection = true;
              result = transmitter.connection;
            } else if (nextRouteToTry != null) {
              selectedRoute = nextRouteToTry;
              nextRouteToTry = null;
            } else if (retryCurrentRoute()) {
              selectedRoute = transmitter.connection.route();
            }
          }
        }
        closeQuietly(toClose);
    
        if (releasedConnection != null) {
          eventListener.connectionReleased(call, releasedConnection);
        }
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
        }
        if (result != null) {
          // 从connectionPool中找到了就返回
          return result;
        }
    
        // 如果需要路由选择器,就创建。这是一个阻塞操作
        boolean newRouteSelection = false;
        if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
          newRouteSelection = true;
          routeSelection = routeSelector.next();
        }
    
        List<Route> routes = null;
        synchronized (connectionPool) {
          if (transmitter.isCanceled()) throw new IOException("Canceled");
    
          if (newRouteSelection) {
            // 根据 IP addresses 集合, 再次尝试从 connectionPool中获取connection。这里与上次的区别是 routes不为空
            routes = routeSelection.getAll();
            if (connectionPool.transmitterAcquirePooledConnection(
                address, transmitter, routes, false)) {
              foundPooledConnection = true;
              result = transmitter.connection;
            }
          }
    
          if (!foundPooledConnection) {
            if (selectedRoute == null) {
              selectedRoute = routeSelection.next();
            }
    
            // 这里就创建一个 Connection并指派
            result = new RealConnection(connectionPool, selectedRoute);
            connectingConnection = result;
          }
        }
    
        // 得到了connection,返回
        if (foundPooledConnection) {
          eventListener.connectionAcquired(call, result);
          return result;
        }
    
        // 进行 TCP + TLS handshakes. 一个阻塞操作
        result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
            connectionRetryEnabled, call, eventListener);
        connectionPool.routeDatabase.connected(result.route());
    
        Socket socket = null;
        synchronized (connectionPool) {
          connectingConnection = null;
          // 将 connection进行合并,只有在多个connection 复用一个 host的时候
          if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
            // We lost the race! Close the connection we created and return the pooled connection.
            result.noNewExchanges = true;
            socket = result.socket();
            result = transmitter.connection;
          } else {
            connectionPool.put(result);
            transmitter.acquireConnectionNoEvents(result);
          }
        }
        closeQuietly(socket);
    
        eventListener.connectionAcquired(call, result);
        return result;
      }
    

    以上代码主要做的事情有:

    1. StreamAllocation的connection如果可以复用则复用;
    2. 如果connection不能复用,则从连接池中获取RealConnection对象,获取成功则返回;
    3. 如果连接池里没有,则new一个RealConnection对象;
    4. 调用RealConnection的connect()方法发起请求;
    5. 将RealConnection对象存进连接池中,以便下次复用;
    6. 返回RealConnection对象。

    RealConnection

     // Connection 接口
     Route route(); //返回一个路由
     Socket socket();  //返回一个socket
     Handshake handshake();  //如果是一个https,则返回一个TLS握手协议
     Protocol protocol(); //返回一个协议类型 比如 http1.1 等或者自定义类型 
    

    RealConnection是Connection的实现类,代表着链接socket的链路,如果拥有了一个RealConnection就代表了我们已经跟服务器有了一条通信链路。

     // RealConnection 成员变量
      private final ConnectionPool connectionPool;
      private final Route route;
    
      //下面这些字段,通过connect()方法初始化赋值,且不会再次赋值
    
      private Socket rawSocket; //底层 TCP socket
    
      private Socket socket;  //应用层socket
    
      private Handshake handshake;  //握手
     
      private Protocol protocol;  //协议
      
      private Http2Connection http2Connection; // http2的链接
    
      // 通过source和sink,与服务器交互的输入输出流
      private BufferedSource source;
      private BufferedSink sink;
    
      // 下面这个字段是表示链接状态的字段,并且有connectPool统一管理
    
      // 如果noNewStreams被设为true,则noNewStreams一直为true,不会被改变,
      // 并且这个链接不会再创建新的stream流
      public boolean noNewStreams;
      
      //成功的次数
      public int successCount;
    
      //此链接可以承载最大并发流的限制,如果不超过限制,可以随意增加
      public int allocationLimit = 1;
    

    由上面的我们可以得出一些结论:

    • source和sink,以流的形式对服务器进行交互
    • 除了route 字段,部分的字段都是在connect()方法里面赋值的,并且不会改变
    • noNewStream 可以简单理解为该连接不可用。
    • allocationLimit是分配流的数量上限,一个connection最大只能支持一个1并发

    首先是connect() 方法

      public void connect(int connectTimeout, int readTimeout, int writeTimeout,
          int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
          EventListener eventListener) {
        if (protocol != null) throw new IllegalStateException("already connected");
        // 创建一个 Selector 来选择 connectionSpec 也就是线路
        RouteException routeException = null;
        List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
        ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
        ...
        // 尝试连接
        while (true) {
          try {
            // 如果要求隧道模式,建立通道连接,通常不会使用这种
            if (route.requiresTunnel()) {
              connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
              if (rawSocket == null) {
                // We were unable to connect the tunnel but properly closed down our resources.
                break;
              }
            } else {
              // socket 连接
              connectSocket(connectTimeout, readTimeout, call, eventListener);
            }
            // 建立 https 连接
            establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
            eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
            break;
          } catch (IOException e) {
            ...
          }
        }
        if (route.requiresTunnel() && rawSocket == null) {
          ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
              + MAX_TUNNEL_ATTEMPTS);
          throw new RouteException(exception);
        }
    
        if (http2Connection != null) {
          synchronized (connectionPool) {
            allocationLimit = http2Connection.maxConcurrentStreams();
          }
        }
      }
    

    socket 连接

      private void connectSocket(int connectTimeout, int readTimeout, Call call,
          EventListener eventListener) throws IOException {
        Proxy proxy = route.proxy();
        Address address = route.address();
         // 根据代理类型来选择socket是代理还是直连类型
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
    
        eventListener.connectStart(call, route.socketAddress(), proxy);
        rawSocket.setSoTimeout(readTimeout);
        try {
          // 为支持不同的平台,实际是 socket.connect(address, connectTimeout)
          Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
        } catch (ConnectException e) {
          ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
          ce.initCause(e);
          throw ce;
        }
    
        try {
          // 得到输入/输出流
          source = Okio.buffer(Okio.source(rawSocket));
          sink = Okio.buffer(Okio.sink(rawSocket));
        } catch (NullPointerException npe) {
          if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
            throw new IOException(npe);
          }
        }
      }
    

    隧道连接

      private void connectTunnel(int connectTimeout, int readTimeout, int writeTimeout)
          throws IOException {
        // 创建隧道请求
        Request tunnelRequest = createTunnelRequest();
        HttpUrl url = tunnelRequest.url();
        int attemptedConnections = 0;
        int maxAttempts = 21;
        while (true) {
          if (++attemptedConnections > maxAttempts) {
            throw new ProtocolException("Too many tunnel connections attempted: " + maxAttempts);
          }
          // 建立Socket连接
          connectSocket(connectTimeout, readTimeout);
          // 建立隧道
          tunnelRequest = createTunnel(readTimeout, writeTimeout, tunnelRequest, url);
    
          if (tunnelRequest == null) break; // Tunnel successfully created.
    
          closeQuietly(rawSocket);
          rawSocket = null;
          sink = null;
          source = null;
        }
      }
    

    它们调用connectSocket 中参数 Call 是不一样的。

    connectSocket中的代理连接建立的过程

    1. 没有设置代理的情况下,则直接与HTTP服务器建立TCP连接
    2. 设置了SOCKS代理的情况下,创建Socket时,为其传入proxy,连接时还是以HTTP服务器为目标。
    3. 设置了HTTP代理时,如果是HTTP请求,则与HTTP代理服务器建立TCP连接。HTTP代理服务器解析HTTP请求/响应的内容,并根据其中的信息来完成数据的转发。
    4. 设置了HTTP代理时,如果是 HTTPS/HTTP2请求,与HTTP服务器建立通过HTTP代理的隧道连接。HTTP代理不再解析传输的数据,仅仅完成数据转发的功能。此时HTTP代理的功能退化为如同SOCKS代理类似。
    5. 设置了代理类时,HTTP的服务器的域名解析会交给代理服务器执行。如果是HTTP代理,会对HTTP代理的域名做域名解析。

    establishProtocol 建立连接过程:

    1. 建立 TLS 连接
      1. 用SSLSocketFactory基于原始的TCP Socket,创建一个SSLSocket, 配置SSLSocket。
      2. configureTlsExtensions 配置 TLS扩展
      3. 进行TLS握手
      4. 获取证书信息。
      5. 对证书进行验证。
      6. 完成HTTP/2的ALPN扩展
      7. 基于前面获取到SSLSocket创建于执行的IO的BufferedSource和BufferedSink等,并保存握手信息以及所选择的协议。
    2. 如果是HTTP 2.0,通过Http2Connection.Builder 建立一个 Http2Connection,通过 http2Connection.start() 和服务器建立连接。

    ConnectionPool

    管理http和http/2的链接,以减少请求的网络延迟。同一个address将共享同一个connection。实现了连接复用的功能。

    public final class ConnectionPool {
      final RealConnectionPool delegate;
    }
    

    当前版本将具体的实现委托给了 RealConnectionPool

    public final class RealConnectionPool {
      // 后台线程用来清理过期连接,在每一个连接池中最多又一个线程。
      // 这个 executor 允许自己被GC 清理
      private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<>(), Util.threadFactory("OkHttp ConnectionPool", true));
      // 清理任务
      private final Runnable cleanupRunnable = () -> {
        while (true) {
          long waitNanos = cleanup(System.nanoTime());
          if (waitNanos == -1) return;
          if (waitNanos > 0) {
            long waitMillis = waitNanos / 1000000L;
            waitNanos -= (waitMillis * 1000000L);
            synchronized (RealConnectionPool.this) {
              try {
                RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
              } catch (InterruptedException ignored) {
              }
            }
          }
        }
      };
      // 过期连接队列
      private final Deque<RealConnection> connections = new ArrayDeque<>();
      // 路由数据库,用来记录不可用的route
      final RouteDatabase routeDatabase = new RouteDatabase();
    }
    

    默认情况下,这个连接池最多维持5个连接,且每个链接最多活5分钟。
    从 ConnectionPool 获取Connection

      // RealConectionPool.java
      boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
          @Nullable List<Route> routes, boolean requireMultiplexed) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (requireMultiplexed && !connection.isMultiplexed()) continue;
          if (!connection.isEligible(address, routes)) continue;
          transmitter.acquireConnectionNoEvents(connection);
          return true;
        }
        return false;
      }
    

    然后把这个connection 设置到 Transmitter 中去

     // 此方法有两处调用,一个是 findConnection,另一个是 connectionPool.transmitterAcquirePooledConnection()
     // 后一个方法也会在 findConnection处被调用
      void acquireConnectionNoEvents(RealConnection connection) {
        assert (Thread.holdsLock(connectionPool));
    
        if (this.connection != null) throw new IllegalStateException();
        this.connection = connection;
        connection.transmitters.add(new TransmitterReference(this, callStackTrace));
      }
    

    从代码可以看出来,这个connection 必须 isMultiplexed、 isEligible, 才可以
    至于添加 connection ,就是异步触发清理任务,然后将连接添加到队列中。

     void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    

    至于这个清理任务,代码就是上面的 cleanupRunnable

    1. 调用cleanup方法
    2. 等待 connectionBecameIdle() 触发 notifyAll()
      而这个 connectionBecameIdle() 是在 TransmitterreleaseConnectionNoEvents() -> maybeReleaseConnection() -> exchangeMessageDone() -> Exchange.bodyComplete -> complete -> close
      这个 close 属于 ForwardingSource,它的 delegate, 即为 codec.openResponseBodySource(response)

    我们现在看一下 cleanup 做了什么

    1. 统计空连接数量
    2. 查找最长空闲时间的连接,以及它的空闲时长
    3. 如果超过了最大连接数或者最大空闲时长,就 remove 掉这个连接
    4. 否则返回一个等待时长,也就是cleanup 的返回值 waitNanos
      然后阻塞相应的时间,如果有了废弃连接就清理,否则,接着等待

    cleanup中还有一个方法 pruneAndGetAllocationCount(),它是用来追踪泄露连接的,返回还存活于 connection 的 transmitter 的数量。所谓泄漏,就是还在追踪这个connection 但是程序已经废弃掉他们了。

    Transmitter

    是OkHttp的应用程序和网络层之间的桥梁。 此类公开了高级应用程序层原语:连接,请求,响应和流。
    它持有okhttpclient对象以及RealCall对象。
    它支持异步取消,如果是一个 HTTP/2, 取消的是这个流而不是共享的这个连接,但是如果是在进行TLS握手,就会取消整个连接。

    ExchangeFinder

    它尝试去是为一些可能的变化去找到一条可用的连接,策略如下:

    1. 如果当前 call 已经有了一个连接,能够满足请求,就用相同的连接,做一些初始化修改。
    2. 如果连接池中的一个连接满足这个请求。
    3. 如果没有现存的连接,就创建一个路由列表,并创建一个新连接。如果失败了,就迭代的尝试列表中可用的路由。

    相关文章

      网友评论

          本文标题:OkHttp 原理解析 (三)

          本文链接:https://www.haomeiwen.com/subject/pgozxctx.html