tcp的三次握手及四次挥手相信很多人都比较熟悉,而在Okhttp中,也有对于连接的管理部分。本文主要分析的是Okhttp是如何管理TCP三次握手四次挥手的,或者说Okhttp的连接管理。
本文基于okhttp 3.14.9
gradle依赖:implementation group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.14.9'
Okhttp的拦截器链(责任链)
Okhttp最为经典的就是以责任链模式
设计的拦截器链。Okhttp内部是有线程池触发每个RealCall对象继而触发网络请求的,如果您是异步调用的网络请求RealCall.enqueue()
,在AsyncCall.execute()
方法中就会调用个以下方法。即组装责任链链路interceptors集合
,并发起网络请求。值得一题的是在整个过程中都是通过chain.proceed(originalRequest)
的形式将责任链遍历。而本文的主角ConnectInterceptor
就由此诞生,该interceptor就是负责http连接的。
// RealCall.java 210行
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
Okhttp的连接管理
ConnectInterceptor
// ConnectInterceptor.java
/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
Transmitter transmitter = realChain.transmitter();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
Exchange exchange = transmitter.newExchange(chain, doExtensiveHealthChecks);
return realChain.proceed(request, transmitter, exchange);
}
}
通过源码可知,ConnectInterceptor的代码比较简单,简单可理解为:新建一个Exchange对象,然后往下传递,这时客户端与服务器的连接已经建立,接下来将会是真正的数据传递。所以这里的重点就在transmitter.newExchange(chain, doExtensiveHealthChecks)
里面了。
简单介绍一下Transmitter和Exchange
- Transmitter:OkHttp的应用层和网络层之间的桥梁。里面包含着OkHttpClient对象、RealConnectionPool连接池、该次请求的Call对象、事件监听的EventListener对象、AsyncTimeout对象用于管理超时。Transmitter对象可以理解为一个管理类,关联着整个网络请求的
应用层
和网络层
的对接。- Exchange:管理网络请求的
request
和response
,可以理解为网络层
,Transmitter中也包含这此对象。实际上,真正处理网络请求的request和response的是其里面的ExchangeCodec
对象。
从结果来看,一个Exchange
对象就代表着一个与服务器的连接。带着这个结论我们继续往下看。
Transmitter.newExchange
// Transmitter.java 158行
/** Returns a new exchange to carry a new request and response. */
Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
synchronized (connectionPool) {
if (noMoreExchanges) {
throw new IllegalStateException("released");
}
if (exchange != null) {
throw new IllegalStateException("cannot make a new request because the previous response "
+ "is still open: please call response.close()");
}
}
ExchangeCodec codec = exchangeFinder.find(client, chain, doExtensiveHealthChecks);
Exchange result = new Exchange(this, call, eventListener, exchangeFinder, codec);
synchronized (connectionPool) {
this.exchange = result;
this.exchangeRequestDone = false;
this.exchangeResponseDone = false;
return result;
}
}
newExchange
方法的核心就是调用exchangeFinder.find()
方法找到一个合适的连接,即找到一个合适的ExchangeCodec
对象继而生成一个Exchange
对象返回。
ExchangeFinder.find
// ExchangeFinder.java 79行
public ExchangeCodec find(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
return resultConnection.newCodec(client, chain);
} catch (RouteException e) {
trackFailure();
throw e;
} catch (IOException e) {
trackFailure();
throw new RouteException(e);
}
}
重点关注的是 RealConnection resultConnection = findHealthyConnection();
,这里可以理解为通过findHealthyConnection()
获取可用的RealConnection
对象,再借助RealConnection
新建一个ExchangeCodec
对象用于网络请求的交互。
RealConnection 是一次连接的抽象。
Exchange or ExchangeCodec 为负责http请求的request和reponse的交互。
所以,要想了解Okhttp如何管理连接的,就需要看RealConnection的查找过程。
查找RealConnection
在Okhttp中是有利用连接池复用连接的设计的。RealConnectionPool
就是担任着这一角色,其内部维护了一个双端队列Deque connections
用来缓存可复用的连接。通过maxIdleConnections
最大空闲连接数以及keepAliveDurationNs
连接保活时长这俩参数来控制连接的释放。RealConnectionPool
内部维护一个线程池,定期释放无用的RealConnectionPool
连接。 回到查找可用RealConnection的逻辑当中,之前讲到的findHealthyConnection
方法只是对查找过程findConnection
的一次封装,它会死循环,直到获取到可用的RealConnection为止。
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0 && !candidate.isMultiplexed()) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
candidate.noNewExchanges();
continue;
}
return candidate;
}
}
然后就正式进入查找阶段了,findConnection
方法:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
// 查找RealConnection结果
RealConnection result = null;
Route selectedRoute = null;
RealConnection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
hasStreamFailure = false; // This is a fresh attempt.
。。。
// [1] transmitter对象本身记录的RealConnection可复用
if (transmitter.connection != null) {
// We had an already-allocated connection and it's good.
result = transmitter.connection;
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
// [2] 通过当前请求地址在连接池connectionPool中寻找到可复用的RealConnection
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) {
foundPooledConnection = true;
result = transmitter.connection;
} else if (nextRouteToTry != null) {
selectedRoute = nextRouteToTry;
nextRouteToTry = null;
} else if (retryCurrentRoute()) {
selectedRoute = transmitter.connection.route();
}
}
}
。。。
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
List<Route> routes = null;
synchronized (connectionPool) {
if (transmitter.isCanceled()) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
routes = routeSelection.getAll();
// [3] 遍历全局的路由路径,在连接池connectionPool中寻找到可复用的RealConnection
if (connectionPool.transmitterAcquirePooledConnection(
address, transmitter, routes, false)) {
foundPooledConnection = true;
result = transmitter.connection;
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
// [4] 若前面[1][2][3]都未找到时会新建一个RealConnection对象,后续请求服务器连接。
result = new RealConnection(connectionPool, selectedRoute);
connectingConnection = result;
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
// [4] 前面[1][2][3]都未找到时,当前的result为全新RealConnection对象,
// 这里的connect方法就会进行TCP的3次握手,和SSL/TLS。
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
connectionPool.routeDatabase.connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
connectingConnection = null;
// Last attempt at connection coalescing, which only occurs if we attempted multiple
// concurrent connections to the same host.
if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) {
// We lost the race! Close the connection we created and return the pooled connection.
result.noNewExchanges = true;
socket = result.socket();
result = transmitter.connection;
// It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
// that case we will retry the route we just successfully connected with.
nextRouteToTry = selectedRoute;
} else {
connectionPool.put(result);
transmitter.acquireConnectionNoEvents(result);
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
findConnection
方法较长,可以寻找注释中的[1]、[2]、[3]、[4],即为每种获取到可用RealConnection对象情况。
- [1] transmitter对象本身记录的RealConnection可复用。
- [2] 通过当前请求地址在连接池connectionPool中寻找到可复用的RealConnection。
- [3] 遍历全局的路由路径,在连接池connectionPool中寻找到可复用的RealConnection。ps:这里与[2]不同的是,会遍历到其他的请求地址,判断的依据可以是相同的host等。
- [4] 如果前3步都找不到时,就会新建一个连接进行TCP+TLS了。
值得一提的是,每次查找连接池中可复用的连接都会调用connectionPool.transmitterAcquirePooledConnection()
方法(只是传的参数不同),在此方法内部是会将查找到的RealConnection
对象备份一个引用到transmitter
内。而如果是新建连接的话也会通过transmitter.acquireConnectionNoEvents(result)
将新的RealConnection
对象备份。这样做的目的是为了重试时更快的找到可复用的连接,也对应会上述的[1]中查找情况。
执行三次握手
根据上述findConnection
方法中的result.connect
,跟踪代码会看到,最终走到RealConnection.connectSocket()
。
// RealConnection.java
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
到了这一步,RealConnection
内部就会新建一个Socket
对象,用于后续的tcp连接通信。
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
就是真正执行三次握手的地方,这里的Platform.get()
是Okhttp根据平台(ps:可能不是Android平台或者Android版本)的适配逻辑。
// Platform.java
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
throws IOException {
socket.connect(address, connectTimeout);
}
显然,Okhttp内部的tcp三次握手,依赖的是传统的socket连接。
ps:上述代码中会利用socket
对象创建一个source
和一个sink
对象,这个是Okio的东西,如果请求是Http1协议的话就会用到。后续request和reponse就是利用它们来实现。
最后还需要补充的是,上述代码中在找到可用的RealConnection
对象后,会利用它新建一个ExchangeCodec
对象:resultConnection.newCodec(client, chain);
。其实ExchangeCodec
是一个接口,会根据所用协议为Http1或者Http2实例出对应的策略类。
// RealConnection.java
ExchangeCodec newCodec(OkHttpClient client, Interceptor.Chain chain) throws SocketException {
if (http2Connection != null) {
return new Http2ExchangeCodec(client, this, chain, http2Connection);
} else {
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1ExchangeCodec(client, this, source, sink);
}
}
这里如果是Http1的话,上述利用socket
对象创建source
和sink
对象就会被引用到Http1ExchangeCodec
当中。
Okhttp的连接释放
说完了三次握手的管理,再来看看四次挥手,即连接释放。在源码中会看到多处地方调用一个closeQuietly
方法,该方法是用于释放连接的。
// Utils.java
public static void closeQuietly(Socket socket) {
if (socket != null) {
try {
socket.close();
} catch (AssertionError e) {
if (!isAndroidGetsocknameError(e)) throw e;
} catch (RuntimeException rethrown) {
throw rethrown;
} catch (Exception ignored) {
}
}
}
socket.close();
就是针对这一次tcp连接的关闭(释放)。这里其实就是真正的连接释放了。而至于常规触发这一逻辑的地方就是前面提到的RealConnectionPool
的定时回收机制。
最后
最后在总结一下文中设计的类的职能吧!
- ConnectInterceptor:Okhttp用于与服务器建立连接的拦截器,连接发起的地方。
- Transmitter:Okhttp应用层与网络层的中间层。这里利用其来获取到
Exchange
对象。 - ExchangeFinder:用于查找可用连接。查找可用
RealConnection
对象,实例出ExchangeCodec
对象,以及返回Exchange
对象都会在这里完成。 - Exchange:封装http的request和response的事件触发,中间还会涉及事件监听的回调。内部拥有该次连接的
ExchangeCodec
对象。 - ExchangeCodec:真正用于http的request和response的事件。内部使用的是Okio。
- RealConnection:一次连接的抽象,内部用于本次tcp连接的socket对象。可在
Transmitter
中、RealConnectionPool
连接池或者新建获取,此步骤发生在ExchangeFinder
中。 - RealConnectionPool:用于缓存可复用的
RealConnection
对象。
本文转载于
作者:Cy13er
网友评论