简介
上一篇文章(OkHttp源码解析(一):基本流程)介绍了 OkHttp 的基本流程,包括 Request 的创建、Dispatcher 对 Request 的调度以及 Interceptor 的使用。OkHttp 中默认会添加 RetryAndFollowUpInterceptor、BridgeInterceptor、CacheInterceptor、ConnectInterceptor 以及 CallServerInterceptor 这几个拦截器。本文主要看一下 RetryAndFollupInterceptor 并引出建立连接相关的分析。
RetryAndFollowUpInterceptor
Interceptor 最主要的代码都在 intercept
中,下面是 RetryAndFollowUpInterceptor#intercept
中的部分代码:
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace); // 1
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release(); // 2
throw new IOException("Canceled");
}
...
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null); // 3
...
}
上面注释 1 处创建了一个 StreamAllocation
对象,注释 2 处 调用了其 release
方法,注释 3 处则把这个对象传给了下一个 Interceptor。StreamAlloction
这个类很重要,下面就看一下它的用途。
StreamAlloction
StreamAllocation
从名字上看是流分配器,其实它是统筹管理了几样东西,注释写的非常清楚:
/**
* This class coordinates the relationship between three entities:
*
* <ul>
* <li><strong>Connections:</strong> physical socket connections to remote servers. These are
* potentially slow to establish so it is necessary to be able to cancel a connection
* currently being connected.
* <li><strong>Streams:</strong> logical HTTP request/response pairs that are layered on
* connections. Each connection has its own allocation limit, which defines how many
* concurrent streams that connection can carry. HTTP/1.x connections can carry 1 stream
* at a time, HTTP/2 typically carry multiple.
* <li><strong>Calls:</strong> a logical sequence of streams, typically an initial request and
* its follow up requests. We prefer to keep all streams of a single call on the same
* connection for better behavior and locality.
* </ul>
简单来说, StreamAllocation
协调了 3 样东西:
-
Connections
: 物理的 socket 连接 -
Streams
:逻辑上的 HTTP request/response 对。每个Connection
有个变量allocationLimit
,用于定义可以承载的并发的streams
的数量。HTTP/1.x 的Connection
一次只能有一个stream
, HTTP/2 一般可以有多个。 -
Calls
:Streams
的序列。一个初始的request
可能还会有后续的request
(如重定向)。OkHttp 倾向于让一个call
所有的streams
运行在同一个connection
上。
StreamAllocation
提供了一些 API 来释放以上的资源对象。 在 RetryAndFollowUpInterceptor
中创建的 StreamAllocation
对象下一个用到的地方是 ConnectInterceptor,其 intercept
代码如下:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
在上面的代码中, streamAllocation
创建了 httpCodec
以及 connection
对象。 httpCodec
即是上面所说的 Streams
,而 connection
则是上面的 Connection
,Connection
是一个接口,它的唯一实现类是 RealConnection
。
newStream
StreamAllocation
中的 newStream
方法用于寻找新的 RealConnection
以及 HttpCodec
,代码如下:
public HttpCodec newStream(OkHttpClient client, boolean doExtensiveHealthChecks) {
int connectTimeout = client.connectTimeoutMillis();
int readTimeout = client.readTimeoutMillis();
int writeTimeout = client.writeTimeoutMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
在 newStream
中,通过 findHealthyConnection
找到可用的 Connection
,并用这个 Connection
生成一个 HttpCodec
对象。 findHealthyConnection
是找到一个健康的连接,代码如下:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
// successCount == 0 表示还未使用过,则可以使用
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
public boolean isHealthy(boolean doExtensiveChecks) {
if (socket.isClosed() || socket.isInputShutdown() || socket.isOutputShutdown()) {
return false;
}
... // 省略 Http2 代码
return true;
}
在一个无限循环中,通过 findConnection
寻找一个 connection
,并判断是否可用,首先如果没有使用过的肯定是健康的可直接返回,否则调用 isHealthy
,主要就是判断 socket
是否关闭。这里的 socket
是在 findConnection
中赋值的,再看看 findConnection
的代码:
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
boolean connectionRetryEnabled) throws IOException {
Route selectedRoute;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
// 1. 从 ConnectionPool 取得 connection
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
selectedRoute = route;
}
// If we need a route, make one. This is a blocking operation.
if (selectedRoute == null) {
selectedRoute = routeSelector.next();
}
RealConnection result;
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
// Now that we have an IP address, make another attempt at getting a connection from the pool.
// 2. 有了 ip 地址后再从 connectionpool中取一次
// This could match due to connection coalescing.
Internal.instance.get(connectionPool, address, this, selectedRoute);
if (connection != null) return connection;
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
// 3. ConnectionPool 中没有,新创建一个
result = new RealConnection(connectionPool, selectedRoute);
// 3. 将 StreamAllocation 加入到 `RealConnection` 中的一个队列中
acquire(result);
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 4. 建立连接,在其中创建 socket
result.connect(connectTimeout, readTimeout, writeTimeout, connectionRetryEnabled);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
// Pool the connection.
// 5. 将新创建的 connection 放到 ConnectionPool 中
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
return result;
}
上面 Connection
的创建大体是以下几个步骤:
- 调用
Intenal.get
方法从ConnectionPool
中获取一个Connection
,主要根据 url 的 host 判断,相关代码在ConnectionPool
中。 - 如果没有并且又获取了 IP 地址,则再获取一次。
- 如果
ConnectionPool
中没有, 则新创建一个RealConnection
,并调用acquire
将StreamAllocation
中加入RealConnection
中的一个队列中。 - 调用
RealConnection#connect
方法建立连接,在内部会创建Socket
。 - 将新创建的
Connection
加入到ConnectionPool
中。
获取到了 Connection
之后,再创建一个 HttpCodec
对象。
public HttpCodec newCodec(
OkHttpClient client, StreamAllocation streamAllocation) throws SocketException {
if (http2Connection != null) {
return new Http2Codec(client, streamAllocation, http2Connection);
} else {
socket.setSoTimeout(client.readTimeoutMillis());
source.timeout().timeout(client.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(client.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
根据是 Http1 还是 Http2 创建对应的 HttpCodec
, 其中的 socket
是在 RealConnection
中的 connect
方法创建的。下面具体看看RealConnection
。
RealConnection
RealConnection
封装的是底层的 Socket
连接,内部必然有一个 Socket
对象,下面是 RealConnection
内部的变量:
public final class RealConnection extends Http2Connection.Listener implements Connection {
private static final String NPE_THROW_WITH_NULL = "throw with null exception";
private final ConnectionPool connectionPool;
private final Route route;
// The fields below are initialized by connect() and never reassigned.
/** The low-level TCP socket. */
private Socket rawSocket;
/**
* The application layer socket. Either an {@link SSLSocket} layered over {@link #rawSocket}, or
* {@link #rawSocket} itself if this connection does not use SSL.
*/
private Socket socket;
private Handshake handshake;
private Protocol protocol;
private Http2Connection http2Connection;
private BufferedSource source;
private BufferedSink sink;
// The fields below track connection state and are guarded by connectionPool.
/** If true, no new streams can be created on this connection. Once true this is always true. */
public boolean noNewStreams;
public int successCount;
/**
* The maximum number of concurrent streams that can be carried by this connection. If {@code
* allocations.size() < allocationLimit} then new streams can be created on this connection.
*/
public int allocationLimit = 1;
/** Current streams carried by this connection. */
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
/** Nanotime timestamp when {@code allocations.size()} reached zero. */
public long idleAtNanos = Long.MAX_VALUE;
...
}
-
Route
表示的是与服务端建立的路径,其实内部封装了Address
,Address
则是封装了请求的 URL。 -
rawSocket
对象代表底层的连接,还有一个socket
是用于 Https, 对于普通的 Http 请求来说,这两个对象是一样的。source
和sink
则是利用 Okio 封装socket
得到的输入输出流。(如果想了解 Okio 的原理,可以参考我之前的文章:Okio 源码解析(一):数据读取流程) -
noNewStream
对象用于标识这个Connection
不能再用于 Http 请求了,一旦设置为 true, 则不会再变。 -
allocationLimit
指的是这个Connection
最多能同时承载几个 Http 流,对于 Http/1 来说只能是一个。 -
allocations
是一个List
对象,里面保存着正在使用这个Connection
的StreamAllocation
的弱引用,当StreamAllocation
调用acquire
时,便会将其弱引用加入这个List
,调用release
则是移除引用。allocations
为空说明此Connection
为闲置,ConnectionPool
利用这些信息来决定是否关闭这个连接。
connect
RealConnection
用于建立连接,里面有相应的 connect
方法:
public void connect(
int connectTimeout, int readTimeout, int writeTimeout, boolean connectionRetryEnabled) {
...
while (true) {
try {
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout);
} else {
// 创建socket,建立连接
connectSocket(connectTimeout, readTimeout);
}
// 建立
establishProtocol(connectionSpecSelector);
break;
}
...
}
private void connectSocket(int connectTimeout, int readTimeout) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
// 创建 socket
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 建立连接,相当于调用 socket 的 connect 方法
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
try {
// 获取输入输出流
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
如果不是 Https, 则调用 connectSocket
,在内部创建 rawSocket
对象,设置超时时间。紧接着 Platform.get().connectSocket
根据不同的平台调用相应的 connect
方法,这样 rawSocket
就连接到服务端了。然后是用 Okio
封装 rawSocket
的输入输出流,这里的输入输出流最终是交给 HttpCodec
进行 Http 报文的写入都读取。通过以上步骤,就实现了 Http 请求的连接。
总结
本文从 RetryAndFollowupIntercept
中创建 StreamAllocation
对象,到 Connection
中创建 RealConnection
和 HttpCodec
,分析了 OkHttp 建立连接的基本过程。可以看出, OkHttp 中的连接由
RealConnection
封装,Http 流的输入输出由 HttpCodec
操作,而 StreamAllocation
则统筹管理这些资源。在连接的寻找与创建过程,有个关键的东西是 ConnectionPool
, 即连接池。它负责管理所有的 Connection
,OkHttp 利用这个连接池进行 Connection
的重用以提高网络请求的效率。本文并没有详细分析 ConnectionPool
,相关内容可以参见下一篇: OkHttp源码解析(三):连接池。
网友评论