一、引言
在我们日常开发中,OkHttp
可谓是最常用的开源库之一,目前就连Android API
中的网络请求接口都是用的OkHttp
,好吧,真的很强。
在上学期间我也曾阅读和分析过OkHttp
的源码,并记录在笔记中,不过现在再去翻看的时候发现当时很多地方并没有正确理解,因此也趁着这个过年假期重新阅读和整理一遍,感兴趣的童鞋可以看看。
本文纯属基于个人理解,因此受限于知识水平,有些地方可能依然没有理解到位,还请发现问题的童鞋理性指出。
温馨提示: 本文很长,源码基于
OKHttp-3.11.0
二、从一个简单请求入手分析整体运作流程
1. 先来点关于 OkHttpClient 的官方释义
OkHttpClient
作为Call
的工厂类,用于发送HTTP
请求并读取相应数据。
OkHttpClient
应当被共享.
使用OkHttpClient
的最佳使用方式是创建一个OkHttpClient
单例,然后复用这个单例进行所有的HTTP
请求。为啥呢?因为每个OkHttpClient
自身都会持有一个连接池和线程池,所以符用连接和线程可以减少延迟、节约内存。相反地,如果给每个请求都创建一个OkHttpClient
的话,那就是浪费闲置线程池的资源。
可以如下使用new OkHttpClient()
创建一个默认配置的共享OkHttpClient
.
public final OkHttpClient client = new OkHttpClient();
或使用 new OkHttpClient.Builder()
来创建一个自定义配置的共享实例:
public final OkHttpClient client = new OkHttpClient.Builder()
.addInterceptor(new HttpLoggingInterceptor())
.cache(new Cache(cacheDir, cacheSize))
.build();
可以通过newBuilder()
来自定义OkHttpClient
,这样创建出来的OkHttpClient
具有与原对象相同的连接池、线程池和配置。使用这个方法可以派生一个具有自己特殊配置的OkHttpClient
以符合我们的特殊要求。
如下示例演示的就是如何派生一个读超时为500毫秒的OkHttpClient
:
OkHttpClient eagerClient = client.newBuilder()
.readTimeout(500, TimeUnit.MILLISECONDS)
.build();
Response response = eagerClient.newCall(request).execute();
关闭(Shutdown
)不是必须的。
线程和连接会一直被持有,直到当它们保持闲置时自动被释放。但是如果你编写的应用需要主动释放无用资源,那么你也可以主动去关闭。通过shutdown()
方法关闭分发器dispatcher
的执行服务,这将导致之后OkHttpClient
收到的请求全部被拒绝掉。
client.dispatcher().executorService().shutdown();
清空连接池可以用evictAll()
,不过连接池的守护线程可能不会马上退出。
client.connectionPool().evictAll();
如果相关比缓存,可以调用close()
,注意如果缓存已经关闭了再创建call
的话就会出现错误,并且会导致call
崩溃。
client.cache().close();
OkHttp
同样会为所有HTTP/2
连接建立守护线程,并且再它们保持闲置状态时自动关闭掉它们。
2. 常规使用
上面的官方释义描述了OkHttpClient
的最佳实践原则和清理操作,接下来我们根据一个简单的GET
请求操作来引出我们要分析的问题:
如下创建一个OkHttpClient
实例,添加了Intercepter
,并在工程目录下建了个名为cache
的Cache
缓存:
Interceptor logInterceptor = chain -> {
Request request = chain.request();
System.out.println(request.url());
System.out.println(request.method());
System.out.println(request.tag());
System.out.println(request.headers());
return chain.proceed(request);
};
okHttpClient = new OkHttpClient.Builder()
.cache(new Cache(new File("cache/"), 10 * 1024 * 1024))
.addInterceptor(logInterceptor)
.build();
然后一个普通的GET
请求是这样的,这里以获取 玩Android 首页列表为例。
public void getHomeList(int page){
// 1. 建立HTTP请求
Request request = new Request.Builder()
.url(String.format("http://wanandroid.com/article/list/%d/json", page))
.get()
.build();
// 2. 基于 Request创建 Call
okhttp3.Call call = okHttpClient.newCall(request);
// 3. 执行Call
call.enqueue(new Callback() {
@Override
public void onFailure(okhttp3.Call call, IOException e) {
e.printStackTrace();
}
@Override
public void onResponse(okhttp3.Call call, Response response) throws IOException {
System.out.println(response.message());
System.out.println(response.code());
System.out.println(response.headers());
if (response.isSuccessful()){
ResponseBody body = response.body();
if (body == null) return;
System.out.println(body.string());
// 每个ResponseBody只能使用一次,使用后需要手动关闭
body.close();
}
}
});
}
3. 执行流程分析
注意到上面的okHttpClient.newCall(request)
,对应的源码如下,可知它创建的实际上是Call
的实现类RealCall
。
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
Call
提供请求任务的执行和取消和相关状态操作方法。类似于FutureTask
,是任务执行单元,其核心的执行方法代码如下,包含同步执行(execute()
)和异步执行(enqueue()
)两种方式。对于同步方法而言,RealCall
仅仅通过executed()
方法将自身记录在Dispatcher
(分发器)的同步请求队列中,这是为了在分发器中统计请求数量,在请求结束之后则通过finished()
方法将自身从分发器中的同步请求队列中移除,而真正进行数据请求的是在拦截器Intercepter
,如下源码:
@Override public Response execute() throws IOException {
// ...
eventListener.callStart(this);
try {
// 1. 仅仅将这个 Call记录在分发器 ( Dispatcher )的同步执行队列中
client.dispatcher().executed(this);
// 2. 通过拦截器链获取响应数据,这里才会真正的执行请求
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
// 3. 拿到响应数据后从分发器的同步执行队列中移除当前请求
client.dispatcher().finished(this);
}
}
跟进至getResponseWithInterceptorChain()
,可以注意到,除了我们在创建OkHttpClient
时添加的拦截器外,每个HTTP
请求都会默认添加几个固有的拦截器,如
RetryAndFollowUpInterceptor
、BridgeInterceptor
、CacheInterceptor
、ConnectInterceptor
、CallServerInterceptor
。
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
关于它们的源码实现会在后面的核心类解读中详细分析,这里先了解一个它们各自的作用:
- RetryAndFollowUpInterceptor:用于失败时恢复以及在必要时进行重定向。
- BridgeInterceptor:应用代码与网络代码的桥梁。首先根据用户请求建立网络请求,然后执行这个网络请求,最后根据网络请求的响应数据建立一个用户响应数据。
- CacheInterceptor:用于从本地缓存中读取数据,以及将服务器数据写入到缓存。
- ConnectInterceptor:用于打开一个到目标服务器的连接,并切换至下一个拦截器。
- CallServerInterceptor:这是拦截器链的最后一环,至此将真正的进行服务器请求。
请求时整个拦截器的调用链的执行次序如下:
拦截器执行链对于请求时拦截器的调用链你可能会有所疑惑,为什么它是按这个次序执行的呢?咱看看RealInterceptorChain#proceed(...)
方法的主要源码,发现,虽然这里看起来只进行了一次调用,但是如果你结合这些拦截器一起分析的话,你就会发现,其实这里对拦截器集合进行了递归取值,因为每次执行proceed()
方法时集合索引index
会 +1
, 并将index
传入新建的RealInterceptorChain
,而拦截器集合唯一,因此相当于每次proceed
都是依次取得拦截器链中的下一个拦截器并使用这个新建的RealInterceptorChain
,执行RealInterceptorChain#proceed
方法,直到集合递归读取完成。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec, RealConnection connection) throws IOException {
// ...
// 每次执行 proceed() 方法时 index+1, 然后传入新建的 RealInterceptorChain
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 但是 拦截器集合是相同的,因此相当于每次都是依次取得拦截器链中的下一个拦截器
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// ...
return response;
}
递归? 是的,如果你观察的够仔细的话,你会发现,其实BridgeInterceptor
、RetryAndFollowUpInterceptor
、CacheInterceptor
、ConnectInterceptor
都会执行RealInterceptorChain#proceed
方法,相当于这个方法在不断地调用自己,符合递归的执行特性,因此Response
响应数据的返回次序刚好是与请求时相反的。BridgeInterceptor#intercept
相应抽取的源码如下:
public final class BridgeInterceptor implements Interceptor {
@Override public Response intercept(Chain chain) throws IOException {
// do something ...
Response networkResponse = chain.proceed(requestBuilder.build());
// do something ...
return responseBuilder.build();
}
}
因而拦截器链的响应数据返回次序如下:
拦截器链的响应次序我靠,是不是觉得设计的非常巧妙,这也是我热衷于源码的重要原因之一,因为不看看别人的代码你就永远不知道别人有多骚。。
根据上面的分析,我们已经知道了原来正真执行请求、处理响应数据是在拦截器,并且对于同步请求,分发器Dispatcher
仅仅是记录下了同步请求的Call
,用作请求数量统计用的,并没有参与到实际请求和执行中来。
OK,来看看异步请求RealCall#enqueue()
和Dispatcher#enqueue()
,毫无疑问,异步请求肯定是运行在线程池中了
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
Dispatcher#enqueue()
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
对于上面的AsyncCall
,核心源码如下,注意到getResponseWithInterceptorChain()
,是不是非常地熟悉了,在上面的同步请求那里已经详细解释过了,就不再累赘了。
final class AsyncCall extends NamedRunnable {
// ...
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
// ...
} catch (IOException e) {
// ...
} finally {
client.dispatcher().finished(this);
}
}
}
至此,OkHttp
的主体运作流程是不是已经清晰了,不过有没有感觉还少点什么,我们只是分析了运作流程,具体到怎么连接的问题还没有分析。
好吧,既然是建立连接,那么极速定位到ConnectInterceptor
,没毛病吧, 核心源码如下:
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
// ...
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 注意这里
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
注意到上面的streamAllocation.newStream(..)
,源码如下:
public HttpCodec newStream( OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
// ...
// 1. 查找可用连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
// 2. 建立 HTTP 或 HTTP2 连接
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
}
继续定位到 findHealthyConnection
:
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks) throws IOException {
while (true) { // 完全阻塞式查找,找不到不罢休
// 1. 查找已有连接
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// 2. 如果这是一条全新的连接,那么可以跳过大量的健康检查,直接返回
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// 3. 做一个速度超慢的检查,以确保池中的连接仍然可用,
// 如果不可用了就将其从池中剔除,然后继续查找
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
定位到StreamAllocation#findConnection
,这里查找连接的规则是:先查看当前是否存在可用连接,如果不存在,再从连接池中查找,如果还没有,那就新建一个,用来承载新的数据流。 需要注意的一个细节就是,从连接池查找连接时会查询两次,第一次只是根据当前目标服务器地址去查,如果没有查到,则第二次会重新选择路由表,然后用该地址去匹配。最终如果存在已经创建好的连接,则直接返回使用,如果不存在,则新建一个连接,进行TCP
和TLS
握手,完事之后将这个连接的路由信息记录在路由表中,并把这个连接保存到连接池。还需要注意的一点是:如果有个连接与当前建立的连接的地址相同,那么将释放掉当前建立好的连接,而使用后面创建的连接(保证连接是最新的)
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
synchronized (connectionPool) {
// ...
// 1. 检查当前有没有可用连接,如果有,那么直接用当前连接
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) { // 可用
result = this.connection;
releasedConnection = null;
}
// ...
if (result == null) {
// 2. 不存在已有连接或者已有连接不可用,则尝试从连接池中获得可用连接
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
// ...
if (result != null) { // 已有连接中找到了连接,完成任务
return result;
}
boolean newRouteSelection = false;
// 选择一条路由,这是个阻塞式操作
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// 路由已经选好了,此时再根据路由中的 IP集合去匹配连接池中的连接,
// 这个可能因为连接合并的缘故而匹配到
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// 3. 最后实在没找到已有的连接,那么就只能重新建立连接了
route = selectedRoute;
refusedStreamCount = 0;
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// 根据路由匹配到了连接池中的连接
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// 进行 TCP + TLS 握手. 这是阻塞式操作
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route()); // 路由表中记录下这个连接的路由信息
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// 将这个连接记录到连接池
Internal.instance.put(connectionPool, result);
// 如果多个连接指向当前创建的连接的相同地址,那么释放掉当前连接,使用后面创建的连接
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
根据以上分析可得出以下主体执行流程:
OkHttp主体执行流程.png当然这是同步请求的流程,而对于异步请求而言,也仅仅是把拦截器链放到了线程池执行器中执行而已。
三、核心类解读
至此,我们已经清楚了OkHttp
的主干,当然,我们仅仅是把流程给走通了,在本节中,我们将根据源码具体分析OkHttp
中各核心类的作用及其实现,内容很长,请做好心理准备。
1. 拦截器(Intercepter
)
1). RetryAndFollowUpInterceptor
作用:用于失败时恢复以及在必要时进行重定向。
作为核心方法,RetryAndFollowUpInterceptor#intercept
体现了RetryAndFollowUpInterceptor
的工作流程,源码如下,我们来分析分析它是怎么恢复和重定向的,具体实现流程还请看注释:
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
// 仅仅创建流的承载对象,此时并没有建立流
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0; // 用于记录重定向和需要授权请求的数量
Response priorResponse = null;
while (true) {
// 1. 如果此时请求被取消了,那么关闭连接,释放资源
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
// 2. 推进执行拦截器链,请求并返回响应数据
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 3. 如果连接失败了,尝试使用失败的地址恢复一下,此时请求可能还没有发送
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getFirstConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// 4. 尝试重新与交流失败的服务器重新交流,这个时候请求可能已经发送了
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// 5. 如果是位置异常,那么释放掉所有资源
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// 6. 如果记录的上一个请求大的响应数据存在,那么将其响应体置空
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp;
try {
// 7. 处理请求的认证头部、重定向或请求超时问题,如果这些操作都不必要
// 或者应用不了,那么返回 null
followUp = followUpRequest(response, streamAllocation.route());
} catch (IOException e) {
streamAllocation.release();
throw e;
}
//需要处理认证、重定向和超时问题,那么结束处理,返回响应数据
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
// 否则,关闭当前响应,进行后续重定向等问题的处理
closeQuietly(response.body());
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
// ...
if (!sameConnection(response, followUp.url())) {
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
request = followUp;
priorResponse = response;
}
}
2). BridgeInterceptor
作用:应用代码与网络代码的桥梁。首先根据用户请求建立网络请求,然后执行这个网络请求,最后根据网络请求的响应数据建立一个用户响应数据。
BridgeInterceptor#intercept
源码如下,主要做了以下事情:
-
用于请求: 这个是在推进请求拦截器链时进行的,也就是说此时尚未真正地进行网络请求。此时会补充缺失的请求头参数,如
Content-Type
、Transfer-Encoding
、Host
、Connection
、Accept-Encoding
、User-Agent
、Cookie
。如果在请求时添加了gzip
请求头参数,即开启了gzip
压缩,那么在取得响应数据时需要对数据进行解压。 -
用于响应: 这个实在取得网络响应数据后回退拦截器链时进行的,即已经取得了网络响应数据。此时会对相应头部进行处理,如果请求时开启了
gzip
压缩,那么此时会对响应数据进行解压。
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
// 这里处于请求之前
// 1. 此时主要为请求添加缺失的请求头参数
RequestBody body = userRequest.body();
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
}
// ...
// 如果启用了GZIP压缩,那么需要负责解压响应数据
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
//...
// 2. 推进执行拦截器链,进行请求、返回数据
Response networkResponse = chain.proceed(requestBuilder.build());
// 取得网络响应数据后
// 3. 处理响应头,如果请求时开启了GZIP压缩,那么这里需要将响应数据解压
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
// 建立用户响应数据
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
综上所述,BridgeInterceptor
主要用于请求前对用户请求进行完善,补充缺失参数,然后推进请求拦截器链,并等待响应数据返回,取得响应数据后则是将其转换成用户响应数据,此时如果数据进行过gzip
压缩,那么会在这里进行解压,然后重新封装成用户数据。
3). CacheInterceptor
作用:用于从本地缓存中读取数据,以及将服务器数据写入到缓存。
CacheInterceptor#intercept
源码如下,拦截器链执行到这一步主要做了如下事情:
-
请求:
如果开启了缓存,且请求策略是禁用网络仅读缓存的话,那么首先会根据当前请求去查找缓存,如果匹配到了缓存,则将缓存封装成响应数据返回,如果没有匹配到,那么返回一个
504
的响应,这将导致请求拦截器链执行终止,进而返回执行响应拦截器链。如果请求策略是网络加缓存,当那么然网络请求优先,所以就推进请求拦截器链执行请求,
-
网络响应:
在得到网络响应数据后,如果开启了缓存策略其匹配到了旧缓存,那么根据最新网络请求响应数据更新缓存,然后返回响应数据;如果没有匹配到缓存但是开启了缓存,那么将响应数据写入缓存后返回;而如果开启了缓存,但是并不使用缓存策略,那么根据响应数据移除缓存中对应的数据缓存。
@Override public Response intercept(Chain chain) throws IOException {
// 读取候选的旧缓存
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
// 解析请求和缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest; // 如果仅读缓存,那么网络请求会为 null
Response cacheResponse = strategy.cacheResponse;
// ...
// 如果禁止使用网络而仅读取缓存的话,那么没匹配到缓存时返回 504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
// ...
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
// ...
.body(Util.EMPTY_RESPONSE)
.build();
}
// 如果禁止使用网络而仅读取缓存的话,那么匹配到缓存时将其返回
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// 推进执行请求拦截器链
networkResponse = chain.proceed(networkRequest);
} finally {
// 请求异常则关闭候选缓存实体
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// 根据最新网络请求响应数据更新缓存,然后返回响应数据
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
// ...
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// ...
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
// 无匹配缓存的情况
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// 使用缓存策略且无匹配缓存,则将响应数据写入缓存
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try { // 不使用缓存策略,则删除已有缓存
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
缓存时判断逻辑比较多,不过这里重点在于理解缓存策略,一般会有:仅网络、仅缓存、网络加缓存 三种请求策略。
4). ConnectInterceptor
作用:用于打开一个到目标服务器的连接,并切换至下一个拦截器
因为在上一节末尾分析OkHttp
如何建立连接的问题上已经分析过了,所以不做过多描述。
这里回忆一下连接规则:建立连接时,首先会查看当前是否有可用连接,如果没有,那么会去连接池中查找,如果找到了,当然就使用这个连接,如果没有,那么就新建一个连接,进行TCP
和TLS
握手以建立联系,接着把连接放入连接池以备后续复用,最后推进请求拦截器链执行,将打开的连接交给下一个拦截器去处理。
5). CallServerInterceptor
作用:这是拦截器链的最后一环,至此将真正的进行服务器请求
CallServerInterceptor#intercept
源码如下,作为拦截器链的最后一环,当然要真正地做点实事了,大致操作步骤是:
发送请求头部 --> 读取一下GET
、HEAD
之外的请求(如POST
)的响应数据 --> 结束请求的发送动作 --> 读取响应头部 --> 读取响应数据 --> 封装后返回。
@Override public Response intercept(Chain chain) throws IOException {
// ...
long sentRequestMillis = System.currentTimeMillis();
// 1. 发送请求头部
httpCodec.writeRequestHeaders(request);
Response.Builder responseBuilder = null;
// 2. 检查是否是 GET 或 HEAD 以外的请求方式、读取响应数据
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// 如果请求头部中包含"Expect: 100-continue",那么在转换请求体前等待"HTTP/1.1 100 Continue"
// 响应。如果没有读取到"HTTP/1.1 100 Continue"的响应,那么就不转换请求体(request body)了,
// 而直接将我们得到的响应返回(如 状态为 4XX 的响应 );
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
// 读取、转换响应头部
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
// 如果存在"Expect: 100-continue",则写入请求体
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
} else if (!connection.isMultiplexed()) {
// 如果不存在"Expect: 100-continue",那么禁止 HTTP/1 连接复用。
// 不过我们仍然必须转换请求体以使连接达到一个固定的状态。
streamAllocation.noNewStreams();
}
}
// 3. 结束请求的发送动作
httpCodec.finishRequest();
// 4. 读取响应头部
if (responseBuilder == null) {
responseBuilder = httpCodec.readResponseHeaders(false);
}
// 5. 构建响应数据
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// 如果服务器返回 100-continue 响应即使我们并没有这么请求,则重新读取一遍响应数据;
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
// 6. 填充响应数据
if (forWebSocket && code == 101) {
// 连接正在更新,不过我们需要确保拦截器收到的 non-null 的响应体
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// 这里将 http 输入流包装到响应体
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
// 其他情况...
return response;
}
综上,作为拦截器最后一环的CallServerInterceptor
终于把请求给终结了,完成了与服务器的沟通交流,把需要的数据拿了回来。请求的时候每个拦截器都会插上一脚,响应的时候也一样,把数据转换的工作分给了各个拦截器处理。
2. 分发器(Dispatcher
)
为什么叫分发器呢?如果叫做执行器(Executor
)可能会更好理解一些,因为它的工作就是执行异步请求,虽然会统计请求的数量....嗯~~好吧,换个角度,如果理解为它用于把异步任务分发给线程池执行,起到任务分发的作用,那就理解为啥叫分发器了。
OK,先来观察一下Dispatcher
的构成,部分源码如下,可以先看看注释:
public final class Dispatcher {
private int maxRequests = 64; // 同时执行的最大异步请求数量,数量超过该值时,新增的请求会放入异步请求队列中
private int maxRequestsPerHost = 5; // 每个主机最多同时存在的请求数量
private @Nullable Runnable idleCallback;
// 线程池执行器
private @Nullable ExecutorService executorService;
// 尚未执行的任务队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 正在执行的任务队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 同步执行的任务队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
// 初始化线程池执行器
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
// ...
}
简单描述一下:Dispatcher
包含三个任务队列,分别用于记录尚未执行的异步请求、正在执行的异步请求、正在执行的同步请求。包含一个线程池,用于执行异步请求,这个线程池执行器的核心线程数量为 0
, 最大线程数量不限(整型的最大值2^31-1
,相当于不限),闲置线程的最大等待超时时间为60
秒,线程池的任务队列使用非公平机制的SynchronousQueue
。这就是Dispatcher
的主要配置。
我们来看看它是如何限制每个主机的请求数量的,直接看注释好了。
synchronized void enqueue(AsyncCall call) {
// 正在执行的异步请求数量小于限定值,且同一主机的正在执行的异步请求数量小于限定值时
// 添加到正在执行的异步请求队列中,并执行。
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else { // 否则就添加到等待队列中
readyAsyncCalls.add(call);
}
}
runningCallsForHost
用于计算当前正在执行的连接到相同主机上异步请求的数量
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.get().forWebSocket) continue;
if (c.host().equals(call.host())) result++;
}
return result;
}
3. 连接池(ConnetionPool
)
作用:用于管理HTTP
和HTTP/2
连接的复用以减少网络延迟,因为使用相同地址的请求可能共享一个连接。所以ConnetionPool
实现了维护已打开连接已被后续使用的机制。
线程池啥的就不多说了,这里主要分析一下ConnetionPool
如何维护已打开的连接。从ConnetionPool#put
着手:
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) { // 如果当前不在执行清理任务,那么现在执行
cleanupRunning = true;
executor.execute(cleanupRunnable); // 线程池的作用就是执行清理任务
}
connections.add(connection); // 同时添加到连接队列中
}
cleanupRunnable
源码如下,根据ConnetionPool#put
可知每当我们往连接池中添加一个连接时,如果当前不在执行清理任务(cleanupRunnable
),那么立马会执行cleanupRunnable
,而cleanupRunnable
中会循环执行cleanup
,直到所有连接都因闲置超时而被清理掉,具体还请先看注释。
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime()); // 执行清理
if (waitNanos == -1) return; // cleanup中的情况 4)
if (waitNanos > 0) { // cleanup中的情况 2) 和 3)
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos); // 等待超时
} catch (InterruptedException ignored) {
}
}
}
// 至此情况 1), 2), 3) 都会导致 `cleanup`被循环执行
}
}
};
cleanup
源码如下,它的作用是查找、清理超过了keep-alive
时间限制或者闲置超时闲置的连接。具体还请看注释。
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;
// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
// 1. 查找超时连接
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
// 1). 如果正在使用,那么跳过,继续查找
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
idleConnectionCount++; // 记录闲置连接的数量
// 2). 如果闲置时间超过了最大允许闲置时间,则记录下来在后面清除
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
// 2. 查找完成了,将在这里对闲置连接进行处理
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// 1). 确定已经超时了,那么从连接池中清除,关闭动作会在同步块外面进行
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// 2). 存在闲置连接,但是尚未超时
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// 3). 如果所有连接都正在使用,那么最多保持个`keep-alive`超时时间就又会重新执行清理动作
return keepAliveDurationNs;
} else {
// 4). 压根没有连接,那不管了,标记为非清理状态,并返回-1
cleanupRunning = false;
return -1;
}
}
// 3. 关闭上面查找到的处于情况1)的闲置超时连接
closeQuietly(longestIdleConnection.socket());
// 返回 0 ,表示马上会重新回来执行清理操作
return 0;
}
综上,ConnectionPool
会设定keepAliveDurationNs
、longestIdleDurationNs
两个超时时间,而每次往连接池中添加一个新连接时,如果当前处于非清理装填,都会导致线程池执行器开个线程执行清理动作,而对于清理动作而言,会遍历连接池,查找闲置超时的连接,并记录闲置连接的数量,而遍历完成后,将根据情况 2. 1)、2)、3)、4) 进行相应的处理,而如果是情况 2. 4), 则会当即结束清理循环,意味着连接池中已经没有连接了,此时线程会执行完成而退出,其他几种情况都不会中断循环,因此实际上这个线程池最多只会存在一个连接池维护线程。
四、总结
一般来说,当使用OKHttp
通过URL
请求时,它做了以下事情:
- 使用
URL
并且配置OKHttpClient
来创建地址(Address
),这个地址指定了我们连接web
服务器的方式。 - 尝试从连接池(
ConnectionPool
)中取出与该地址相同的连接。 - 如果在连接池中没有对应该地址的连接,那么它会选择一条新路线(
route
)去尝试,这通常意味着将进行DNS
请求以获取对应服务器的IP
地址,然后如果需要的话还会选择TLS
版本和代理服务器。 - 如果这是一条新路线,它会通过
Socket
连、TLS
隧道(HTTP代理的HTTPS)或者TLS
连接,然后根据需要进行TCP
、TLS
握手。 - 发送
HTTP
请求,然后读取响应数据。
如果连接出现问题,OKHttp
会选择另一条路线再次尝试,这使得OKHttp
在服务器地址子集无法访问时能够恢复,而当从连接池中拿到的连接已经过期,或者TLS
版本不支持的情况下,这种方式同样很有用。一旦接收到响应数据,该连接就会返回到连接池中以备后续请求使用,而连接池中的连接也会在一定时间的不活动状态后被清除掉。
对于整体框架而言,本文已经详细分析了OkHttp
的整体工作流程,相关细节还请回到文中去,这里就不再累赘了。
网友评论