本篇文章通过源码了解OkHttp的复用连接池。为了解决TCP握手和挥手的效率问题,Http有一种叫做keepalive connections的机制;而OkHttp支持5个并发socket的链接,默认keepAlive时间为5分钟,接下来通过源码分析OkHttp是如何复用链接的
OkHttp的复用连接池
1. 主要变量与构造方法
变量
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
说明一下主要变量:
executor线程池,类似于CachedThreadPool,需要注意的是这种线程池的工作队列采用了没有容量的SynchronousQueue
Deque,双向队列,双端队列同时具有队列和栈的性质,经常在缓存中被使用,里面维护的RealConnection也就是socket物理连接的包装
RouteDatabase,它用来记录连接失败的路线名单,当连接失败时就会把失败的线路加进去
ConnectionPool()构造方法
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
构造方法可以看出ConnectionPool默认构造方法里空闲的socket最大连接数为5个,socket的keepAlive时间为5分钟,ConnectionPool是在OkHttpClient实例化时创建的
public OkHttpClient() {
this(new Builder());
}
private OkHttpClient(Builder builder) {
···
this.connectionPool = builder.connectionPool;
···
}
2. 缓存操作
ConnectionPool提供对Deque<RealConnection>进行操作的方法分别为put、get、connectionBecameIdle和evictAll这几个操作,分别对应放入链接,获取链接,移除链接和移除所有链接操作,这里举例说明put和get操作
put()方法
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
在添加到Deque之前首先要清理空闲的线程,这个后面会讲到,再来看get()操作:
get()方法
RealConnection get(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.allocations.size() < connection.allocationLimit
&& address.equals(connection.route().address)
&& !connection.noNewStreams) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
for循环里遍历connections缓存列表,if语句判断条件里当某个连接计数的次数小于限制的大小,并且request的地址和缓存列表中此链接的地址完全匹配时,则直接复用缓存列表中的connection作为request的链接。
3. 自动回收链接
OkHttp是通过StreamAllocation引用计数是否为0来实现自动回收链接的,我们在put操作前首先要调用executor.execute(cleanupRunnable)来清理闲置线程,我们来看看cleanupRunnable()方法做了什么
cleanupRunnable()方法
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
在while循环里,线程调用cleanup()方法来进行清理,并返回下次需要清理的间隔时间,然后调用wait()方法进行等待以释放锁与时间片,时间到了后再次进行清理,并返回下次要清理的间隔时间,我们来看看cleanup()方法是如何清理的
cleanup()方法
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
在for循环外的第1个if块,即从上往下第3个if块中,如果空闲连接keepAlive时间超过5分钟或空闲连接数超过5个,则从Deque中remove()移除此连接。接下来的if语句则根据空闲连接或者活跃连接来返回下次需要清理的时间数
如果空闲连接 > 0,则返回此链接即将到期的时间
如果都是活跃连接并且 > 0,则返回默认的keepAlive时间5分钟
如果没有任何连接,则返回-1
for循环里的第1个if块中,判断条件通过pruneAndGetAllocationCount()方法来判断连接是否闲置。如果pruneAndGetAllocationCount()方法的返回值 > 0 则是活跃连接,否则就是空闲连接
总结一下就是cleanup方法根据连接中的引用计数来计算空闲连接数和活跃连接数,然后标记出空闲的连接
下面我们看看pruneAndGetAllocationCount()方法做了什么
pruneAndGetAllocationCount()方法
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
Internal.logger.warning("A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?");
references.remove(i);
connection.noNewStreams = true;
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
}
首先for循环遍历传进来的RealConnection的StreamAllocation列表。
如果StreamAllocation被使用,则接着遍历下一个StreamAllocation。
如果StreamAllocation未被使用,则从列表中移除
最后一个if语句判空操作如果为空则表示此连接没有引用了,返回0表示此连接是空闲练级,否则就返回非0数,表示此连接是活跃连接,那么如何判断StreamAllocation使用与否?往下看
4. 引用计数
在OkHttp高层代码调用中,使用了类似引用计数的方式跟着socket流的调用。这里的引用计数对象是SteamAllocation,它被反复执行acquire和release操作,这两个方法其实是在改变RealConnection中的List<Reference<StreamAllocation>>的大小。StreamAllocation的acquire()方法和release()方法如下所示
acquire()方法
public void acquire(RealConnection connection) {
connection.allocations.add(new WeakReference<>(this));
}
release()方法
private void release(RealConnection connection) {
for (int i = 0, size = connection.allocations.size(); i < size; i++) {
Reference<StreamAllocation> reference = connection.allocations.get(i);
if (reference.get() == this) {
connection.allocations.remove(i);
return;
}
}
throw new IllegalStateException();
}
RealConnection是socket的物理连接包装,里面维护了List<Reference<StreamAllocation>>的引用,List中的StreamAllocation的数量也就是socket被引用的计数。
如果计数为0,则说明此连接没有被使用,也就是空闲的,需要通过上文的算法实现回收
如果计数不为0,则表示上层代码仍在引用,无需关闭连接
5. 小结
可以看出连接池复用的核心就是用Deque<RealConnection>来存储连接,通过put(),get(),connectionBecameIdle(),evictAll(),几个操作来对Deque进行操作,另外通过判断连接中的计数对象StreamAllocation来进行自动回收连接。
同样,我们串联一下调用的方法有哪些:
用于缓存:
ConnectionPool()——put()
ConnectionPool()——get()
自动回收连接:
put()——cleanupRunnable()——cleanup()——pruneAndGetAllocationCount()
引用计数(反复执行):
StreamAllocation——acquire()
StreamAllocation——release()
一些面试题
结合上文OkHttp源码学习记录(请求网络),我们来解答一下OkHttp相关的一些面试题解
1. 使用流程
-
通过一个构建者模式(Request.Builder)构建所有的request,然后分发到Dispatcher(分发器)
-
Dispatcher再把request分发到HttpEngine(核心功能类)中,HttpEngine首先要看一下本次请求有没有cache(缓存),如果有缓存,就从缓存中拿到信息,然后返回给response。如果没有缓存,HttpEngine就把request分发到ConnectionPool(连接池)中
-
在ConnectionPool(连接池)中,通过Connection发送请求,首先选择Route(路由)和Platfrom(平台),然后到达Server(Socket),获取到Data,然后返回response
2. 拦截器的种类
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
-
addInterceptor(Interceptor):这是由开发者设置的,会按照开发者的要求,在所有的拦截器处理之前进行最早的拦截处理,比如一些公共参数,Header都可以在这里添加
-
RetryAndFollowUpInterceptor:这里会对连接做一些初始化工作,以及请求失败的充实工作,重定向的后续请求工作。主要是构建了一个StreamAllocation管理类对象,维护Connections,Calls,Streams。并且初始化了Socket链接。然后执行下一个BridgeInterceptor,捕捉异常并且重定向,次数不能超过20次
-
BridgeInterceptor:桥接拦截器,这里会为用户构建一个能够进行网络访问的请求,同时后续工作将网络请求回来的响应Response转化为用户可用的Response,然后进行一些配置的丰富,比如添加文件类型,content-length计算添加,gzip解包
-
CacheInterceptor:这里主要是处理cache相关处理,会根据OkHttpClient对象的配置以及缓存策略对请求值进行缓存,而且如果本地有了可⽤的Cache,就可以在没有网络交互的情况下就返回缓存结果,采用了DiskLruCache
-
ConnectInterceptor:这里主要就是负责建立连接了,会建立TCP连接或者TLS连接,以及负责编码解码的HttpCodec。这里体现了多路复用的思想
-
NetworkInterceptors,这里也是开发者自己设置的,所以本质上和第一个拦截器差不多,但是由于位置不同,所以用处也不同。这个位置添加的拦截器可以看到请求和响应的数据了,所以可以做一些网络调试
-
CallServerInterceptor:这里就是进行网络数据的请求和响应了,也就是实际的网络I/O操作,通过socket读写数据。
3. 连接池的意义
首先明确一个概念,即Socket的建立和断开是非常消耗资源的事情,所以HTTP中的keepAlive连接对于降低延迟和提升速度有非常重要的作用。keepAlive机制就是可以在一次TCP连接中可以持续发送多份数据而不会断开连接。所以连接的多次使用,也就是复用就变得格外重要了。具体的操作也就是对于上文提到的内部维护的双端队列Deque的处理
本文摘抄自《Android进阶之光——刘望舒》,为自己学习路程中的记录,不以盈利为目的。
欢迎指正。
网友评论