OkHttp3源码解析(三)——连接池复用

作者: AntDream | 来源:发表于2018-08-11 17:25 被阅读7次

    本文基于OkHttp3的3.11.0版本

    implementation 'com.squareup.okhttp3:okhttp:3.11.0'
    

    我们已经分析了OkHttp3的拦截器链和缓存策略,今天我们再来看看OkHttp3的连接池复用。

    客户端和服务器建立socket连接需要经历TCP的三次握手和四次挥手,是一种比较消耗资源的动作。Http中有一种keepAlive connections的机制,在和客户端通信结束以后可以保持连接指定的时间。OkHttp3支持5个并发socket连接,默认的keepAlive时间为5分钟。下面我们来看看OkHttp3是怎么实现连接池复用的。

    OkHttp3的连接池--ConnectionPool

    public final class ConnectionPool {
        
        //线程池,用于执行清理空闲连接
        private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
        //最大的空闲socket连接数
        private final int maxIdleConnections;
        //socket的keepAlive时间
        private final long keepAliveDurationNs;
        
        private final Deque<RealConnection> connections = new ArrayDeque<>();
        final RouteDatabase routeDatabase = new RouteDatabase();
        boolean cleanupRunning;
    }
    

    ConnectionPool里的几个重要变量:

    (1)executor线程池,类似于CachedThreadPool,用于执行清理空闲连接的任务。

    (2)Deque双向队列,同时具有队列和栈的性质,经常在缓存中被使用,里面维护的RealConnection是socket物理连接的包装

    (3)RouteDatabase,用来记录连接失败的路线名单

    下面看看ConnectionPool的构造函数

    public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
    }
    
    public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
    
        // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
        if (keepAliveDuration <= 0) {
          throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
        }
    }
    

    从构造函数中可以看出,ConnectionPool的默认空闲连接数为5个,keepAlive时间为5分钟。ConnectionPool是什么时候被创建的呢?是在OkHttpClient的builder中:

    public static final class Builder {
        ...
        ConnectionPool connectionPool;
        ...
        public Builder() {
            ...
            connectionPool = new ConnectionPool();
            ...
        }
        
        //我们也可以定制连接池
        public Builder connectionPool(ConnectionPool connectionPool) {
            if (connectionPool == null) throw new NullPointerException("connectionPool == null");
            this.connectionPool = connectionPool;
            return this;
        }
    }
    

    缓存操作:添加、获取、回收连接

    (1)从缓存中获取连接

    //ConnectionPool.class
    @Nullable 
    RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (connection.isEligible(address, route)) {
            streamAllocation.acquire(connection, true);
            return connection;
          }
        }
        return null;
    }
    

    获取连接的逻辑比较简单,就遍历连接池里的连接connections,然后用RealConnection的isEligible方法找到符合条件的连接,如果有符合条件的连接则复用。需要注意的是,这里还调用了streamAllocation的acquire方法。acquire方法的作用是对RealConnection引用的streamAllocation进行计数,OkHttp3是通过RealConnection的StreamAllocation的引用计数是否为0来实现自动回收连接的。

    //StreamAllocation.class
    public void acquire(RealConnection connection, boolean reportedAcquired) {
        assert (Thread.holdsLock(connectionPool));
        if (this.connection != null) throw new IllegalStateException();
    
        this.connection = connection;
        this.reportedAcquired = reportedAcquired;
        connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
    }
    
    public static final class StreamAllocationReference extends WeakReference<StreamAllocation> {
    
        public final Object callStackTrace;
    
        StreamAllocationReference(StreamAllocation referent, Object callStackTrace) {
          super(referent);
          this.callStackTrace = callStackTrace;
        }
    }
    
    //RealConnection.class
    public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
    

    每一个RealConnection中都有一个allocations变量,用于记录对于StreamAllocation的引用。StreamAllocation中包装有HttpCodec,而HttpCodec里面封装有Request和Response读写Socket的抽象。每一个请求Request通过Http来请求数据时都需要通过StreamAllocation来获取HttpCodec,从而读取响应结果,而每一个StreamAllocation都是和一个RealConnection绑定的,因为只有通过RealConnection才能建立socket连接。所以StreamAllocation可以说是RealConnection、HttpCodec和请求之间的桥梁。

    当然同样的StreamAllocation还有一个release方法,用于移除计数,也就是将当前的StreamAllocation的引用从对应的RealConnection的引用列表中移除。

    private void release(RealConnection connection) {
        for (int i = 0, size = connection.allocations.size(); i < size; i++) {
          Reference<StreamAllocation> reference = connection.allocations.get(i);
          if (reference.get() == this) {
            connection.allocations.remove(i);
            return;
          }
        }
        throw new IllegalStateException();
    }
    

    (2)向缓存中添加连接

    //ConnectionPool.class
    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    

    添加连接之前会先调用线程池执行清理空闲连接的任务,也就是回收空闲的连接。

    (3)空闲连接的回收

    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
    };
    

    cleanupRunnable中执行清理任务是通过cleanup方法来完成,cleanup方法会返回下次需要清理的间隔时间,然后会调用wait方法释放锁和时间片。等时间到了就再次进行清理。下面看看具体的清理逻辑:

    long cleanup(long now) {
        //记录活跃的连接数
        int inUseConnectionCount = 0;
        //记录空闲的连接数
        int idleConnectionCount = 0;
        //空闲时间最长的连接
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        synchronized (this) {
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
    
            //判断连接是否在使用,也就是通过StreamAllocation的引用计数来判断
            //返回值大于0说明正在被使用
            if (pruneAndGetAllocationCount(connection, now) > 0) {
                //活跃的连接数+1
                inUseConnectionCount++;
                continue;
            }
            //说明是空闲连接,所以空闲连接数+1
            idleConnectionCount++;
    
            //找出了空闲时间最长的连接,准备移除
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
    
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            //如果空闲时间最长的连接的空闲时间超过了5分钟
            //或是空闲的连接数超过了限制,就移除
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            //如果存在空闲连接但是还没有超过5分钟
            //就返回剩下的时间,便于下次进行清理
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            //如果没有空闲的连接,那就等5分钟后再尝试清理
            return keepAliveDurationNs;
          } else {
            //当前没有任何连接,就返回-1,跳出循环
            cleanupRunning = false;
            return -1;
          }
        }
    
        closeQuietly(longestIdleConnection.socket());
    
        // Cleanup again immediately.
        return 0;
    }
    

    下面我们看看判断连接是否是活跃连接的pruneAndGetAllocationCount方法

    private int pruneAndGetAllocationCount(RealConnection connection, long now) {
        List<Reference<StreamAllocation>> references = connection.allocations;
        for (int i = 0; i < references.size(); ) {
            Reference<StreamAllocation> reference = references.get(i);
        
            //如果存在引用,就说明是活跃连接,就继续看下一个StreamAllocation
            if (reference.get() != null) {
                i++;
                continue;
            }
    
          // We've discovered a leaked allocation. This is an application bug.
          //发现泄漏的引用,会打印日志
            StreamAllocation.StreamAllocationReference streamAllocRef =
                (StreamAllocation.StreamAllocationReference) reference;
            String message = "A connection to " + connection.route().address().url()
                + " was leaked. Did you forget to close a response body?";
            Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
            
            //如果没有引用,就移除
            references.remove(i);
            connection.noNewStreams = true;
    
            //如果列表为空,就说明此连接上没有StreamAllocation引用了,就返回0,表示是空闲的连接
            if (references.isEmpty()) {
                connection.idleAtNanos = now - keepAliveDurationNs;
                return 0;
            }
        }
        //遍历结束后,返回引用的数量,说明当前连接是活跃连接
        return references.size();
    }
    

    至此我们就分析完OkHttp3的连接池复用了。

    总结

    (1)OkHttp3中支持5个并发socket连接,默认的keepAlive时间为5分钟,当然我们可以在构建OkHttpClient时设置不同的值。

    (2)OkHttp3通过Deque<RealConnection>来存储连接,通过put、get等操作来管理连接。

    (3)OkHttp3通过每个连接的引用计数对象StreamAllocation的计数来回收空闲的连接,向连接池添加新的连接时会触发执行清理空闲连接的任务。清理空闲连接的任务通过线程池来执行。

    OKHttp3源码解析系列


                        欢迎关注我的微信公众号,和我一起每天进步一点点!
    
    AntDream

    相关文章

      网友评论

        本文标题:OkHttp3源码解析(三)——连接池复用

        本文链接:https://www.haomeiwen.com/subject/lnbdbftx.html