美文网首页
OkHttp源码学习记录(复用连接)

OkHttp源码学习记录(复用连接)

作者: 打工崽 | 来源:发表于2021-03-22 14:20 被阅读0次

    本篇文章通过源码了解OkHttp的复用连接池。为了解决TCP握手和挥手的效率问题,Http有一种叫做keepalive connections的机制;而OkHttp支持5个并发socket的链接,默认keepAlive时间为5分钟,接下来通过源码分析OkHttp是如何复用链接的

    OkHttp的复用连接池

    1. 主要变量与构造方法

    变量

    private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
          Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
    
      /** The maximum number of idle connections for each address. */
      private final int maxIdleConnections;
      private final long keepAliveDurationNs;
      private final Runnable cleanupRunnable = new Runnable() {
        
      private final Deque<RealConnection> connections = new ArrayDeque<>();
      final RouteDatabase routeDatabase = new RouteDatabase();
      boolean cleanupRunning;
    

    说明一下主要变量:
    executor线程池,类似于CachedThreadPool,需要注意的是这种线程池的工作队列采用了没有容量的SynchronousQueue

    Deque,双向队列,双端队列同时具有队列和栈的性质,经常在缓存中被使用,里面维护的RealConnection也就是socket物理连接的包装

    RouteDatabase,它用来记录连接失败的路线名单,当连接失败时就会把失败的线路加进去

    ConnectionPool()构造方法

    public ConnectionPool() {
        this(5, 5, TimeUnit.MINUTES);
      }
    
      public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
        this.maxIdleConnections = maxIdleConnections;
        this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
    
        // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
        if (keepAliveDuration <= 0) {
          throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
        }
      }
    

    构造方法可以看出ConnectionPool默认构造方法里空闲的socket最大连接数为5个,socket的keepAlive时间为5分钟,ConnectionPool是在OkHttpClient实例化时创建的

    public OkHttpClient() {
        this(new Builder());
      }
    
    private OkHttpClient(Builder builder) {
    ···
    this.connectionPool = builder.connectionPool;
    ···
    }
    

    2. 缓存操作

    ConnectionPool提供对Deque<RealConnection>进行操作的方法分别为put、get、connectionBecameIdle和evictAll这几个操作,分别对应放入链接,获取链接,移除链接和移除所有链接操作,这里举例说明put和get操作

    put()方法

    void put(RealConnection connection) {
        assert (Thread.holdsLock(this));
        if (!cleanupRunning) {
          cleanupRunning = true;
          executor.execute(cleanupRunnable);
        }
        connections.add(connection);
      }
    

    在添加到Deque之前首先要清理空闲的线程,这个后面会讲到,再来看get()操作:

    get()方法

    RealConnection get(Address address, StreamAllocation streamAllocation) {
        assert (Thread.holdsLock(this));
        for (RealConnection connection : connections) {
          if (connection.allocations.size() < connection.allocationLimit
              && address.equals(connection.route().address)
              && !connection.noNewStreams) {
            streamAllocation.acquire(connection);
            return connection;
          }
        }
        return null;
      }
    

    for循环里遍历connections缓存列表,if语句判断条件里当某个连接计数的次数小于限制的大小,并且request的地址和缓存列表中此链接的地址完全匹配时,则直接复用缓存列表中的connection作为request的链接。


    3. 自动回收链接

    OkHttp是通过StreamAllocation引用计数是否为0来实现自动回收链接的,我们在put操作前首先要调用executor.execute(cleanupRunnable)来清理闲置线程,我们来看看cleanupRunnable()方法做了什么

    cleanupRunnable()方法

    private final Runnable cleanupRunnable = new Runnable() {
        @Override public void run() {
          while (true) {
            long waitNanos = cleanup(System.nanoTime());
            if (waitNanos == -1) return;
            if (waitNanos > 0) {
              long waitMillis = waitNanos / 1000000L;
              waitNanos -= (waitMillis * 1000000L);
              synchronized (ConnectionPool.this) {
                try {
                  ConnectionPool.this.wait(waitMillis, (int) waitNanos);
                } catch (InterruptedException ignored) {
                }
              }
            }
          }
        }
      };
    

    在while循环里,线程调用cleanup()方法来进行清理,并返回下次需要清理的间隔时间,然后调用wait()方法进行等待以释放锁与时间片,时间到了后再次进行清理,并返回下次要清理的间隔时间,我们来看看cleanup()方法是如何清理的

    cleanup()方法

    long cleanup(long now) {
        int inUseConnectionCount = 0;
        int idleConnectionCount = 0;
        RealConnection longestIdleConnection = null;
        long longestIdleDurationNs = Long.MIN_VALUE;
    
        // Find either a connection to evict, or the time that the next eviction is due.
        synchronized (this) {
          for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
            RealConnection connection = i.next();
    
            // If the connection is in use, keep searching.
            if (pruneAndGetAllocationCount(connection, now) > 0) {
              inUseConnectionCount++;
              continue;
            }
    
            idleConnectionCount++;
    
            // If the connection is ready to be evicted, we're done.
            long idleDurationNs = now - connection.idleAtNanos;
            if (idleDurationNs > longestIdleDurationNs) {
              longestIdleDurationNs = idleDurationNs;
              longestIdleConnection = connection;
            }
          }
    
          if (longestIdleDurationNs >= this.keepAliveDurationNs
              || idleConnectionCount > this.maxIdleConnections) {
            // We've found a connection to evict. Remove it from the list, then close it below (outside
            // of the synchronized block).
            connections.remove(longestIdleConnection);
          } else if (idleConnectionCount > 0) {
            // A connection will be ready to evict soon.
            return keepAliveDurationNs - longestIdleDurationNs;
          } else if (inUseConnectionCount > 0) {
            // All connections are in use. It'll be at least the keep alive duration 'til we run again.
            return keepAliveDurationNs;
          } else {
            // No connections, idle or in use.
            cleanupRunning = false;
            return -1;
          }
        }
    
        closeQuietly(longestIdleConnection.socket());
    
        // Cleanup again immediately.
        return 0;
      }
    

    在for循环外的第1个if块,即从上往下第3个if块中,如果空闲连接keepAlive时间超过5分钟或空闲连接数超过5个,则从Deque中remove()移除此连接。接下来的if语句则根据空闲连接或者活跃连接来返回下次需要清理的时间数

    如果空闲连接 > 0,则返回此链接即将到期的时间
    如果都是活跃连接并且 > 0,则返回默认的keepAlive时间5分钟
    如果没有任何连接,则返回-1

    for循环里的第1个if块中,判断条件通过pruneAndGetAllocationCount()方法来判断连接是否闲置。如果pruneAndGetAllocationCount()方法的返回值 > 0 则是活跃连接,否则就是空闲连接

    总结一下就是cleanup方法根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记出空闲的连接

    下面我们看看pruneAndGetAllocationCount()方法做了什么

    pruneAndGetAllocationCount()方法

    private int pruneAndGetAllocationCount(RealConnection connection, long now) {
        List<Reference<StreamAllocation>> references = connection.allocations;
        for (int i = 0; i < references.size(); ) {
          Reference<StreamAllocation> reference = references.get(i);
    
          if (reference.get() != null) {
            i++;
            continue;
          }
    
          // We've discovered a leaked allocation. This is an application bug.
          Internal.logger.warning("A connection to " + connection.route().address().url()
              + " was leaked. Did you forget to close a response body?");
          references.remove(i);
          connection.noNewStreams = true;
    
          // If this was the last allocation, the connection is eligible for immediate eviction.
          if (references.isEmpty()) {
            connection.idleAtNanos = now - keepAliveDurationNs;
            return 0;
          }
        }
    
        return references.size();
      }
    }
    

    首先for循环遍历传进来的RealConnection的StreamAllocation列表。

    如果StreamAllocation被使用,则接着遍历下一个StreamAllocation。
    如果StreamAllocation未被使用,则从列表中移除

    最后一个if语句判空操作如果为空则表示此连接没有引用了,返回0表示此连接是空闲练级,否则就返回非0数,表示此连接是活跃连接,那么如何判断StreamAllocation使用与否?往下看


    4. 引用计数

    在OkHttp高层代码调用中,使用了类似引用计数的方式跟着socket流的调用。这里的引用计数对象是SteamAllocation,它被反复执行acquire和release操作,这两个方法其实是在改变RealConnection中的List<Reference<StreamAllocation>>的大小。StreamAllocation的acquire()方法和release()方法如下所示

    acquire()方法

    public void acquire(RealConnection connection) {
        connection.allocations.add(new WeakReference<>(this));
      }
    

    release()方法

    private void release(RealConnection connection) {
        for (int i = 0, size = connection.allocations.size(); i < size; i++) {
          Reference<StreamAllocation> reference = connection.allocations.get(i);
          if (reference.get() == this) {
            connection.allocations.remove(i);
            return;
          }
        }
        throw new IllegalStateException();
      }
    

    RealConnection是socket的物理连接包装,里面维护了List<Reference<StreamAllocation>>的引用,List中的StreamAllocation的数量也就是socket被引用的计数。

    如果计数为0,则说明此连接没有被使用,也就是空闲的,需要通过上文的算法实现回收
    如果计数不为0,则表示上层代码仍在引用,无需关闭连接


    5. 小结

    可以看出连接池复用的核心就是用Deque<RealConnection>来存储连接,通过put(),get(),connectionBecameIdle(),evictAll(),几个操作来对Deque进行操作,另外通过判断连接中的计数对象StreamAllocation来进行自动回收连接。


    同样,我们串联一下调用的方法有哪些:

    用于缓存:
    ConnectionPool()——put()
    ConnectionPool()——get()

    自动回收连接:
    put()——cleanupRunnable()——cleanup()——pruneAndGetAllocationCount()

    引用计数(反复执行):
    StreamAllocation——acquire()
    StreamAllocation——release()


    一些面试题

    结合上文OkHttp源码学习记录(请求网络),我们来解答一下OkHttp相关的一些面试题解

    1. 使用流程

    1. 通过一个构建者模式(Request.Builder)构建所有的request,然后分发到Dispatcher(分发器)

    2. Dispatcher再把request分发到HttpEngine(核心功能类)中,HttpEngine首先要看一下本次请求有没有cache(缓存),如果有缓存,就从缓存中拿到信息,然后返回给response。如果没有缓存,HttpEngine就把request分发到ConnectionPool(连接池)中

    3. 在ConnectionPool(连接池)中,通过Connection发送请求,首先选择Route(路由)和Platfrom(平台),然后到达Server(Socket),获取到Data,然后返回response


    2. 拦截器的种类

    Response getResponseWithInterceptorChain() throws IOException {
        // Build a full stack of interceptors.
        List<Interceptor> interceptors = new ArrayList<>();
        interceptors.addAll(client.interceptors());
        interceptors.add(retryAndFollowUpInterceptor);
        interceptors.add(new BridgeInterceptor(client.cookieJar()));
        interceptors.add(new CacheInterceptor(client.internalCache()));
        interceptors.add(new ConnectInterceptor(client));
        if (!forWebSocket) {
          interceptors.addAll(client.networkInterceptors());
        }
        interceptors.add(new CallServerInterceptor(forWebSocket));
    
        Interceptor.Chain chain = new RealInterceptorChain(
            interceptors, null, null, null, 0, originalRequest);
        return chain.proceed(originalRequest);
      }
    
    1. addInterceptor(Interceptor):这是由开发者设置的,会按照开发者的要求,在所有的拦截器处理之前进行最早的拦截处理,比如一些公共参数,Header都可以在这里添加

    2. RetryAndFollowUpInterceptor:这里会对连接做一些初始化工作,以及请求失败的充实工作,重定向的后续请求工作。主要是构建了一个StreamAllocation管理类对象,维护Connections,Calls,Streams。并且初始化了Socket链接。然后执行下一个BridgeInterceptor,捕捉异常并且重定向,次数不能超过20次

    3. BridgeInterceptor:桥接拦截器,这里会为用户构建一个能够进行网络访问的请求,同时后续工作将网络请求回来的响应Response转化为用户可用的Response,然后进行一些配置的丰富,比如添加文件类型,content-length计算添加,gzip解包

    4. CacheInterceptor:这里主要是处理cache相关处理,会根据OkHttpClient对象的配置以及缓存策略对请求值进行缓存,而且如果本地有了可⽤的Cache,就可以在没有网络交互的情况下就返回缓存结果,采用了DiskLruCache

    5. ConnectInterceptor:这里主要就是负责建立连接了,会建立TCP连接或者TLS连接,以及负责编码解码的HttpCodec。这里体现了多路复用的思想

    6. NetworkInterceptors,这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。这个位置添加的拦截器可以看到请求和响应的数据了,所以可以做一些网络调试

    7. CallServerInterceptor:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,通过socket读写数据。


    3. 连接池的意义

    首先明确一个概念,即Socket的建立和断开是非常消耗资源的事情,所以HTTP中的keepAlive连接对于降低延迟和提升速度有非常重要的作用。keepAlive机制就是可以在一次TCP连接中可以持续发送多份数据而不会断开连接。所以连接的多次使用,也就是复用就变得格外重要了。具体的操作也就是对于上文提到的内部维护的双端队列Deque的处理


    本文摘抄自《Android进阶之光——刘望舒》,为自己学习路程中的记录,不以盈利为目的。


    欢迎指正。

    相关文章

      网友评论

          本文标题:OkHttp源码学习记录(复用连接)

          本文链接:https://www.haomeiwen.com/subject/sbezcltx.html