OkHttp
StreamAllocation
此类协调了三个类之间的关系:
Connection:连接到远程服务器的物理Socket连接,Connection建立起来可能会比较慢,并且能够取消已经建立的Connection
Stream:建立在Connection上的逻辑上的HTTP的请求对的数据流,每个连接都有自己的allocation limit,allocation limit 定义了Connection同时可以并发请求的数据流的个数,Http 1.1 连接一次只能分配一个Stream,Http 2 通常可以分配多个流(多路复用)
Calls:Stream的逻辑顺序,通常是初始请求以及重定向请求。并且希望单个请求的所有流都被保存在同一个Connection上,以实现更好的行为
在Connection和StreamAllocation之间的关系是,多个Stream可以共用一个Socket连接,每个tcp连接都是通过一个socket来完成的,socket对应一个host和port,如果有多个Stream(也就是多个Request)都是连接在一个host和port上,那么它们就可以共用同一个socket,这样做的好处就是可以减少tcp的三次握手时间。在OkHttp里面,记录一次连接的是RealConnection,这个负责连接,在这个类里面用socket来连接,用HandShake来处理握手。
在讲解这个类之前,先熟悉3个概念:请求,连接,流。我们要明白Http通信执行网络“请求”需要在“连接”上建立一个新的“流”,我们将StreamAllocation称之流的桥梁,它负责为一次“请求”寻找“连接”并建立“流”,从而完成远程通信。所以说StreamAllocation与“请求”,“连接”,“流”都有关。
从注释我们看到。Connection是建立在Socket之上的物理通信信道,而Stream则是代表逻辑的流,至于Call是对一次请求过程的封装。之前也说过一个Call可能会涉及多个流(比如重定向或者auth认证等情况)。所以我们想一下,如果StreamAllocation要想解决上述问题,需要两个步骤,一是寻找连接,二是获取流。所以StreamAllocation里面应该包含一个Stream(上文已经说到了,OKHttp里面的流是HttpCodec);还应该包含连接Connection。如果想找到合适的连接,还需要一个连接池ConnectionPool属性。所以应该有一个获取流的方法在StreamAllocation里面是newStream();找到合适的流的方法findConnection();还应该有完成请求任务的之后finish()的方法来关闭流对象,还有终止和取消等方法,以及释放资源的方法。
public final Address address;//地址
private RouteSelector.Selection routeSelection;
private Route route;
private final ConnectionPool connectionPool;//连接池
public final Call call;//请求
public final EventListener eventListener;
private final Object callStackTrace;
// State guarded by connectionPool.
private final RouteSelector routeSelector;
private int refusedStreamCount;
private RealConnection connection;//连接
private boolean reportedAcquired;
private boolean released;
private boolean canceled;
private HttpCodec codec;
根据请求,基于连接Connection构建出一个流
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//寻找到一个可用的连接,下面会在这个连接上构造流
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//构造出HttpCodec,就是跟服务端的输入输出流
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
- 寻找到一个可用的连接Connection
- 下面流的构造会在基于连接Connection上
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
//调用Connection,获取一个符合要求的RealConnection
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
//确认连接Connection必须是健康的,如果是健康的则重用,否者重新查找RealConnection
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
//尝试去用已经分配的连接
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
//尝试从连接池里面获取可用的连接
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
//如果已经从已经分配的或者从ConnectionPool里面找到了Connection,则返回
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
//如果上面没找到,那么切换路由,接着寻找连接
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//构造出一个新的连接Connection
result = new RealConnection(connectionPool, selectedRoute);
//把当前StreamAllocation添加到连接Connection中的allocations列表中
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
//做TCP和TLS的连接,握手,阻塞操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
//把这个连接放入到ConnectionPool中
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
HttpCodec
HttpCodec是一个接口,用于与服务端做输入输出流的包装,内部是用Okio来做的,我们这里只看Http1Codec,表示只用于Http1协议。Http1Codec是一个Socket连接,用于发送 Http 1的消息,这个类严格遵守以下的生命周期:
- 写请求头
- 写请求体
- 读取响应头
- 读取响应体
//连接处于空闲状态,准备好写请求头了
private static final int STATE_IDLE = 0; // Idle connections are ready to write request headers.
private static final int STATE_OPEN_REQUEST_BODY = 1;//准备写请求体
private static final int STATE_WRITING_REQUEST_BODY = 2;//写入请求体
private static final int STATE_READ_RESPONSE_HEADERS = 3;//读取响应头
private static final int STATE_OPEN_RESPONSE_BODY = 4;//准备读取响应体
private static final int STATE_READING_RESPONSE_BODY = 5;//读取响应体
private static final int STATE_CLOSED = 6;//关闭
/** The client that configures this stream. May be null for HTTPS proxy tunnels. */
final OkHttpClient client;
/** The stream allocation that owns this stream. May be null for HTTPS proxy tunnels. */
final StreamAllocation streamAllocation;//请求对应的流
final BufferedSource source;//从服务器读取的响应流
final BufferedSink sink;//向服务器写入的请求流
int state = STATE_IDLE;//处于的生命周期
private long headerLimit = HEADER_LIMIT;
- 看的出来一个HttpCodec有7种状态,对应着4个动作
下面只分析一个写入请求头的方法,其他的跟这个类似,只是还是会将请求体和响应体细分为固定长度和非固定长度,分别用不同流来表示。
/**
* Prepares the HTTP headers and sends them to the server.
*
* <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the
* output stream has been written to. Otherwise the body would need to be buffered!
*
* <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the
* output stream has been written to and closed. This ensures that the {@code Content-Length}
* header field receives the proper value.
*/
//写入请求头
@Override public void writeRequestHeaders(Request request) throws IOException {
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
//写入请求行和请求头
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
//状态改变为准备写入请求体
state = STATE_OPEN_REQUEST_BODY;
}
/**
* Returns the request status line, like "GET / HTTP/1.1". This is exposed to the application by
* {@link HttpURLConnection#getHeaderFields}, so it needs to be set even if the transport is
* HTTP/2.
*/
//返回请求行,例如“GET / HTTP/1.1”
public static String get(Request request, Proxy.Type proxyType) {
StringBuilder result = new StringBuilder();
result.append(request.method());
result.append(' ');
if (includeAuthorityInRequestLine(request, proxyType)) {
result.append(request.url());
} else {
result.append(requestPath(request.url()));
}
result.append(" HTTP/1.1");
return result.toString();
}
看到上面的向服务器写入的数据,也可以看到OkHttp的框架属于哪一层?OkHttp是与UrlConnection同一层的网络框架,直接对Socket进行封装。其实OkHttp就是实现了Http协议,并且对接口做了比较好的封装的框架。
网友评论