先上前几篇的地址
第一篇
第二篇
第三篇
接着上一篇直接走吧,在CacheInterceptor执行之后
interceptors.add(new CacheInterceptor(client.internalCache()));
//处理socket建立连接或者复用连接过程,总之这里会建立连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
//在已经建立的链接上进行参数发送和获取响应封装等操作
interceptors.add(new CallServerInterceptor(forWebSocket));
下一个执行的拦截器为ConnectInterceptor
ConnectInterceptor
连接拦截器,从意思来看就是去进行网络连接的拦截器,看一下细节实现
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
//每一个Call都有一个url,一个url就会对应一个新的StreamAllocation,即流分配者
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
//检查当前请求是否GET请求
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//通过流分配者来处理流和Connection的创建,核心方法
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
//注意在newStream中Connection已经创建,此处只是获取而已
RealConnection connection = streamAllocation.connection();
//此处connection创建已经完成
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
连接拦截器的工作很简洁,纯粹就是为了创建连接,当连接响应之后并没有任何处理。
内部的核心工作委托了StreamAllocation进行,那么接下来看OkHttp是如何创建和复用连接的
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
//对于一般APP来说都是POST操作,则doExtensiveHealthChecks为true
//获取之前在client中设定的连接、读取和输出超时时长
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
//获取之前在client中设置的是否允许重新连接
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//尝试从连接池中获取可以复用的连接,对于Http1.1来说,一个已经完成的保持长连接的连接被同一个请求连接复用的可能性会大一点
//假设从连接池中获取到可以复用的连接,但是要检查当前连接对应的socket的可用性
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
//根据HTTP2/HTTP1获取对应的请求和响应处理类(因为协议的不同,所以有所区别)
//一个请求会有一个执行类,执行完成之后会废弃(置空)
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {//标记当前流的请求和响应实现类
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
基本的思路就是查找一个可用的连接,这个可用的连接后面会看到可能是复用的、或者新建的,这里对于Http1.1来说最终返回的是Http1Codec,内部主要是封装了报文拼接和解析等操作,所以说这个根据协议的不同会有所差别,接着看StreamAllocation是如何获取可用连接的
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {//此处通过循环直到成功获取连接为止
//查找连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
//如果当前连接是新建的,直接使用当前连接即可
//在连接池中是的连接都是建立完成确实可用的,不会出现successCount为0的情况
//除非是新建的连接
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
// 校验当前连接的可用性,主要是因为有的连接是从连接池中复用的,那么需要保证这个复用的连接还可用
// 比方说当前socket连接是否正常、是否还可以传输数据
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
//当前Connection已经不再健康,从连接池中移除,
//并且要关闭Socket连接
noNewStreams();
//继续查找其它可用的连接
continue;
}
return candidate;
}
}
这里可以看到获取连接的一个执行思路,首先获取一个连接,然后需要校验当前连接是否可用,如果可用则返回,否则从连接池中移除,然后关闭socket,然后继续循环去获取连接。
public boolean isHealthy(boolean doExtensiveChecks) {
//检查socket连接和流是否已经关闭
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
//如果是Http2.0,看一下Connection是否关闭就好
if (http2Connection != null) {
return !http2Connection.isShutdown();
}
//POST这里为true,GET不需要校验这个,因为不需要传参
if (doExtensiveChecks) {
try {
//具体检查方式可以看注释
//通过设置SO_TIMEOUT来修改socket读取数据的最大超时时间
//如果抛出socket超时异常,说明socket连接正常
int readTimeout = socket.getSoTimeout();
try {
socket.setSoTimeout(1);
if (source.exhausted()) {
return false; // Stream is exhausted; socket is closed.
}
return true;
} finally {
socket.setSoTimeout(readTimeout);
}
} catch (SocketTimeoutException ignored) {
// Read timed out; socket is good.
} catch (IOException e) {
return false; // Couldn't read; socket is closed.
}
}
//这里一般就是socket的读取超时异常,说明当前连接正常
return true;
}
这里看一下连接是否可用的校验(基于http1.1):
1.当前socket是否已经关闭、输入/输出流是否已经关闭
2.socket没有关闭,在POST模式下,还需要校验是否可以正常读取数据,这里通过1ms的读取超时来读取数据,如果读取到数据或者读取超时,那么说明当前连接可以读取数据,是正常可用的
接下来看一下连接的新建和复用处理
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {//此处要获取连接池的锁,因为尝试从连接池中获取可以复用的连接,应该可以理解为消费/生产者模型
if (released) throw new IllegalStateException("released");
if (canceled) throw new IOException("Canceled");
if (codec != null) throw new IllegalStateException("codec != null");
// Attempt to use an already-allocated connection.
// 首先尝试使用已经分配的连接
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
//尝试从连接池中获取一个连接,进而复用连接
//这里其实就是调用connectionPool.get方法
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {//从连接池中复用成功
return connection;
}
//一个新的连接路由初始化都为空
//当一个连接成功之后,哪怕后续出现异常,这个路由都是正确的
//那么重试的时候会直接使用这个路由
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
// 内部实际上有通过InetAddress通过Host来查找IP,这是为了Socket连接提供IP和端口
// 内部会通过DNS做域名映射,从而获得域名所对应的IP地址
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// This could match due to connection coalescing.
// 前面已经通过域名去尝试复用连接,走到这里说明没有命中
// 此处在Host不匹配的情况下,可以再次通过IP地址来进行查找可复用的连接
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//无法重用流
//新建一个流对象
result = new RealConnection(connectionPool, selectedRoute);
//关联当前分配者分配的Connection
//主要是标记当前流正在使用中
acquire(result);
}
// 这里进行TCP和TLS的握手
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
// 当前节点连接成功
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
// 将当前已经建立的连接放入连接池中
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// 如果当前新建的连接协议为http2.0,因为http2.0一个TCP连接中可以接受多个流同时进行
// 所以可以清理掉连接池中一些多余的连接,这里只清理一个
// 并且将之前用的socket和流都转到当前新建的Connection中
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
思路比较简单,首先通过当前请求地址从连接池中尝试获取可以复用的连接,如果没有,根据路由选择者选择一个路由节点,通过当前节点返回的IP地址再次尝试从连接池中复用连接,如果还是没有,则通过当前节点使用socket建立连接,连接成功之后再将当前连接放入连接池中,并且标记当前连接正在使用中。
接下来看一下连接池ConnectionPool复用的标准:
RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
//检测连接是否可以复用
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection);//关联流和连接
return connection;
}
}
public boolean isEligible(Address address, Route route) {
// If this connection is not accepting new streams, we're done.
// allocations.size()实际上表示当前连接被多少流使用中
// 在Http1.1中一个连接同时只能有一个流使用,所以只有等到一个流完成使用完毕之后
// 后面的流才能复用当前连接
// noNewStreams顾名思义不允许复用流
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
// 检查一些参数是否一致,具体看Address中equalsNonHost
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
//如果请求host一致,则认为当前连接可以携带此流
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
//在host无法匹配的情况下,http2.0下有可能可以重用连接,http1.x到这里就不符合重用的条件了
if (http2Connection == null) return false;
//...这里都是HTTP2.0的处理,忽略
return true; // The caller's address can be carried by this connection.
}
boolean equalsNonHost(Address that) {
//仅仅没有校验域名
return this.dns.equals(that.dns)
&& this.proxyAuthenticator.equals(that.proxyAuthenticator)
&& this.protocols.equals(that.protocols)
&& this.connectionSpecs.equals(that.connectionSpecs)
&& this.proxySelector.equals(that.proxySelector)
&& equal(this.proxy, that.proxy)
&& equal(this.sslSocketFactory, that.sslSocketFactory)
&& equal(this.hostnameVerifier, that.hostnameVerifier)
&& equal(this.certificatePinner, that.certificatePinner)
&& this.url().port() == that.url().port();
}
这里只关心Http1.x的复用条件:
1.连接在当前时刻没有流使用中,并且当前允许在当前连接下新建流
2.当前请求地址要完全符合当前连接的请求地址(包括域名、端口等数据)
当前连接正在使用中,这个在OkHttp是通过一个引用队列标示的
public void acquire(RealConnection connection) {
assert (Thread.holdsLock(connectionPool));
if (this.connection != null) throw new IllegalStateException();
this.connection = connection;
//向引用队列中添加一个引用,表示当前连接正在被当前流分配者使用
connection.allocations.add(new StreamAllocationReference(this, callStackTrace));
}
private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
assert (Thread.holdsLock(connectionPool));
//当前连接完全结束,也就是TCP握手、TLS握手(有的话)、写入请求报文和读取请求报文这些操作全部完成之后
if (streamFinished) {
this.codec = null;
}
//当前流分配者是否可用,如果released则表示后续流分配者不可继续使用
//一般来说连接完成之后,后续都是不可用的
//如果连接重连,那么同一个请求地址的应该还是可用的
if (released) {
this.released = true;
}
Socket socket = null;
//当前流分配者已经分配了一个连接
if (connection != null) {
if (noNewStreams) {//当前不允许重用连接
connection.noNewStreams = true;
}
//在一次正常的请求完成之后,要进行连接的释放
if (this.codec == null && (this.released || connection.noNewStreams)) {
release(connection);//这里就是解除引用队列,从而标记当前connection后续可以复用
if (connection.allocations.isEmpty()) {//这里是尝试唤醒连接池中的清理任务
connection.idleAtNanos = System.nanoTime();
if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
socket = connection.socket();//如果当前连接已经过期,那么可以关闭socket连接了
}
}
connection = null;
}
}
return socket;
}
private void release(RealConnection connection) {
for (int i = 0, size = connection.allocations.size(); i < size; i++) {
Reference<StreamAllocation> reference = connection.allocations.get(i);
if (reference.get() == this) {
connection.allocations.remove(i);
return;
}
}
throw new IllegalStateException();
}
通过上述方法可以进行流的分配和释放操作,重点是要配合连接池工作。
总结
上述还有提到路由选择者、socket连接等操作,都会在下一篇中提到。
这一篇主要是讲述了连接拦截器的整个运作流程和基本思路,连接池的意义也在这里相对清晰了一些。
网友评论