接着上篇:https://www.jianshu.com/p/58421cb7ab57
3.4 ConnectionInterceptor缓存拦截器
主要的类包含ConnectInterceptor,StreamAllocation,RealConnection,ConnectionPool。
86ccf779c3f90700c709088cce54c9f.png
我们先分析ConnectionPool,然后分析ConnectionInterceptor。
3.4.1连接池ConnectionPool
ConnectionPool作用主要是复用Http连接,避免网络连接的时延,以及避免TCP调谐带来的带宽过小的问题。ConnectionPool主要有两个方法
get方法用于获取连接。
put方法用于添加连接。
Get方法:
public final class ConnectionPool {
private final Deque<RealConnection> connections = new ArrayDeque<>();
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
}
遍历连接池的连接,address和route作为参数调用RealConnection对象的方法isEligible来判断连接是否合适。
如果合适,就调用StreamAllocation的acquire方法,然后返回RealConnection对象。当所有的连接都不合适的时候返回null。
我们来看一下StreamAllocation的acquire方法。
public final class StreamAllocation {
public void acquire(RealConnection connection, boolean reportedAcquired) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
this.reportedAcquired = reportedAcquired;
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
}
先判断StreamAllocation的变量connection是否为null。不为null,说明已经包含一个连接了,直接抛出状态异常。
最重要的创建一个StreamAllocationReference对象,StreamAllocationReference是StreamAllocation的弱引用。然后添加到RealConnection对象connection的allocations变量中。这样就可以遍历RealConnection的allocations变量来查看是否被StreamAllocation使用中。
put方法
//ConnectionPool.java
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
put方法比较简单,先是判断cleanupRunning,cleanupRunning表示清理连接的工作正在执行。如果没有执行就调用线程池来执行cleanupRunnable。然后是把RealConnection对象connection添加到变量connections中。
连接清理
看下cleanupRunnable:
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
//通过调用cleanup方法来获取等待时间。
long waitNanos = cleanup(System.nanoTime());
//等待时间是-1直接返回
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
//设置等待时间。
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
leanUp方法会返回三个值,一个是-1直接返回,一个等于0执行循环继续清理,大于0就设置清理的等待时间。
返回值的意思:
-1表示当前没有连接
0表示需要继续清理工作
大于0表示等待的时长
我们来分析一下cleanup方法:
//ConnectionPool.java
long cleanup(long now) {
/**
* 先是定义一些变量。
*
* inUseConnectionCount表示使用中的连接数量。
* idleConnectionCount表示空闲的连接数量。
* longestIdleConnection表示空闲时间最长的连接。
* longestIdleDurationNs表示最长空闲的时间
*/
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
/**
*循环遍历connections,调用pruneAndGetAllocationCount获取
*RealConnection对象的StreamAllocation数量,
*当大于0时,说明存在RealConnection正在使用中,增加inUseConnectionCount。
*如果为0说明RealConnection对象已经空闲,增加idleConnectionCount。
*然后计算连接的空闲的时长(connection.idleAtNanos表示连接空闲的时间点)。
*当空闲时长大于当前的最大值longestIdleDurationNs时,我们赋值longestIdleDurationNs和longestIdleConnection。
*/
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
//说明是连接的空闲时长大于保活的时长或者是空闲数量大于最大的空闲的连接数时,
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
//移除连接,后面关闭它。
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//返回等待的时间。
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//连接都在使用中,直接返回保活的时长
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
//当前没有连接,直接返回-1。
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
//关闭最大空闲时长的连接,并返回0,表示继续clean。
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
上述代码已经注释,根据不同的条件,做响应的处理。
我们接下来分析一下pruneAndGetAllocationCount方法的逻辑。
//ConnectionPool.java
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);
references.remove(i);
connection.noNewStreams = true;
//这段代码会缩短连接的存活时间,使用的是当前时间减去存活时长,
//就说明当前时间点就可以回收。与存活的策略是有冲突的。
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
RealConnection会弱引用StreamAllocation对象。pruneAndGetAllocationCount就是遍历弱引用是否存在对象。如果存在就说明RealConnection在使用,而没有话的就说明已经不在使用了。
ConnectInterceptor
1c7979d72d3f646ebc8860e6c3a51c7.pngpublic final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//调用StreamAllocation的newStream方法
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//获取StreamAllocation到RealConnection对象
RealConnection connection = streamAllocation.connection();
//进行下一级的处理
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
ConnectInterceptor先是获取到RetryAndFollowUpInterceptor创建的StreamAllocation。
接着
调用StreamAllocation对象的newStream方法获取到HttpCodec对象。
调用StreamAllocation的connection的方法。
最后是调用Chain的proceed的方法进行处理。也就是传递给下一个拦截器。
下面一一分析
StreamAllocation的newStream方法
StreamAllocation的newStream方法比较多,分段阅读。
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
//读取超时时间等信息。
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
主要是两个功能:
1.通过findHealthyConnection方法获取连接,
2.通过RealConnection的newCodec方法获取对象,然后返回。
我们来分析一下StreamAllocation的findHealthyConnection方法来获取RealConnection对象。
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
//successCount为0说明连接没使用,因为每次使用后都会增加successCount
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
//判断当前的连接是否健康
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
循环的调用findConnection方法获取RealConnection对象。然后判断RealConnection对象的successCount为0,如果为0说明RealConnection未使用,直接返回。不为0需要通过isHealthy方法判断当前的RealConnection是否是健康,主要是判断Socket是否关闭,以及流是否关闭。
StreamAllocation的findConnection的方法比较多,我们只看关键的代码
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
//容错处理
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
//连接为空,尝试
if (result == null) {
// 实际是从连接池中获取连接
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
//如果获取到result不为null,就直接返回。
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
//通过Route从连接池中获取连接对象
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
//从连接池中没有获取到连接
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
//根据Route来再一次从连接池connectionPool中获取对象。如果获取不到,就会创建一个RealConnection对象。
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//创建RealConnection对象
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
//如果从连接池中获取到了连接对象,foundPooledConnection为true,就直接的返回了。
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
//foundPooledConnection为false的说明没有从连接池中获取到连接对象,那就是新创建的连接。
//我们需要调用RealConnection的connect进行连接。然后是调用Internal对象的put方法把连接加入到连接池中,最后返回。
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
接着是分析RealConnection的newCodec方法。
//RealConnection.java
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
主要是创建HttpCodec。HttpCodec用于编码请求,解码响应。
StreamAllocation的connection方法:
public final class StreamAllocation{
public synchronized RealConnection connection() {
return connection;
}
}
直接返回StreamAllocation的connection变量。
————————————————
参考:
https://blog.csdn.net/wfeii/article/details/88775141
网友评论