上节我们讲解了Okhttp网络请求和响应的处理过程,其中我们知道了请求之前是需要建立网络连接的,也就是http请求是需要建立TCP连接之上的。这也是符合TCP/IP四层模型和OSI七层模型中,传输层的TCP协议,应用层的HTTP协议应用。Okhttp在网络连接的管理方面有哪些特性和优势呢?我们来列举一下。
- 网络连接池的引入和管理机制。内部维护网络连接池,查找当前请求是否有对应可用的连接,避免每次请求重新建立和断开TCP连接。
- 网络连接智能路由机制。重试查找可用IP,建立连接,同时记录连接失败的IP,避免重复请求无效IP。
- 支持主流HTTP协议,HTTPS请求和Proxy代理服务。支持HTTP 1.1/2和SPDY协议,支持HTTPS,支持HTTP代理和SOCKS代理。
下面我们围绕这3点,去探究它们是如何实现的。
网络连接的引入
上节我们知道网络请求到查找可用连接的流程是这样的。
QQ截图20171108180536.png
那我们就从StreamAllocation的newStream看看它是如何创建用于HTTP通信的封装流对象,也就是获取用于当前HTTP通信的TCP连接。这里分为两步。
- 查找可用连接。
- 这里会从连接池中查找可用的连接,没有找到则新建一个连接对象(RealConnection),加入到连接池(ConnectionPool),然后调用连接,最后返回连接完成的连接对象(RealConnection)。
- 创建基于TCP连接之上的用于HTTP流操作的封装对象。
- 这里根据获取的连接对象(RealConnection)是否包含framedConnection帧流对象来判断创建Http2xStream还是Http1xStream对象。Http1xStream用于HTTP 1.1协议的数据传输,而Http2xStream用于HTTP 2/SPDY协议的数据传输。
public HttpStream newStream(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws RouteException, IOException {
try {
//查找可用的连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//创建HTTP封装流对象,这里分为Http1xStream和Http2xStream两种
HttpStream resultStream;
if (resultConnection.framedConnection != null) {
//Http1xStream用于HTTP 1.1协议的HTTP流封装
resultStream = new Http2xStream(this, resultConnection.framedConnection);
} else {
//Http2xStream用于HTTP 2/SPDY协议的HTTP流封装.
resultConnection.socket().setSoTimeout(readTimeout);
resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
resultStream = new Http1xStream(this, resultConnection.source, resultConnection.sink);
}
synchronized (connectionPool) {
stream = resultStream;
return resultStream;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
网络连接池的引入和管理机制
StreamAllocation.findHealthyConnection查找可用连接
从findHealthyConnection就开始进入网络连接的获取,以及网络连接池管理的范围了。
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException, RouteException {
//循环,直到找到可用连接
while (true) {
//查找连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
//表示它是一个新创建的连接,默认为可用,返回它
if (candidate.successCount == 0) {
return candidate;
}
}
//表示它是一个可重用的连接,返回它
// Otherwise do a potentially-slow check to confirm that the pooled connection is still good.
if (candidate.isHealthy(doExtensiveHealthChecks)) {
return candidate;
}
//当前获取的连接不可用,记录它,下次获取连接屏蔽它,节省时间。
connectionFailed(new IOException());
}
}
这里获取健康可用的连接,是在while循环中进行的,意味着会重试多次查找,直到找到可用的连接。也就是说,当找到一个新创建的连接或者是验证了是可重用的连接,就代表找到可用连接了,否则将当前找到的不可用连接加入黑名单,避免下次重复获取它,造成后续的无效请求。
StreamAllocation.findConnection查找连接
接下来看findConnection是如何查找连接的,这里分为以下几步。
- 如果当前的连接是空闲的,则直接使用该连接(RealConnection)。
- 根据请求地址(Address)从连接池查找缓存的连接(RealConnection)。
- 接如果没有从连接池找到,则从路由选择器(RouteSelector)中获取其中的下一个路由(Route),然后用这个路由去创建连接对象(RealConnection)。
- 将当前的StreamAllocation流分配对象记录在这个新创建的连接对象中。这里的用意是记录当前的连接对象(RealConnection)之上分配了多少个请求,连接上没有分配请求,连接池会考虑回收这个连接。采用了引用计数来管理连接的回收。
- 将新创建的连接加入到连接池。
- 新创建的连接对象(RealConnection)开始建立连接。
- 连接建立了,把这个连接对应的路由信息从黑名单中移除,以便以后可以继续访问它。
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException, RouteException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (stream != null) throw new IllegalStateException("stream != null");
if (canceled) throw new IOException("Canceled");
//当前连接空闲,重用当前连接
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
//连接池查找Adress对应的连接
// Attempt to get a connection from the pool.
RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
if (pooledConnection != null) {
this.connection = pooledConnection;
return pooledConnection;
}
selectedRoute = route;
}
//没有查找到,从路由选择器中找到下一个路由信息
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
synchronized (connectionPool) {
route = selectedRoute;
}
}
//根据路由信息创建新连接
RealConnection newConnection = new RealConnection(selectedRoute);
//增加当前请求对新连接的引用。引用计数的方式。
acquire(newConnection);
//新连接添加到连接池
synchronized (connectionPool) {
Internal.instance.put(connectionPool, newConnection);
this.connection = newConnection;
if (canceled) throw new IOException("Canceled");
}
//建立连接
newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
connectionRetryEnabled);
//将新连接的路由信息预先从黑名单移除
routeDatabase().connected(newConnection.route());
//返回连接
return newConnection;
}
总体说就是先从连接池获取该Address对应的连接,这里不管它是否是有效的。如果获取不到,则新建一个连接,新建的连接要做4件事情。
- 通过acquire记录,表示这个连接之上分配了一个流对象了。
- 将新创建的连接加入到连接池,交给连接池管理。
- 建立连接。
- 预先把这个连接对应的路由信息从黑名单删除,待后面findHealthyConnection健康连接检查时,如果不符合,再重新加入黑名单,这是对路由信息黑名单的管理。先从黑名单删除,判断不可用之后再加入黑名单。
到这里我想大家应该能想到连接池中的连接是采用引用计数的方式来记录是否空闲,那它到底是怎么管理的呢?我们继续分析
ConnectionPool网络连接池
我们先分析ConnectionPool的属性和构造方法。
public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
//核心线程数为0,空闲存活时间是60秒
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
/**
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
}
主要描述了以下内容:
- 用于清理连接的线程池executor,以及进行清理操作的cleanupRunnable,cleanupRunnable中的while循环和wait等待配合,直到完成了清理操作。
- 最大空闲连接数,最长空闲连接时间,当前是否正在清理。
- 默认的构造方法是,支持最多5个空闲连接,如果有一个空闲连接超过5分钟,就从连接队列里将它移除并关闭连接。
我们看缓存连接的方法put
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
- 首先判断是否正在清理,如果没有,则将清理任务放入线程池执行。
- 会将连接添加到连接队列里。
清理连接的线程模型
配合之前的线程池executor定义,我们能得出,因为cleanupRunning的判断,线程池中通常只会有一个线程进行清理工作,就是说清理工作使用的是单线程模型,这里没有直接new一个线程,而是使用这个线程池,是因为这个线程池定义了最大空闲线程数是0,最长空闲时间是60秒,也就是说,当前清理线程空闲时,超过60秒,线程池会释放该线程,如果在时间之内,线程可以重用,兼顾了线程重用和无用超时时释放的优点。
连接的清理机制
我们接下来看清理工作是如果进行的,看cleanup方法
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
long cleanup(long now) {
//正在使用连接的数量
int inUseConnectionCount = 0;
//空闲连接的数量
int idleConnectionCount = 0;
//最长空闲时间的连接
RealConnection longestIdleConnection = null;
//当前最长的空闲时间
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//这里表示当前连接之上的流请求分配数量,大于0表示当前连接不是空闲的
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
//这里是在for循环中找到空闲时间最长连接,它的空闲时间
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//1. 如果当前的最长空闲时间超过了最长空闲时间阈值,移除并清理
//2. 如果空闲连接数超过最大空闲连接数,找到空闲最长的那个连接,移除并清理
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//表示有空闲连接,那就等待它们到达最大空闲时间阈值之后再试试
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//表示所有连接都在使用,那就等待最大空闲时间再试试
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
//否则就表示清理完成了
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}
//关闭这个移除的连接
closeQuietly(longestIdleConnection.socket());
//这里返回的是一个等待时间,配合cleanupRunnable中的while循环,实现多次执行清理工作
// Cleanup again immediately.
return 0;
}
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
//while循环调用cleanup
long waitNanos = cleanup(System.nanoTime());
//当返回-1,也就是标记清理完成时,停止清理
if (waitNanos == -1) return;
//等待指定的时间,执行下一次cleanup清理操作
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
从以上代码可以看到,这里在三种情况下会多次执行。
- 连接的空闲时间大于最大空闲时间,也就是说只要空闲时间超过了,就会去清理。
- 空闲连接数据大于最大空闲连接数,也就是说空闲连接数超过了,那就会去清理,直到没有超过最大空闲连接数。
- 如果还是有空闲连接,但是没超过最大空闲连接,那我就等那个空闲最长的连接到达最大空闲时间,然后再次调用cleanup清理它。
- 如果没有空闲连接的话,那我就等个最大空闲时间,默认的话就是5分钟之后,再试试有没有可以清理的。
综上所述,清理只要开始了,那么它就会一直守候着,没达到它的清理条件,就继续等待,等待下次继续试试能不能清理,直到连接池中没有一个连接存在了,才停止清理工作。
连接是否空闲的判断机制
我们接下来分析是如何判断一个连接是否是空闲还是正在被使用着的。
/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
//这里获取当前连接上所分配流的弱引用列表
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);
if (reference.get() != null) {
i++;
continue;
}
//如果发现了弱引用指向的流分配对象为空,表示流分配对象泄漏了,这时候要从连接中移除,并标记连接为没有新的分配流
// We've discovered a leaked allocation. This is an application bug.
Internal.logger.warning("A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?");
references.remove(i);
connection.noNewStreams = true;
//引用列表为空,说明连接上没有分配流,是空闲连接
// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}
return references.size();
}
这里会判断连接中是否存在泄漏的分配流,如果存在分配流的泄漏,就标记该连接不能再分配流了。再分析获取连接的方法
/** Returns a recycled connection to {@code address}, or null if no such connection exists. */
RealConnection get(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.allocations.size() < connection.allocationLimit
&& address.equals(connection.route().address)
&& !connection.noNewStreams) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}
遍历连接队列,如果该连接的流分配数量没有超过限制,然后路由信息一致,并且连接中没有发生流的泄漏,可以分配新的流,就代表找到可用的连接了。然后acquire表示要增加对这个连接的引用。路由信息是否一样的判断
public final class Address {
@Override public boolean equals(Object other) {
if (other instanceof Address) {
Address that = (Address) other;
return this.url.equals(that.url)
&& this.dns.equals(that.dns)
&& this.proxyAuthenticator.equals(that.proxyAuthenticator)
&& this.protocols.equals(that.protocols)
&& this.connectionSpecs.equals(that.connectionSpecs)
&& this.proxySelector.equals(that.proxySelector)
&& equal(this.proxy, that.proxy)
&& equal(this.sslSocketFactory, that.sslSocketFactory)
&& equal(this.hostnameVerifier, that.hostnameVerifier)
&& equal(this.certificatePinner, that.certificatePinner);
}
return false;
}
}
可以看到会匹配url,dns,协议,代理等等内容是否相同。到这里清楚了连接池管理连接的原理了。那么连接上分配的流是在哪里释放的呢?
连接上分配流的引用
我们知道一个连接是否空闲是根据它上面的分配流引用数来判断的,采用弱引用是为了不影响分配流的回收。我们来分析它
public final class StreamAllocation {
//这里为连接添加分配流的弱引用
/**
* Use this allocation to hold {@code connection}. Each call to this must be paired with a call to
* {@link #release} on the same connection.
*/
public void acquire(RealConnection connection) {
connection.allocations.add(new WeakReference<>(this));
}
//这里移除连接的该分配流引用
/** Remove this allocation from the connection's list of allocations. */
private void release(RealConnection connection) {
for (int i = 0, size = connection.allocations.size(); i < size; i++) {
Reference<StreamAllocation> reference = connection.allocations.get(i);
if (reference.get() == this) {
connection.allocations.remove(i);
return;
}
}
throw new IllegalStateException();
}
}
连接上分配流的释放
我们知道在获取可重用的连接和新建连接时都会调用acquire来标记当前请求要对连接作一次引用,即需要在该连接上分配流,进行数据请求。那么release释放连接引用是在哪里调用的呢?这里指出一条释放的支线流程。
微信截图_20171108180548.png
在Http1xStream这个这个HTTP 1.1协议的流传输对象中,其中的FixedLengthSource对象读取响应流完成时,会调用StreamAllocation的streamFinished标记流读取完成,接着标记释放当前流到release。我们分析Http1xStream
public final class Http1xStream implements HttpStream {
/** An HTTP body with a fixed length specified in advance. */
private class FixedLengthSource extends AbstractSource {
private long bytesRemaining;
public FixedLengthSource(long length) throws IOException {
bytesRemaining = length;
if (bytesRemaining == 0) {
endOfInput(true);
}
}
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
if (closed) throw new IllegalStateException("closed");
if (bytesRemaining == 0) return -1;
long read = source.read(sink, Math.min(bytesRemaining, byteCount));
//读取响应流完成
if (read == -1) {
endOfInput(false); // The server didn't supply the promised content length.
throw new ProtocolException("unexpected end of stream");
}
bytesRemaining -= read;
if (bytesRemaining == 0) {
endOfInput(true);
}
return read;
}
@Override public void close() throws IOException {
if (closed) return;
//关闭流
if (bytesRemaining != 0 && !Util.discard(this, DISCARD_STREAM_TIMEOUT_MILLIS, MILLISECONDS)) {
endOfInput(false);
}
closed = true;
}
}
private abstract class AbstractSource implements Source {
protected final ForwardingTimeout timeout = new ForwardingTimeout(source.timeout());
protected boolean closed;
@Override public Timeout timeout() {
return timeout;
}
/**
* Closes the cache entry and makes the socket available for reuse. This should be invoked when
* the end of the body has been reached.
*/
protected final void endOfInput(boolean reuseConnection) throws IOException {
if (state == STATE_CLOSED) return;
if (state != STATE_READING_RESPONSE_BODY) throw new IllegalStateException("state: " + state);
detachTimeout(timeout);
//标记关闭状态,标记流关闭
state = STATE_CLOSED;
if (streamAllocation != null) {
streamAllocation.streamFinished(!reuseConnection, Http1xStream.this);
}
}
}
}
接着是StreamAllocation
public final class StreamAllocation {
public void streamFinished(boolean noNewStreams, HttpStream stream) {
synchronized (connectionPool) {
if (stream == null || stream != this.stream) {
throw new IllegalStateException("expected " + this.stream + " but was " + stream);
}
if (!noNewStreams) {
//如果连接上可以创建新的流,标记连接上成功请求的次数
connection.successCount++;
}
}
//标记释放流
deallocate(noNewStreams, false, true);
}
/**
* Releases resources held by this allocation. If sufficient resources are allocated, the
* connection will be detached or closed.
*/
private void deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
RealConnection connectionToClose = null;
synchronized (connectionPool) {
if (streamFinished) {
this.stream = null;
}
if (released) {
this.released = true;
}
if (connection != null) {
if (noNewStreams) {
connection.noNewStreams = true;
}
if (this.stream == null && (this.released || connection.noNewStreams)) {
//这里释放了当前分配流对该连接的引用
release(connection);
if (connection.allocations.isEmpty()) {
connection.idleAtNanos = System.nanoTime();
//如果连接上的没有分配流的引用了,说明连接空闲了,会调用连接池的connectionBecameIdle,通知有连接空闲了
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
connectionToClose = connection;
}
}
connection = null;
}
}
}
//如果连接池标记要移除该连接了,则立即关闭该连接
if (connectionToClose != null) {
Util.closeQuietly(connectionToClose.socket());
}
}
}
分析ConnectionPool的connectionBecameIdle方法
/**
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
if (connection.noNewStreams || maxIdleConnections == 0) {
//如果连接上有流泄漏了,不会再分配新的流了,移除,返回true交给后面去关闭
//最大空闲数为0,不保留空闲连接,移除,返回true交给后面去关闭
connections.remove(connection);
return true;
} else {
//否则,唤醒清理线程,交给清理线程去关闭。
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}
我们发现,原来当一个连接空闲时,除了连接池中的清理线程负责清理关闭之外,本身在StreamAllocation的deallocate中也会直接对判定为空闲的连接进行关闭操作,这样做可以尽快地关闭空闲的连接。
下节分析
本来打算这节一起讲连接的建立和管理,犹豫篇幅原因和连接建立过程的重要性,觉得有必要单独一节进行讲解,这节就到这吧,嘿嘿。
网友评论