前文 我们主要对OkHttp涉及到基础类进行了介绍,这里我们来聊聊OkHttp最核心的,拦截器机制,也就是关于OkHttp(一)中提到的核心方法。Dispatcher分发网络请求进入队列,然后进入线程池,AsyncCall最后执行到 execute()方法,最后就会执行getResponseWithInterceptorChain() 方法。
//核心代码 开始真正的执行网络请求
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
//责任链
List<Interceptor> interceptors = new ArrayList<>();
//在配置okhttpClient 时设置的intercept 由用户自己设置
interceptors.addAll(client.interceptors());
//负责处理失败后的重试与重定向
interceptors.add(retryAndFollowUpInterceptor);
//负责把用户构造的请求转换为发送到服务器的请求 、把服务器返回的响应转换为用户友好的响应 处理 配置请求头等信息
//从应用程序代码到网络代码的桥梁。首先,它根据用户请求构建网络请求。然后它继续呼叫网络。最后,它根据网络响应构建用户响应。
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//处理 缓存配置 根据条件(存在响应缓存并被设置为不变的或者响应在有效期内)返回缓存响应
//设置请求头(If-None-Match、If-Modified-Since等) 服务器可能返回304(未修改)
//可配置用户自己设置的缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//连接服务器 负责和服务器建立连接 这里才是真正的请求网络
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
//配置okhttpClient 时设置的networkInterceptors
//返回观察单个网络请求和响应的不可变拦截器列表。
interceptors.addAll(client.networkInterceptors());
}
//执行流操作(写出请求体、获得响应数据) 负责向服务器发送请求数据、从服务器读取响应数据
//进行http请求报文的封装与请求报文的解析
interceptors.add(new CallServerInterceptor(forWebSocket));
//创建责任链
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
//执行责任链
return chain.proceed(originalRequest);
}
在List<Interceptor> interceptors中连续add多个Interceptor,最后把这个 interceptor list作为参数传入到RealInterceptorChain的一个对象,另外还要特别注意一下第4个参数0,然后让chain执行proceed()方法,我们看一下这个proceed()方法
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.httpCodec != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
其中RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec, connection, index + 1, request, call, eventListener, connectTimeout, readTimeout, writeTimeout);
这个next的Chain的对象是在构造函数中把上个Chain传进来的interceptors再传到下个Chain里,只是第4个参数由刚才的index(第一次是0)变成了index+1(当前是1),当前的interceptor依然是 interceptors.get(0),next Chain的interceptor就是 interceptors.get(1),当前interceptor执行intercept(next)方法,进入下个Chain,如此反复,这样就形成了一个递归调用,直到递归结束,返回Response。
那么什么时候会结束呢? 理论上当然是执行到最后一个interceptor了,但是每个interceptor都会有相关的业务处理,如果在某个interceptor里触发了某个具体的条件,让interceptor不再执行proceed()方法,这个递归也会结束,然后返回Response。这个流程的代码设计就是责任链模式,如果上一级不处理该业务,就直接转到下一级,直到有处理为止。
我们再回顾一下上文中add interceptor的代码,发现最先add的interceptor是client.interceptors(),这个client.interceptors()是我们开发者调用OkHttp时自定义的interceptor(比如比较常用的自定义LogInterceptor),而最后add的是CallServerInterceptor这个interceptor,所以CallServerInterceptor里肯定是没有执行proceed()方法的,大家也可以看源码。
下面我们来具体分析一下各个Interceptor:
- RetryAndFollowUpInterceptor,负责重试和重定向;
- BridgeInterceptor,首先将应用层的数据类型转换为网络调用层的数据类型,然后将网络层返回的数据类型转换为应用层的数据类型;
- CacheInterceptor,负责读取缓存,更新缓存;
- ConnectInterceptor,负责和服务器建立起链接;
- NetworkInterceptors,client设置的networkInterceptor;
- CallServerInterceptor,负责向服务器发送请求数据、从服务器读取响应数据;
分析Interceptor的源码主要就是分析里面intercept(Chain chain)这个方法。
CallServerInterceptor
我们先来看看CallServerInterceptor,还是比较简单的:
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
//添加头部信息
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
//写入请求体
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
//读取响应头
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
//读取响应体
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
return response;
}
写入请求头:
httpCodec.writeRequestHeaders(request);
最后会进入Http1Codec的
/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}
这个方法里先写入请求行,然后循环写入请求头。Sink是Okio里面的内容,可以理解为outputstream。
写入请求体:
...
if (responseBuilder == null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}
...
这个比较简单,就是把body写入流。
读响应头:
@Override public Response.Builder readResponseHeaders(boolean expectContinue) throws IOException {
if (state != STATE_OPEN_REQUEST_BODY && state != STATE_READ_RESPONSE_HEADERS) {
throw new IllegalStateException("state: " + state);
}
try {
StatusLine statusLine = StatusLine.parse(readHeaderLine());
Response.Builder responseBuilder = new Response.Builder()
.protocol(statusLine.protocol)
.code(statusLine.code)
.message(statusLine.message)
.headers(readHeaders());
if (expectContinue && statusLine.code == HTTP_CONTINUE) {
return null;
} else if (statusLine.code == HTTP_CONTINUE) {
state = STATE_READ_RESPONSE_HEADERS;
return responseBuilder;
}
state = STATE_OPEN_RESPONSE_BODY;
return responseBuilder;
} catch (EOFException e) {
// Provide more context if the server ends the stream before sending a response.
IOException exception = new IOException("unexpected end of stream on " + streamAllocation);
exception.initCause(e);
throw exception;
}
}
解析相应头:
/** Reads headers or trailers. */
public Headers readHeaders() throws IOException {
Headers.Builder headers = new Headers.Builder();
// parse the result headers until the first blank line
for (String line; (line = readHeaderLine()).length() != 0; ) {
Internal.instance.addLenient(headers, line);
}
return headers.build();
}
先创建一个Header.Builder,然后一行行地读出来,再放入一个list里,最后build一下转成Headers。
读取响应体:
解析完头信息后,根据状态信息(code)来判断是否响应成功,然后打开响应body
@Override
public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
注意:这里并没有真正把数据解析出来,只是关联起了okio的Source而已。
ConnectInterceptor
@Override
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
这个类的作用是负责和服务器建立连接,代码比较简单,但是很多比较重要的逻辑都封装在StreamAllocation,ConnectionPool和RealConnection等类里了。
- StreamAllocation对象的初始化是在RetryAndFollowUpInterceptor里完成的;
- 真正建立连接是在streamAllocation.newStream(…)方法里:
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
- 再进入findHealthyConnection(...)方法:
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
这个方法就是想得到一个Connection对象,里面有个while(true){}循环,不断地从findConnection方法中获取一个候选的Connection对象,然后进行判断这个Connection是否是全新的连接,如果是就直接返回,否则要做isHealthy(...)检查,这个Healthy检查主要是查看当前Connection的(1)Socket是否close,(2)Socket的Input或者Output是否shutdown,(3)BufferedSource是否exhausted,如果Connection isHeathy则返回,否则继续执行循环。
- 再进入findConnection(...)方法:
/**
* Returns a connection to host a new stream. This prefers the existing connection if it exists,
* then the pool, finally building a new connection.
*/
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
//先尝试用已经分配的connection,但是我们需要做一些检查,因为有的connection在刚create的时候会被
//限制(restricted )
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
// 已分配的connection通过了初步检查,是合法的。
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
//如果上面的检查导致已分配的connection不能用,下面就会尝试从connectionPool里获取一个connection
if (result == null) {
// Attempt to get a connection from the pool.
//下面这行代码看似简单,其实意义也简单,就是根据address的信息从connectionPool里获取一个可复
//用connection,但是代码的执行流程我们一定要梳理透彻。Internal.instance是OkHttpClient里的static
//代码块,Internal.instance的acquire方法(传入的3个参数意义重大,尤其是第三个参数this)会走到
//ConnectionPool的acquire方法,最后又回到了当前 StreamAllocation的acquire方法,这个过程会获取
//一个可复用的connection
Internal.instance.acquire(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
// 如果截止这里有可复用的connection,就返回这个connection。
return result;
}
// If we need a route selection, make one. This is a blocking operation.
// 这里是route选择,是对多ip的支持
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
// 遍历多个ip的路由,根据route,address设置connection到connectionPool里,然后从connectionPool
// 里拿到可复用的connection
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.acquire(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
// 如果以上还没有找到可复用的connection,那就新建一个Connection,进行 TCP + TLS握手,这是个阻
// 塞操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// Pool the connection.
// 把新建的connection放入到connectionPool里
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
// 如果是HTTP2,与此同时该address的另外一个多路连接同时被创建,那就释放这个连接,用那个连
// 接。
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
这个方法很重要,也很长,上文我们也进行了简单的注释,下面再把大致流程说明一下:
(1). 判断当前StreamAllocation对象是否已经有一个Connection对象了,如果有就可以把这个对象给下面的代码用了;
(2). 如果(1)中没有获取到Connection,那就尝试从ConnectionPool获取一个可复用Connection;
(3). 如果(2)中没有获取到Connection,那就对路由列表进行遍历,试图从路由列表里找到多IP的支持,然后再尝试从ConnectionPool获取一个可复用Connection;
(4). 如果(3)中没有获取到Connection,那就new一个Connection对象,进行tcp+tsl握手连接;
(5). 将(4)中new出来的Connection对象放入到ConnectionPool;
(6). 如果是http2的连接,发现有新的多路连接被同时创建就用新创建的那个connection,把刚才放入ConnectionPool的connection 释放掉(获取到其socket,然后close掉);
(7). 最后返回这个connection。
在这个过程中我们还有几个方法要简单阐述:
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
} else {
if (route.address().protocols().contains(Protocol.H2_PRIOR_KNOWLEDGE)) {
throw new RouteException(new UnknownServiceException(
"H2_PRIOR_KNOWLEDGE cannot be used with HTTPS"));
}
}
//开始连接
while (true) {
try {
if (route.requiresTunnel()) {
//通道连接,一般不会走这里,这个Tunnel主要是指SSL且有HTTP proxy
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
//一般都是Socket连接
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
// https建立
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
这个就是新建一个connection,进行连接的方法,一般都是进入connectSockt()方法
/** Does all the work necessary to build a full HTTP or HTTPS connection on a raw socket.
在java原Socket上构建完整的HTTP或者HTTPS连接
*/
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
// The following try/catch block is a pseudo hacky way to get around a crash on Android 7.0
// More details:
// https://github.com/square/okhttp/issues/3245
// https://android-review.googlesource.com/#/c/271775/
try {
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
然后会进入 Platform.get().connectSocket(...)方法,最后会进行Socket连接。
//okhttp3.internal.platform.Platform
public void connectSocket(Socket socket, InetSocketAddress address, int connectTimeout)
throws IOException {
socket.connect(address, connectTimeout);
}
//java.net.Socket
public void connect(SocketAddress endpoint, int timeout) throws IOException {
if (endpoint == null)
throw new IllegalArgumentException("connect: The address can't be null");
if (timeout < 0)
throw new IllegalArgumentException("connect: timeout can't be negative");
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isConnected())
throw new SocketException("already connected");
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
InetAddress addr = epoint.getAddress ();
int port = epoint.getPort();
checkAddress(addr, "connect");
SecurityManager security = System.getSecurityManager();
if (security != null) {
if (epoint.isUnresolved())
security.checkConnect(epoint.getHostName(), port);
else
security.checkConnect(addr.getHostAddress(), port);
}
if (!created)
createImpl(true);
if (!oldImpl)
impl.connect(epoint, timeout);
else if (timeout == 0) {
if (epoint.isUnresolved())
impl.connect(addr.getHostName(), port);
else
impl.connect(addr, port);
} else
throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
connected = true;
/*
* If the socket was not bound before the connect, it is now because
* the kernel will have picked an ephemeral port & a local address
*/
bound = true;
}
/**
* Acquires a recycled connection to {@code address} for {@code streamAllocation}. If non-null
* {@code route} is the resolved route for a connection.
*/
void acquire(Address address, StreamAllocation streamAllocation, @Nullable Route route) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return;
}
}
}
这个方法是根据Address和Route这2个参数从Connection队列(connections)里找出合格的connection,然后把这个connection传递到StreamAllocation里。
/**
* Returns true if this connection can carry a stream allocation to {@code address}. If non-null
* {@code route} is the resolved route for a connection.
*/
public boolean isEligible(Address address, @Nullable Route route) {
// If this connection is not accepting new streams, we're done.
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// If the non-host fields of the address don't overlap, we're done.
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// If the host exactly matches, we're done: this connection can carry the address.
if (address.url().host().equals(this.route().address().url().host())) {
return true; // This connection is a perfect match.
}
// At this point we don't have a hostname match. But we still be able to carry the request if
// our connection coalescing requirements are met. See also:
// https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
// https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/
// 1. This connection must be HTTP/2.
if (http2Connection == null) return false;
// 2. The routes must share an IP address. This requires us to have a DNS address for both
// hosts, which only happens after route planning. We can't coalesce connections that use a
// proxy, since proxies don't tell us the origin server's IP address.
if (route == null) return false;
if (route.proxy().type() != Proxy.Type.DIRECT) return false;
if (this.route.proxy().type() != Proxy.Type.DIRECT) return false;
if (!this.route.socketAddress().equals(route.socketAddress())) return false;
// 3. This connection's server certificate's must cover the new host.
if (route.address().hostnameVerifier() != OkHostnameVerifier.INSTANCE) return false;
if (!supportsUrl(address.url())) return false;
// 4. Certificate pinning must match the host.
try {
address.certificatePinner().check(address.url().host(), handshake().peerCertificates());
} catch (SSLPeerUnverifiedException e) {
return false;
}
return true; // The caller's address can be carried by this connection.
}
这个方法就是如何根据Address和Route判断connection是否合格的,其实主要是通过Host匹配,后面有一些关于HTTP2和关于代理的特殊处理,就不做细说了。
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}
这个方法就是把一个Connection对象放入到connectionPool里,其实没什么要说的,但是这里没会涉及到一个属性cleanupRunnable
,这个属性是用于回收连接池中的无效连接,所以这里又涉及到回收无效链接的方法。
/**
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
//通过遍历找到要被驱逐(出栈)的connection,或者找到将要被过期驱逐的时间。
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// If the connection is in use, keep searching.
// 如果当前这个connection正在被使用,那就对for循环continue,找下个connection进行计算
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
// 计算空闲时间
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
// 如果空闲时间比longestIdleDurationNs还大,那就把这个idleDurationNs的值赋值到
// longestIdleDurationNs
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
//最长空闲时间比保活时间大或者空闲connection数目已经达到最大了,就把当前这个connection移除。
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
// 如果所有connections都在使用,那我们只能进行下一次循环了,
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
// 如果没有连接池,那就返回-1
cleanupRunning = false;
return -1;
}
}
closeQuietly(longestIdleConnection.socket());
// Cleanup again immediately.
return 0;
}
这个就是从ConnectionPool里回收无效连接的方法。方法内使用了类似GC算法中的标记-擦除算法,标记处最不活跃的连接进行清除。步骤如下:
(1). 遍历connections队列,如果当前的connection对象正在被使用,continue,inUseConnectionCount+1
(2). 否则idleConnectionCount+1,同时判断当前的connection的空闲时间是否比已知的长,是的话把它记录下来
(3). 如果空闲时间超过了保活时间,或者当前空闲连接数超过了最大空闲连接数,说明这个connection对象需要被回收,将它从connections队列中移除,在代码最后两行进行关闭操作,同时返回0通知cleanupRunnable的线程马上继续执行cleanup方法
(4). 如果3不满足,判断idleConnectionCount是否大于0,是的话返回保活时间与空闲时间差keepAliveDurationNs - longestIdleDurationNs,cleanupRunnable的线程等待时间keepAliveDurationNs - longestIdleDurationNs后继续执行cleanup方法
(5). 如果4不满足,说明没有空闲连接,继续判断有没有正在使用的连接,有的话返回保活时间keepAliveDurationNs,提醒cleanupRunnable的线程至少等待时间keepAliveDurationNs后才需要继续执行cleanup方法
(6). 如果5也不满足,当前说明连接池中没有连接,返回-1,告诉cleanupRunnable的线程不需要再执行了。等待下次调用put方法时再执行。
更多的Interceptor请看https://www.jianshu.com/p/efec03010a38
网友评论