OkHttp介绍:
由Square公司贡献的一个处理网络请求的开源项目,是目前Android使用最广泛的网络框架。从Android4.4开始HttpUrlConnection的底层也换成了OkHttp。
特点:
支持http/2并允许对同一主机的所有请求共享一个套接字
通过连接池减少请求延迟
默认通过GZip压缩数据
响应缓存,避免重复的网络请求
请求失败自动重试主机的其他ip,自动重定向
简单使用
// 创建OkHttpClient
OkHttpClient client = new OkHttpClient.Builder().build();
//也可以直接new出来 OkHttpClient okHttpClient = new OkHttpClient();
// get请求
Request getRequest = new Request.Builder()
.url("")
.get()
.build();
//post请求
RequestBody body = new FormBody.Builder()
.add("","")
.add("","")
. build();
Request postRequest = new Request.Builder().post(body)
.url("")
.build();
// 传入getRequest 或者 postRequest 构建call
Call postCall = okHttpClient.newCall(postRequest);
// 发起请求
call.enqueue(new okhttp3.Callback() {
@Override
public void onFailure(okhttp3.Call call, IOException e) {
}
@Override
public void onResponse(okhttp3.Call call, okhttp3.Response response) throws IOException {
}
});

根据流程图可以看到前面几步都是在自己写的代码当中,直到调用execute或者enqueue时才进入了Okhttp内部去执行请求。
1 Dispatcher
首先需要知道的是Dispatcher内部维护了三个队列和一个线程池
// 线程池
private @Nullable ExecutorService executorService;
// ready状态的异步任务队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// running状态的异步任务队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 同步任务队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
先看一张分发器异步请求流程图:

通过上面的图我们可以发现dispatcher可能向readyAsyncCalls放入请求,也可以向runningAysncCalls中存放那么就存在了问题:
(1). 如何决定放入readyAsyncCalls还是runningAysncCalls?
(2). 满足什么条件才会从把readyAsyncCalls请求移动到runningAysncCalls当中?
(3). 线程池如何创建,如何工作?
问题一:如何决定放入readyAsyncCalls还是runningAysncCalls
// 调用newCall实际是调用了返回了一个RealCall对象
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
// 执行enqueue()方法实际是调用了RealCall的enqueue()方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
// 最终调用了分发器的enqueue()
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
synchronized void enqueue(AsyncCall call) {
// 第一个问题的答案就在于此
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
// 放入running队列
runningAsyncCalls.add(call);
// 线程池执行任务
executorService().execute(call);
} else {
// 反之存入ready队列
readyAsyncCalls.add(call);
}
}
到了这里,第一个问题就可以解答了。首先判读运行当中的异步任务队列中的任务个数是不是小于maxRequests(默认是64,可自己配置),其次需要满足对同一个主机的正在请求的个数数需要小于maxRequestsPerHost(默认是5,可自己配置)。也就是说超过了最大并发请求数超过了64或者对同一个服务器访问的任务数超过了5,那么就存放到ready队列当中。
问题二:满足什么条件才会从把readyAsyncCalls请求移动到runningAysncCalls当中
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
......// 省略部分代码
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
// 请求失败
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
// 请求成功
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
// 请求失败
responseCallback.onFailure(RealCall.this, e);
}
} finally {
// 无论成功失败 都会执行
client.dispatcher().finished(this);
}
}
}
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
// 把执行完了的任务从队列中移除
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// promoteCalls为true
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
// running状态的任务个数大于最大请求数则忽略
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
// ready状态的队列为空时忽略。也就是没有ready状态的任务,自然不用移动到running当中
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
// 对同一个服务器的并发访问数要小于5
if (runningCallsForHost(call) < maxRequestsPerHost) {
// 从ready中移除
i.remove();
// 添加到running队列
runningAsyncCalls.add(call);
// 交给线程池执行
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
至此第二个问题的答案也找到了。
问题三:线程池
提到线程池就必须先说一下线程池的好处,可以概括为三点:
1.重用线程池中的线程,避免线程的重复创建和销毁所带来的性能开销
2.能有效控制线程池的最大并发数,避免大量线程因相互抢占系统资源导致阻塞
3.能够对线程进行管理,并提供定时以及指定间隔循环执行等功能
ThreadPoolExecutor是线程池的真正实现,构造方法提供了一系列参数来配置线程池
public ThreadPoolExecutor(int corePoolSize,
int maxinumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory
RejectedExecutionHandler handler)
corePoolSize: 线程池的核心线程数。默认情况下,核心线程会在线程池中一直存活,即使处于闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么闲置的核心线程在等待新任务到来的时会有超时策略,这个时间由keepAliveTime指定,当等待时间超过了keepAliveTime所指定的时长之后,核心线程就会被终止。
maxinumPoolSize:线程池所能容纳的最大线程数(包括核心线程数),当活动线程数达到这个值后,后续的新任务会被阻塞。
keepAliveTime:非核心线程闲置的超时时长。当非核心线程闲置时间超过这个数值后,就会被回收。当ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,核心线程同样会被回收。
unit:指定keepAliveTime的时间单位。常用的有TimeUnit.MILLISECONDS(毫秒),TimeUnit.SECONDS(秒),TimeUnit.MINUTES(分钟)
workQueue:线程池中的任务队列。通过线程池的execute方法提交的任务会存在这个参数中。
threadFactory:线程工厂,为线程池创建新的线程。这是一个接口。
handler:拒绝策略。这个参数不常用。当线程池无法执行新任务时,ThreadPoolExecutor会调用handler的rejectedExecution()方法来通知调用者,默认情况下rejectedExecution()方法会抛出一个RejectedExecutionException。RejectedExecutionHandler是一个接口,有四个默认实现类,分别是AbortPolicy(丢弃任务并抛出RejectedExecutionException异常),CallerRunsPolicy(由提交任务的线程处理该任务),DiscardOldestPolicy(抛弃队列最前面的任务,然后重新提交被拒绝的任务 ),DiscardPolicy(丢弃任务,但不抛出异常)。
线程池执行时的规则:
(1).如果线程池中的线程数量小于核心线程数,会直接启动一个新线程来执行任务
(2).如果线程池中的线程数量大于等于核心线程数,那么任务会被放入队列中去等待执行
(3).在2的基础上如果队列满了或者加入队列时失败,且线程数量小于线程池规定的最大线程数,则会启动一个非核心线程来执行任务。
(4).如果线程池中的数量达到了规定的最大线程数,那么直接拒绝执行此任务,就会调用RejectedExecutionHandler的rejectedExecution()方法,否则就创建一个新的非核心线程执行此任务。
了解了一些线程池的原理之后,现在来分析一下OkHttp当中的线程池具体是如何创建以及为什么这样创建。
// 可以传入自己的线程池
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
//使用OkHttp默认的线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
可以看到OkHttp的线程池是没有核心线程的,假如放入第一个任务A,按照上面线程池的执行规则,A任务会则会被存入队列当中然后等待执行。最大线程数是Integer.MAX_VALUE和keepAliveTime为60,这些没什么好说的。主要看队列SynchronousQueue<Runnable>。SynchronousQueue<Runnable>是无容量队列。也就是说向SynchronousQueue<Runnable>添加任务时一定会失败,则根据线程执行规则3,由于Integer.MAX_VALUE是一个非常大的数值,假如没有闲置的线程,那么线程池就会立即创建一个新的线程来执行任务。
那么OkHttp的线程池是期望获得最大并发量且没有任何等待,但是进程的内存是存在限制的,而每一个线程都需要分配一定的内存,所以线程并不能无限。所以才有了OkHttp最大请求任务执行个数为64的限制。这样即解决了这个问题同时也能获得最大吞吐。
2 Interceptors
OkHttp最核心的也是最重要的就是拦截器功能。默认有5个拦截器。根据责任链设计模式在请求的时候各司其职从上往下调用,在拿到响应之后又会按照从下往上的顺序把Response返回。

(1)重试与重定向拦截器 RetryAndFollowUpInterceptor
RetryAndFollowUpInterceptor在交出给下一个拦截器之前会判断用户是否取消了请求。在获得了结果之后会根据响应码判断是否需要重定向,如果满足条件,那么就会重启执行所有拦截器
(2)桥接拦截器 BridgeInterceptor
BridgeInterceptor在交出之前负责将HTTP必备的请求头加入其中(如host),并添加一些默认行为(如GZIP)。在获得结果之后调用保存cookie接口并解析GZIP数据
(3)缓存拦截器 CacheInterceptor
CacheInterceptor在交出之前读取并判断是否使用缓存。获取结果之后判断是否缓存
(4)连接拦截器 ConnectInterceptor
在交出之前负责找到或者新建一个连接,并获得对应的socket流。在获得结果之后不再处理。
(5)请求服务拦截器 CallServerInterceptor
CallServerInterceptor真正的与服务器进行通信,向服务器发送数据,解析读取的响应数据
下面来看一下何时使用拦截器处理以及如何实现的责任链:
final class AsyncCall extends NamedRunnable {
...... //省略部分代码
@Override protected void execute() {
boolean signalledCallback = false;
try {
// 通过拦截器链获取响应
Response response = getResponseWithInterceptorChain();
// 重试重定向拦截器的职责之一:判断用户是否取消了请求
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
...... //省略部分代码
}
}
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// 用户可以添加自己的拦截器
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor); // 重试重定向
interceptors.add(new BridgeInterceptor(client.cookieJar())); // 桥接
interceptors.add(new CacheInterceptor(client.internalCache())); // 缓存
interceptors.add(new ConnectInterceptor(client)); //连接
if (!forWebSocket) {
// 用户可以添加自己的拦截器
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket)); // 请求服务
/**
* interceptors 所有的拦截器
* index 第五个参数 默认传入0 可以从interceptors根据index的值取出interceptor
*/
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 开始处理请求
return chain.proceed(originalRequest);
}
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
...... // 省略部分代码
// 构造一个新的chain index初始传入的是0,也就是说新的chain的index加1之后变成了1
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
// 取出拦截器 由于index 此时为0 取出的也就是retryAndFollowUpInterceptor
Interceptor interceptor = interceptors.get(index);
// 执行retryAndFollowUpInterceptor的拦截方法
Response response = interceptor.intercept(next);
...... // 省略部分代码
return response;
}
// 最终会调到retryAndFollowUpInterceptor的拦截方法中的这一行代码
// realChain就是上面new出来的chain并且index的值为1 那么就会重复上面的proceed()方法,每次index都加1
//直到最终执行完所有的拦截器
// 根据方法参数可以知道retryAndFollowUpInterceptor把request传给了下一个拦截器
response = realChain.proceed(request, streamAllocation, null, null);
RetryAndFollowUpInterceptor 源码分析:
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) { // 取消请求就抛出IO异常
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
// 把request发给下一个拦截器 并获得response
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// 路由异常 连接未成功,请求还未发出去
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
// 跳过此次循环 重新进行一遍
continue;
} catch (IOException e) {
// 请求发出去了 但是和服务器通信失败了
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
// 重定向
Request followUp = followUpRequest(response, streamAllocation.route());
// 如果为null 也就是不存在需要重定向的Request
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
// 返回结果
return response;
}
closeQuietly(response.body());
// 重定向次数大于20次 则抛出协议异常
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
...... // 省略部分代码
}
}
// 返回true代表要进行重试 false 则不进行重试
private boolean recover(IOException e, StreamAllocation streamAllocation,
boolean requestSendStarted, Request userRequest) {
streamAllocation.streamFailed(e);
// OkhttpClient禁止了重试
if (!client.retryOnConnectionFailure()) return false;
// requestSendStarted在http2才可能为true http1.1 则为false
if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;
// 不是可重试的异常就不重试
if (!isRecoverable(e, requestSendStarted)) return false;
//不存在更多的路由时 不进行重试 (一个host可能会被dns解析成多个ip ,如果只有一个就返回false)
if (!streamAllocation.hasMoreRoutes()) return false;
// 默认重试
return true;
}
// 返回true表示时可重试的异常 反之是不需要或不可重试的异常
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
// 协议异常不重试
if (e instanceof ProtocolException) {
return false;
}
if (e instanceof InterruptedIOException) {
// 如果异常时一个 socket超时异常就重试 例如网络波动导致超时
return e instanceof SocketTimeoutException && !requestSendStarted;
}
if (e instanceof SSLHandshakeException) {
// 证书异常不重试
if (e.getCause() instanceof CertificateException) {
return false;
}
}
// 证书校验失败不重试
if (e instanceof SSLPeerUnverifiedException) {
return false;
}
return true;
}
可以看到关于重试的条件还是很苛刻的,需要经过很多的判断。总结下来就是要先看是否禁止了重试,然后是不是可重试的异常,证书有没有问题,是不是有多个ip。
下面看一下重定向
private Request followUpRequest(Response userResponse, Route route) throws IOException {
if (userResponse == null) throw new IllegalStateException();
int responseCode = userResponse.code();
final String method = userResponse.request().method();
switch (responseCode) {
case HTTP_PROXY_AUTH: // 407
Proxy selectedProxy = route != null
? route.proxy()
: client.proxy();
// 是否是http代理
if (selectedProxy.type() != Proxy.Type.HTTP) {
throw new ProtocolException("Received HTTP_PROXY_AUTH (407) code while not using proxy");
}
// 是的话就要去校验身份 在配置okhttpclient的时候可以配置
// 例如 OkHttpClient client = new OkHttpClient.Builder()
// .proxyAuthenticator(new Authenticator() {
// @Nullable
// @Override
// public Request authenticate(Route route, okhttp3.Response response) throws IOException {
// Request request = response.request().newBuilder()
// .addHeader("Proxy-Authorization"
// , Credentials.basic("your name","your pwd"))
// .build();
// return request;
// }
// })
// .build();
return client.proxyAuthenticator().authenticate(route, userResponse);
case HTTP_UNAUTHORIZED: // 401
// 出现很少 用法与上面类似 区别是上面是proxyAuthenticator 这个是authenticator
return client.authenticator().authenticate(route, userResponse);
case HTTP_PERM_REDIRECT: //308
case HTTP_TEMP_REDIRECT: //307
// 请求方法不是GET 并且不是HEAD
if (!method.equals("GET") && !method.equals("HEAD")) {
return null;
}
case HTTP_MULT_CHOICE: //300
case HTTP_MOVED_PERM: //301
case HTTP_MOVED_TEMP: //302
case HTTP_SEE_OTHER: //303
// 客户端是否允许重定向
if (!client.followRedirects()) return null;
// 响应头不包含“Location” 不重定向
String location = userResponse.header("Location");
if (location == null) return null;
// 包含了location则取出来对应的地址
HttpUrl url = userResponse.request().url().resolve(location);
// 没有url 返回null 也就是不重定向
if (url == null) return null;
boolean sameScheme = url.scheme().equals(userResponse.request().url().scheme());
// Scheme不一样并且不允许切换到https 则不重定向
if (!sameScheme && !client.followSslRedirects()) return null;
Request.Builder requestBuilder = userResponse.request().newBuilder();
// method不是get或head
if (HttpMethod.permitsRequestBody(method)) {
// maintainBody = method.equals("PROPFIND")
final boolean maintainBody = HttpMethod.redirectsWithBody(method);
// method不是"PROPFIND"
if (HttpMethod.redirectsToGet(method)) {
// mothod设置为get body为null
requestBuilder.method("GET", null);
} else {
RequestBody requestBody = maintainBody ? userResponse.request().body() : null;
requestBuilder.method(method, requestBody);
}
if (!maintainBody) {
requestBuilder.removeHeader("Transfer-Encoding");
requestBuilder.removeHeader("Content-Length");
requestBuilder.removeHeader("Content-Type");
}
}
if (!sameConnection(userResponse, url)) {
requestBuilder.removeHeader("Authorization");
}
return requestBuilder.url(url).build();
case HTTP_CLIENT_TIMEOUT: //408
// 不允许重试 则不重定向
if (!client.retryOnConnectionFailure()) {
return null;
}
// body是不可重复的body 则不重定向
if (userResponse.request().body() instanceof UnrepeatableRequestBody) {
return null;
}
// 不是响应408的重试结果则不重定向
if (userResponse.priorResponse() != null
&& userResponse.priorResponse().code() == HTTP_CLIENT_TIMEOUT) {
return null;
}
// 服务器未响应Retry-After 或者响应了Retry-After:0 则不重定向
if (retryAfter(userResponse, 0) > 0) {
return null;
}
return userResponse.request();
case HTTP_UNAVAILABLE: // 503 服务器不可用
//不是响应503的重试结果则不重定向
if (userResponse.priorResponse() != null
&& userResponse.priorResponse().code() == HTTP_UNAVAILABLE) {
return null;
}
// 服务器明确响应Retry-After:0 则重定向
if (retryAfter(userResponse, Integer.MAX_VALUE) == 0) {
return userResponse.request();
}
return null;
default:
return null;
}
}
BridgeInterceptor源码分析
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
...... // 省略部分代码
// 没有host 添加host
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}
// 没有Connection添加Connection
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
//Gzip
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}
// cookie
List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}
if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
...... // 省略部分代码
return responseBuilder.build();
}
可以看到桥接拦截器的工作还是很简单的,就是检查请求头,没有的就添加上。简单来说就是补全信息以及cookie和Gzip。
CacheInterceptor源码分析:
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
// 根据请求和缓存得到缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
// 网络请求
Request networkRequest = strategy.networkRequest;
// 缓存响应
Response cacheResponse = strategy.cacheResponse;
......
// 既不存在网络请求 也不存在缓存 直接返回504
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 不存在网络请求 存在缓存 则返回缓存
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
// 存在网络请求 则发起请求
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//如果之前存在缓存
if (cacheResponse != null) {
// 最近此次通过网络请求的响应的响应码是304 那么更新缓存
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
......
return response;
}

ConnectInterceptor源码分析:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 查找连接 找到了就复用 没有就新建
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
乍一看,就这几行代码,而且也看不出什么东西来。其实不然,这其中包含了一个连接池。连接池的作用与线程池的功能类似,不过是保存的socket连接。连接池是在初始化StreamAllocation的时候传入的,而StreamAllocation则是在RetryAndFollowUpInterceptor中被创建的。
//ConnectPool部分源码
//和Dispatcher中的线程池差不多 这个线程池的作用是清理连接池中的连接
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
/**
* maxIdleConnections 最大连接数 默认是5
* keepAliveDuration 存活时间 默认5分钟
*/
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
// 存入队列当中
connections.add(connection);
}
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
// 遍历队列
for (RealConnection connection : connections) {
// 检查是不是与连接池中的连接一致 ,一致的话就可以服用
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
public boolean isEligible(Address address, @Nullable Route route) {
// 连接池中正在使用的不能服用
if (allocations.size() >= allocationLimit || noNewStreams) return false;
// 会去判断DNS,端口号,证书等十种 十个全部一致才可以复用
if (!Internal.instance.equalsNonHost(this.route.address(), address)) return false;
// host一样可以复用
if (address.url().host().equals(this.route().address().url().host())) {
return true;
}
...... //省略部分
}
下面是建立一个socket连接:
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
......// 省略部分代码
while (true) {
try {
// 使用了http代理去代理https
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
break;
}
} else {
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
...... //省略部分
}
}
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket() // 使用http代理
: new Socket(proxy); // 使用socket代理
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
// 会调用socket.connect()去建立真正的连接
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
......
}
}
CallServerInterceptor源码分析:
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
// 写请求头
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
// 不是get请求且存在请求体
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// 如果请求头包含Expect且对应的内容为100-continue
// Expect:100-continue一般出现于大容量请求体或需要验证 代表先询问服务器是否愿意接受发送的数据
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
// 先读取响应头,如果响应头返回了100则返回一个为null的responseBuilder
responseBuilder = httpCodec.readResponseHeaders(true);
}
if (responseBuilder == null) {
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
// 再把请求体写出去
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
// 正常流程
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
// 读取服务器的响应
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
......
return response;
}
网友评论