背景
最近把Okhttp的源码又整理了下,之前也写过Okhttp源码的文章,我觉得那会对Okhttp的认识不够深入,所以这次还是像炒咸饭一样吗?no-no-no,这次我会整理点精华部分,让大家学习点东西,如标题所示,这次要讨论的话题是Okhttp的连接池怎么工作的,以及它工作的原理,为什么要整理这篇文章呢,因为okhttp的连接池在面试过程中很大可能被问到,因此在这里总结出来,供大家参考。
为了大家更好的理解发起同步和异步的过程,画了张草图给大家,如果有不正确的地方望指出:
- 我们知道Okhttp中通过okhttpClient对象是通过Builder对象初始化出来的,此处Builder的用法是建造者模式,建造者模式主要是分离出外部类的属性初始化,而初始化属性交给了内部类Buidler类,这么做的好处是外部类不用关心属性的初始化。 而在初始化的时候有
interceptors
、networkInterceptors
两种拦截器的初始化,还有dispatcher(分发器)
的初始化,以及后面需要讲到的cache(缓存)
初始化等。- 初始化完了后通过builder的build方法构造出
okhttpClient
对象,该类被称作客户端类,通过它的newCall方法返回RealCall对象,在newCall过程的过程中需要request的信息,request信息包装了url、method、headers、body等信息。最后通过RealCall的同步或异步方法交给了okhttpClient
的dispatcher
来处理,在处理同步或异步之前都会判断有没有正在executed
,所以我们不能对同一个RealCall调用异步或同步方法。- 在异步的时候会把RealCall给包装成一个
AsyncCall
,它是一个runnable对象。接着就来到了分发器异步处理部分,首先会把AsyncCall
加入到readyAsyncCalls
的集合中,该集合表示准备阶段的请求集合,紧接着从runningAsyncCalls(该集合装的都是要即将请求的集合)
和readyAsyncCalls
集合中找相同host的AsyncCall
,如果找到了会把当中记录的相同host的个数给该AsyncCall
。注意这里保存host个数用的原子性的AtomicInteger来记录的- 接着会去判断最大的请求是否大于64以及相同host是否大于5个,这里也是okhttp面试高频知识点,如果都通过的话,会把当前的
AsyncCall
的相同host记录数加一,接着会加入到runningAsyncCalls
集合中,接着循环遍历刚符合条件的AsyncCall
,通过线程池去执行AsyncCall
,注意此处的线程池的配置是没有核心线程,总的线程个数是没有限制的,也就是说都是非核心线程,并且个数没有限制,非核心线程等待的时间是60秒,并且使用的任务队列是SynchronousQueue,它是一个没有容量的阻塞队列,只会当里面没有任务的时候,才能往里面放任务,当放完之后,只能等它的任务被取走才能放,这不就是jdk里面提供的Executors.newCachedThreadPool线程池吗,可能是okhttp想自己定义线程工厂的参数吧,定义线程的名字。- 所以到这里才会进入到子线程,由于
AsyncCall
是一个runnable,因此最终执行来到了它的run方法吧,run方法最终会走到execute
方法,该方法来到了okhttp最有意思的单链表结构的拦截器部分,它会把所有的拦截器组装成一个集合,然后传给RealInterceptorChain
的process
方法,在该方法中,会先把下一个RealInterceptorChain
初始化出来,然后把下一个RealInterceptorChain
传给当前Interceptor的intercept
方法,最终一个个的response返回到AsyncCall
的execute
方法。- 处理完当前的
AsyncCall
后,会交给dispatcher
,它会将该AsyncCall
的host数减一,并且把它从runningAsyncCalls
集合中移除,接着再从readyAsyncCalls
集合中拿剩下的AsyncCall
继续执行,直到执行完readyAsyncCalls
里面的AsyncCall
。
关于okhttp发起异步和同步请求可以看这里OkHttp 源码解析
或者看我之前分析的okhttpOkHttp大流程分析
这就是整个okhttp的执行流程,而最重要的是拦截器部分,我会在这章介绍拦截器的连接池部分,先通过源码的形式介绍它的来龙去脉:
连接池的意义
- 频繁的进行建立Sokcet连接(TCP三次握手)和断开Socket(TCP四次分手)是非常消耗网络资源和浪费时间的,HTTP中的keepalive连接对于 降低延迟和提升速度有非常重要的作用。
- 复用连接就需要对连接进行管理,这里就引入了连接池的概念。
- Okhttp支持5个并发KeepAlive,默认链路生命为5分钟(链路空闲后,保持存活的时间),连接池有ConectionPool实现,对连接进行回收和管理。
概要
连接池部分主要是在RealConnectionPool
类中,该类用connections(双端队列)
存储所有的连接,cleanupRunnable
是专门用来清除超时的RealConnection
,既然有清除的任务,那肯定有清除的线程池,没错,该线程池(executor)
跟okhttp处理异步时候的线程池是一样的,keepAliveDurationNs
表示每一个连接keep-alive的时间,默认是5分钟,maxIdleConnections
连接池的最大容量,默认是5个。RealConnection
中有transmitters
字段,用来保存该连接的transmitter个数,通过里面的transmitter个数来标记该RealConnection
有没有在使用中。
注:如果大家只是想关心面试过程中怎么针对面试官问连接池的问题,可以直接看文章结尾哟
源码分析
1.RealCall#getResponseWithInterceptorChain
拦截器的组装是在RealCall的getResponseWithInterceptorChain
方法中放到集合里面了:
Response getResponseWithInterceptorChain() throws IOException {
//所有拦截器的组装集合
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接池的使用在这里面
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
try {
Response response = chain.proceed(originalRequest);
return response;
} catch (IOException e) {
} finally {
}
}
}
2.ConnectInterceptor#intercept
连接池在ConnectInterceptor
中包装起来了,我们进去瞄一眼:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//拿到chain的Transmitter,里面包装了RealCall、okhttpClient、connectionPool等信息
Transmitter transmitter = realChain.transmitter();
//是否不是get请求
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
3.Transmitter#newExchange
拿到chain的Transmitter
对象,该对象是RealCall
、okhttpClient
、connectionPool
等信息的包装类,将是否不是get请求的标识传给了transmitter
的newExchange
方法:
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
//exchangeFinder是对connectionPool、connectionPool、request等信息的包装,它是在RetryAndFollowUpInterceptor拦截器中初始化出来的
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
synchronized (connectionPool) {
this.exchange = result;
return result;
}
}
4.ExchangeCodec#find
该方法中将ExchangeCodec
的获取交给了exchangeFinder
对象,ExchangeCodec
是一个接口,实现类有Http2ExchangeCodec
和Http1ExchangeCodec
,这两个类表示http1和http2的建立连接的类,里面实现了writeRequestHeaders
和createRequestBody
等方法,这两个方法是在CallServerInterceptor
拦截器中使用的。exchangeFinder
是对connectionPool、connectionPool、request等信息的包装,它是在RetryAndFollowUpInterceptor拦截器中初始化出来的。我们接着看exchangeFinder
的find方法:
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
} catch (IOException e) {
}
}
5.ExchangeCodec#findHealthyConnection
这个方法没什么好说的,就一句,看findHealthyConnection
方法:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
//开启循环找符合条件的RealConnection
while (true) {
//找当前keep-alive有效时间内的连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
//如果该连接没有出现过连接失败才会返回,连接失败的标识是在Exchange中处理的,而Exchange中的处理是交给了CallServerInterceptor的
if (candidate.successCount == 0) {
return candidate;
}
}
//如果socket连接断开了或者失败了也认为不是有效的连接
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}
return candidate;
}
}
6.ExchangeCodec#findConnection
该方法会开启循环来找符合keep-alive有效时间内的连接,紧接着判断它是否在CallServerInterceptor拦截器处理过程中出现连接失败或者是该连接的socket连接断开了也认为不是有效的连接。我们主要看findConnection怎么处理keep-alive
的有效时间的连接:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
//定义最终找到的那个RealConnection
RealConnection result = null;
Route selectedRoute = null;
//用来标识有没有分配的连接
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
//获取当前分配的连接的路由信息
Route previousRoute = retryCurrentRoute()
? transmitter.connection.route()
: null;
//获取有没有分配过的连接
releasedConnection = transmitter.connection;
toClose = transmitter.connection != null && transmitter.connection.noNewExchanges
? transmitter.releaseConnectionNoEvents()
: null;
//看当前正在分配的连接有没有
if (transmitter.connection != null) {
result = transmitter.connection;
releasedConnection = null;
}
//如果上面都没找到从连接池中找连接
if (result == null) {
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else {
selectedRoute = previousRoute;
}
}
}
closeQuietly(toClose);
//有的话直接返回
if (result != null) {
return result;
}
//检查有没有新的路由
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
//如果有新的路由信息
if (newRouteSelection) {
routes = routeSelection.getAll();
//再次从连接池中找
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
// 没有从连接池中找到连接
if (!foundPooledConnection) {
//创建连接
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// 从连接池中找到返回连接
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// 准别TCP连接
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
//继续从连接池中找一次看有没有符合条件的连接池
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
} else {
//如果不符合条件则把当前连接放到连接池里面
connectionPool.put(result);
}
}
closeQuietly(socket);
return result;
}
首先会先判断有没有被分配的连接,如果没有,从连接池中找符合条件的连接,通过connectionPool.transmitterAcquirePooledConnection
去找,如果返回true说明找到了,找到后会把connection放到transmitter里面,如果连接池里面都没获取到连接,则创建连接,创建完了之后准备TCP连接,接着又从连接池中再次获取一次,如果再获取不成功,就把创建的连接放到连接池里面。 这里面涉及到okhttp的代理和路由的获取,这块笔者也不是很懂,所以不在这里叙述
7.RealConnectionPool#transmitterAcquirePooledConnection
上面我们知道,通过connectionPool.transmitterAcquirePooledConnection
获取连接,通过connectionPool.put
往连接池中放连接,我们先来看如何获取有效的连接的:
boolean transmitterAcquirePooledConnection(Address address, Transmitter transmitter,
@Nullable List<Route> routes, boolean requireMultiplexed) {
//connections是一个Deque的队列,也就是双端队列
for (RealConnection connection : connections) {
//默认requireMultiplexed为false
if (requireMultiplexed && !connection.isMultiplexed()) continue;
//判断当前连接是不是合格的
if (!connection.isEligible(address, routes)) continue;
//给transmitter的connection对象赋值
//向connection的transmitters中添加transmitter对象
transmitter.acquireConnectionNoEvents(connection);
return true;
}
return false;
}
okhttp中连接池存储空间是一个队列,并且该队列是一个ArrayDeque,它是一个双端队列:
private final Deque<RealConnection> connections = new ArrayDeque<>();
8.RealConnection#isEligible
如果连接不是合格的则直接跳过该连接,接着操作了transmitter
的acquireConnectionNoEvents
方法,我们分别看isEligible(合格判断)
和acquireConnectionNoEvents
方法:
boolean isEligible(Address address, @Nullable List<Route> routes) {
// 如果url的host和当前连接的路由中的地址host相同则认为是合格的,说白了就是比较两个连接的host是否相同
//所以连接池复用的连接是以host相同为合格的
if (address.url().host().equals(this.route().address().url().host())) {
return true;
}
return true;
}
关于是否合格的判断我去掉了其他的判断,只留下了host比较的代码,如果两个连接host相同,则认为是要找的连接,否则就不是。
9.Transmitter#acquireConnectionNoEvents
void acquireConnectionNoEvents(RealConnection connection) {
//把当前的连接给Transmitter中
this.connection = connection;
//给当前的连接的transmitters添加一个transmitter,注意这里是通过弱引用来包装的,注意该集合会在后面clean的时候有用到
connection.transmitters.add(new TransmitterReference(this, callStackTrace));
}
这里给当前Transmitter的connection附上当前的connection,因为后面要返回它给result。接着给当前的连接的transmitters集合添加一个弱引用的transmitters对象,在开篇已经说了 realConnection的transmitters集合的个数是在clean的时候用来判断该realConnection有没有正在被用到。
如果上面条件都满足,则transmitterAcquirePooledConnection
返回true,表示从连接池中寻找realConnection成功了。
10.RealConnectionPool#put
说完了从连接池中查找过程,我们接着来到连接池存储realConnection过程,上面已经分析了存储是在RealConnectionPool
的put方法:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//如果没有正在清除的工作
if (!cleanupRunning) {
cleanupRunning = true;
//这就是开篇说的清除realConnection的线程池
executor.execute(cleanupRunnable);
}
//清除完了后,把连接加入到队列中
connections.add(connection);
}
11.RealConnectionPool#cleanupRunnable
put过程很简单,先是做清除工作,然后做添加操作,我们着重看下清除怎么实现的,直接看cleanupRunnable
的定义:
private final Runnable cleanupRunnable = () -> {
//开启了死循环进行删除操作
while (true) {
//真正实现清除的方法,并且返回要等多久才能进行下次的清除
long waitNanos = cleanup(System.nanoTime());
//如果返回-1说明没有要清除的realConnection
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (RealConnectionPool.this) {
try {
//这里让当前线程直接在同步代码块里面实现等待,等待时间是waitMillis然后继续执行清除操作
RealConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
};
开启死循环进行删除操作,删除操作是在cleanup
中,该方法会返回要等待的时间才能执行下次的清除操作,如果返回-1说明没有要清除的realConnection了,通过同步代码块指定该线程要等待多少秒才能继续下次清除操作。
12.RealConnectionPool#cleanup
ok,我们把重心放在cleanup方法中:
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//判断当前连接有没有正在使用
if (pruneAndGetAllocationCount(connection, now) > 0) {
//如果真在使用了inUseConnectionCount加1,并且直接跳出该次循环
inUseConnectionCount++;
continue;
}
//如果不是正在使用的连接,则把该变量加1
idleConnectionCount++;
//算出当前连接已经存活的时间
long idleDurationNs = now - connection.idleAtNanos;
//找出存活最久的那个连接
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//如果最大存活的连接超过了keep-alive设置的时间或者没有正在使用的连接数超过了maxIdleConnections则从连接池中移除
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {//如果还存在没有正在使用的连接,但是存活时间没超过keep-alive设置的时间并且个数没超过最大的个数,返回还要等多久的时间才能进行下次清除操作
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {//如果都是正在使用的连接,则直接返回keep-alive设置的时间,也就是5分钟
return keepAliveDurationNs;
} else {
//如果连正在使用的连接都没有,则直接返回-1,说明不需要下次的清除操作
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
return 0;
}
清除操作的方法还是蛮清晰的:
- 首先根据
pruneAndGetAllocationCount
方法判断该连接有没有被正在使用,如果是正在使用,则直接跳出该次循环,如果不是则把idleConnectionCount数加1。 - 算出当前连接的已经存活的时间,然后找出存活最久的连接。
- 如果最久的连接大于设置的keep-alive设定的时间或者没有正在使用的连接数超过了maxIdleConnections则从连接池中移除,注意此时方法直接返回0了,继续下一次的清除操作。
- 如果还存在没有正在使用的连接,但是存活时间没超过keep-alive设置的时间并且个数没超过最大的个数,返回还要等多久的时间才能进行下次清除操作。
- 如果都是正在使用的连接,则直接返回keep-alive设置的时间,也就是5分钟。
- 如果连正在使用的连接都没有,则直接返回-1,说明不需要下次的清除操作。
13.RealConnectionPool#pruneAndGetAllocationCount
在清除的时候用pruneAndGetAllocationCount
方法判断该连接是不是正在使用的,还记得上面我们一直讲到了realConnection的transmitters集合了吗,它是存储该连接的transmitter对象,下面我们来看看:
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<Transmitter>> references = connection.transmitters;
for (int i = 0; i < references.size(); ) {
Reference<Transmitter> reference = references.get(i);
//如果Transmitter对象不为空,说明该连接在被使用着
if (reference.get() != null) {
i++;
continue;
}
//如果为空则说明要把当前的弱引用从集合中移除
references.remove(i);
connection.noNewExchanges = true;
//如果RealConnection的transmitters集合为空的,说明它没有被使用,并且设置它的死亡时间,设置返回值为0,说明没有正在使用
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
14.Transmitter#releaseConnectionNoEvents
在上面分析中我们发现只要RealConnection中的transmitters的Transmitter都为空的时候则说明该连接没有在使用中,那是在什么时候都为空呢,我们可以在releaseConnectionNoEvents
方法中找到移除Transmitter
的操作:
Socket releaseConnectionNoEvents() {
int index = -1;
//如果当前的连接和里面的任何一个Transmitter是同一个的时候,则找到了要移除的Transmitter
for (int i = 0, size = this.connection.transmitters.size(); i < size; i++) {
Reference<Transmitter> reference = this.connection.transmitters.get(i);
if (reference.get() == this) {
index = i;
break;
}
}
RealConnection released = this.connection;
//删除当前Transmitter
released.transmitters.remove(index);
this.connection = null;
if (released.transmitters.isEmpty()) {
released.idleAtNanos = System.nanoTime();
if (connectionPool.connectionBecameIdle(released)) {
return released.socket();
}
}
return null;
}
该方法是确认realConnection的transmitters的删除Transmitter的方法,它是在每次进入到关闭连接的时候才可能触发删除Transmitter。所以这里可以看出来在关闭连接的时候connection中的transmitters个数变少。如果connection中的transmitters个数减少到0的时候说明该连接没有被使用了,也就印证了上面在清除过程中可以从连接池中被清除了。而在我们创建RealConnection和获取到连接池中的连接的时候会调用Transmitter
的acquireConnectionNoEvents
方法,该方法会向RealConnection中的transmitters集合添加一个Transmitter对象的弱引用,这也正好和releaseConnectionNoEvents
方法呼应。
总结
关于连接池中的源码部分就介绍这么多,下面我们再来总结下连接池中相关问题:
- 连接池是为了解决频繁的进行建立Sokcet连接(TCP三次握手)和断开Socket(TCP四次分手)。
- Okhttp的连接池支持最大5个链路的keep-alive连接,并且默认keep-alive的时间是5分钟。
- 连接池实现的类是RealConnectionPool,它负责存储与清除的工作,存储是通过ArrayDeque的双端队列存储,删除交给了线程池处理cleanupRunnable的任务。
- 在每次创建RealConnection或从连接池中拿一次RealConnection会给RealConnection的
transmitters集合添加一个若引用的transmitter对象,添加它主要是为了后面判断该连接是否在使用中
- 在连接池中找连接的时候会对比连接池中相同host的连接。
- 如果在连接池中找不到连接的话,会创建连接,创建完后会存储到连接池中。
- 在把连接放入连接池中时,会把清除操作的任务放入到线程池中执行,删除任务中会判断当前连接有没有在使用中,有没有正在使用通过RealConnection的transmitters集合的size是否为0来判断,如果不在使用中,找出空闲时间最长的连接,如果空闲时间最长的连接超过了keep-alive默认的5分钟或者空闲的连接数超过了最大的keep-alive连接数5个的话,会把存活时间最长的连接从连接池中删除。保证keep-alive的最大空闲时间和最大的连接数。
推荐OKHttp相关讲解视频:https://www.bilibili.com/video/BV18f4y1R7Jc
大家如果还想了解更多Android 相关的更多知识点可以点进我的【GitHub】项目中,里面记录了许多的Android 知识点。
网友评论