美文网首页
网络请求框架学习 okhttp ----连接池

网络请求框架学习 okhttp ----连接池

作者: 0_oHuanyu | 来源:发表于2017-07-24 20:28 被阅读66次

上一篇拦截器分析中,在ConnectInterceptor的intercept方法中,有这样一句代码来获得stream。

HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);

点streamAllocation进去之后看到newStream方法中调用了findHealthyConnection方法,实现如下:

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException {
    while (true) {
      // 这一句代码是实际获得连接的
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }

如果是新的连接就跳过健康检查,如果不是就查一下是否已经断开啦,输入输出是否关闭啦。再进去看findConnection方法

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException {
    Route selectedRoute;
    // 在pool中取连接的话,就需要拿这个pool做同步锁,如果是第一次发起请求应该是拿不到的,会走到下面
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");

      // Attempt to use an already-allocated connection.
      RealConnection allocatedConnection = this.connection;
      // 如果allocatedConnection 不为空并且连接池还没满,就直接使用这个连接
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      // 调用了okhttpclient的get方法,从connectionPool中根据address拿到连接。
      Internal.instance.get(connectionPool, address, this, null);
      if (connection != null) {
        return connection;
      }
      // 第一次进来就是空
      selectedRoute = route;
    }

    // If we need a route, make one. This is a blocking operation.
    if (selectedRoute == null) {
      // 这里面先是查找内存缓存,根据proxies的类型在routeSelector的集合inetSocketAddresses中查找,没有的话就重设一个
      // 调用address.dns().lookup(socketHost)方法,通过DNS服务器查询返回一组ip地址(一个域名可能对应多个ip地址,可用于自动重连)
      // 最后将得到的address 加入集合inetSocketAddresses中缓存起来。
      selectedRoute = routeSelector.next();
    }

    RealConnection result;
    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      // Now that we have an IP address, make another attempt at getting a connection from the pool.
      // This could match due to connection coalescing.
      // 再进行一次尝试,从连接池中拿连接
      Internal.instance.get(connectionPool, address, this, selectedRoute);
      if (connection != null) {
        route = selectedRoute;
        return connection;
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      route = selectedRoute;
      refusedStreamCount = 0;
      // 新搞一个连接
      result = new RealConnection(connectionPool, selectedRoute);
      // 将connection的引用交给streamAllocation,将streamAllocation的弱引用加入到connection的allocations集合中
      acquire(result);
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    // 这里调用RealConnection的connect方法,传入ip 端口号进行connect,RealConnection内部的source和sink就是在这个方法中赋值的。
    result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
    // connectionPool中维护了一个键值对,里面存了所有连接失败的route,每个连接失败的route都加入进去。
    // 而这句话是把连接成功的从route黑名单中去除掉。  
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      // Pool the connection.
      // 用okhttpclient的代码把当前的连接放入连接池中,这种麻烦的写法估计是跟设计模式有关系
      // 注意放进去的同时会触发清理
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      // 处理多线程产生的问题,如果产生了多个connection就release掉当前的,用另一个线程创建的connection
      // 并且关闭掉多余的socket
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    return result;
  }

代码很长,我看到的是:
RouteSelector做准备;
ConnectionPool管理连接;
RealConnection做具体执行;
以StreamAllocation为中心协调各个类;
将RealConnection生成的Http1Codec和Http2Codec这种面向协议(设置请求头读取回复)进行sink和read的类 传递到拦截器中。

代码执行步骤大致分三部分:

  1. 获得route、ip、port那些鬼,由RouteSelector这个类完成,拿到address之后再尝试连接池中拿connection。
  2. 实在从连接池中拿不到了,就新建connection,用raw socket进行三握手那些鬼,由RealConnection这个类完成,拿到source和sink;
    让connection和streamAllocation相互引用(一个强引用一个弱引用),连接池里面有一个ArrayDeque来记录所有的socket连接。将新的connection放入连接池,触发清理;
    将route从黑名单移除。
  3. 检查是否有多线程导致的问题,如果有,就释放当前连接,用别的线程创建的连接。

上面说将连接加入连接池时会触发清理操作,下面贴上代码详细说明是如何清理的。

在connectionPool中,有个cleanup方法来执行清理操作

 long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // 这里具体执行streamAllocation的清理,具体代码在下面
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        // 如果说:闲置的时间超过了设定值,或者最大限制连接数超过设定值,就把connection从连接池中移除,并关掉connection。
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        // A connection will be ready to evict soon.
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }

    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    // 根据闲置时间和闲置连接数,还有可能立刻执行下一次清理
    return 0;
  }
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
    List<Reference<StreamAllocation>> references = connection.allocations;
    for (int i = 0; i < references.size(); ) {
      Reference<StreamAllocation> reference = references.get(i);

      if (reference.get() != null) {
        i++;
        continue;
      }

      // We've discovered a leaked allocation. This is an application bug.
      //遍历每一个connection的streamAllocation弱引用集合,发现弱引用已被回收,就将其在弱引用集合中移除
      StreamAllocation.StreamAllocationReference streamAllocRef =
          (StreamAllocation.StreamAllocationReference) reference;
      String message = "A connection to " + connection.route().address().url()
          + " was leaked. Did you forget to close a response body?";
      // 打印警告,告知程序员,你的使用有问题
      Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

      references.remove(i);
      connection.noNewStreams = true;

      // If this was the last allocation, the connection is eligible for immediate eviction.
      if (references.isEmpty()) {
        // 如果所有的弱引用都被移除掉了,说明这个connection是闲置的,记录闲置的时间。将闲置最久的connection记录下来。
        connection.idleAtNanos = now - keepAliveDurationNs;
        return 0;
      }
    }

    return references.size();
  }

总结一下,清理策略就是:

  1. connection自身记录streamAllocation的连接数,达到0的时候就标记自己为闲置连接,记录闲置时间等待清理
  2. 满足闲置时间太长或者闲置连接太多时,ConnectionPool就执行清理操作关掉连接(默认空闲的socket最大连接数为5个,socket的keepAlive时间为5秒)。
  3. 正常情况下ConnectionPool会每隔一段时间就尝试清理一次。看连接使用情况,忙的话就一直尝试清理,闲的时候加入任务也会触发清理。

okhttp对socket的直接管理还是通过ConnectionPool来实现的。

回顾一下前面的拦截器的知识,结合一下:

  1. 在RealInterceptorChain中有一个streamAllocation成员变量
  2. 在RetryAndFollowUpInterceptor中初始化streamAllocation传到RealInterceptorChain中,此时还是没有任何连接和这个streamAllocation绑定的
  3. 到了ConnectInterceptor中,调用streamAllocation的newStream方法,内部调用findConnection方法,获得连接
  4. 连接的获得是先尝试从连接池中取,取不到就初始化一个连接,将streamAllocation弱引用给connection(此时connection可能已经有很多streamAllocation在用了),同时在连接池中尝试清理。
  5. 拿到连接之后,返回给ConnectInterceptor一个HttpCodec,这是一个接口的实现类,根据http协议是 1.x 还是2 内部有不同的实现
  6. 回到CallServerInterceptor中,拿HttpCodec来执行写入请求头、读取返回信息、构造responseBody等。

相关文章

网友评论

      本文标题:网络请求框架学习 okhttp ----连接池

      本文链接:https://www.haomeiwen.com/subject/nbjbkxtx.html