- Dispatcher
- Thread pool
- Interceptors
- The five interceptors
Dispatcher
The Dispatcher class distributes submitted network tasks: it hands highly concurrent requests to the thread pool and queues the rest.
How the Dispatcher works:
First, the core members of the Dispatcher class.
- The ready queue and the running queue:
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
- Maximum concurrent requests and maximum concurrent requests per host: the defaults are 64 and 5, and both can be customized (a configuration sketch follows the field declarations below).
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
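Since both limits can be adjusted, here is a minimal sketch (assuming the OkHttp 3.x Java API) of raising them through a custom Dispatcher:

```java
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;

public class DispatcherConfig {
    public static OkHttpClient build() {
        // Raise the async concurrency limits before building the client.
        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(128);       // overall limit for async calls (default 64)
        dispatcher.setMaxRequestsPerHost(10); // per-host limit for async calls (default 5)
        return new OkHttpClient.Builder()
                .dispatcher(dispatcher)
                .build();
    }
}
```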

dispatcher.enqueue()
synchronized void enqueue(AsyncCall call) {
  // Compare against the two limits before deciding which queue the call goes to.
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);
    executorService().execute(call);
  } else {
    readyAsyncCalls.add(call);
  }
}
promoteCalls()
private void promoteCalls() {
  if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
  if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();
    if (runningCallsForHost(call) < maxRequestsPerHost) {
      i.remove();
      runningAsyncCalls.add(call);
      executorService().execute(call);
    }
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
}
Thread Pool
OkHttp's internal thread pool:
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
- Core pool size: 0 — there are no core threads, so idle threads are not kept around (they die after the 60-second keep-alive).
- Blocking queue: SynchronousQueue, which has no capacity at all.
Because the queue cannot hold submitted tasks, every task that finds no idle worker immediately gets a new thread, which is what gives the dispatcher its no-wait, high-concurrency behavior; a standalone sketch of this follows.
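To see why a SynchronousQueue produces this behavior, here is a standalone sketch (plain java.util.concurrent, not OkHttp code) that builds an equivalent pool and submits a few tasks; every task that finds no idle worker gets a fresh thread:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DispatcherPoolDemo {
    public static void main(String[] args) {
        // Same shape as OkHttp's pool: 0 core threads, unbounded maximum,
        // 60-second keep-alive, and a queue with no capacity.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());

        for (int i = 0; i < 5; i++) {
            final int id = i;
            // Each submission is handed straight to a worker thread, never queued.
            pool.execute(() ->
                    System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
    }
}
```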
Interceptors
Interceptors are the core of OkHttp. Once the dispatcher hands a network task to the thread pool, the request is processed by the interceptor chain.

The core interceptor-related APIs:
the Interceptor interface and its nested Chain interface
public interface Interceptor {
  Response intercept(Chain chain) throws IOException;

  interface Chain {
    Request request();
    Response proceed(Request request) throws IOException;

    /**
     * Returns the connection the request will be executed on. This is only available in the chains
     * of network interceptors; for application interceptors this is always null.
     */
    @Nullable Connection connection();

    Call call();
    int connectTimeoutMillis();
    Chain withConnectTimeout(int timeout, TimeUnit unit);
    int readTimeoutMillis();
    Chain withReadTimeout(int timeout, TimeUnit unit);
    int writeTimeoutMillis();
    Chain withWriteTimeout(int timeout, TimeUnit unit);
  }
}
Interceptor is the interface that every interceptor implements; the five built-in interceptors all implement it. Chain is the interface of the chain of responsibility: interceptors are invoked one after another through this chain, each one handling a single responsibility. A sketch of a custom interceptor follows.
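As a concrete example of the Interceptor side, here is a minimal sketch (assuming the OkHttp 3.x API) of a custom application interceptor and how it would be registered; addInterceptor() places it in client.interceptors(), the list that the chain construction shown below adds first:

```java
import java.io.IOException;

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

public class TimingInterceptor implements Interceptor {
    @Override public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        long start = System.nanoTime();
        Response response = chain.proceed(request); // hand control to the rest of the chain
        long tookMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(request.url() + " took " + tookMs + " ms");
        return response;
    }
}

// Registration:
// OkHttpClient client = new OkHttpClient.Builder()
//         .addInterceptor(new TimingInterceptor())
//         .build();
```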
The implementation class of the Chain interface: RealInterceptorChain
private final List<Interceptor> interceptors;
private final int index;

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
    RealConnection connection) throws IOException {
  ....
  // Call the next interceptor in the chain.
  RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
      connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
      writeTimeout);
  Interceptor interceptor = interceptors.get(index);
  Response response = interceptor.intercept(next);
  ....
  return response;
}
RealInterceptorChain (hereafter realChain) is the main implementation of the interceptor chain of responsibility. It has two important member variables, interceptors and index:
the list holds every interceptor that needs to run, and index is the cursor that walks through that list starting from 0.
getResponseWithInterceptorChain():
// Build the Response for this call by running the whole interceptor chain.
Response getResponseWithInterceptorChain() throws IOException {
  // Build a full stack of interceptors: add them to the list in order.
  List<Interceptor> interceptors = new ArrayList<>();
  interceptors.addAll(client.interceptors());
  interceptors.add(retryAndFollowUpInterceptor);
  interceptors.add(new BridgeInterceptor(client.cookieJar()));
  interceptors.add(new CacheInterceptor(client.internalCache()));
  interceptors.add(new ConnectInterceptor(client));
  if (!forWebSocket) {
    interceptors.addAll(client.networkInterceptors());
  }
  interceptors.add(new CallServerInterceptor(forWebSocket));

  // Create the RealInterceptorChain from the list, starting at index 0.
  Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
      originalRequest, this, eventListener, client.connectTimeoutMillis(),
      client.readTimeoutMillis(), client.writeTimeoutMillis());

  // Kick the chain off by calling proceed() on it.
  return chain.proceed(originalRequest);
}
Let's follow the interceptor workflow from the thread pool's execute(). It first calls getResponseWithInterceptorChain(), which adds the user-defined interceptors plus the five interceptors that ship with OkHttp, then creates a realChain with index = 0 and calls proceed() on it.
Inside proceed(), the interceptor at the current index is looked up, a new realChain is created with index + 1 (so the next chain has index = 1), and that interceptor's intercept() method is invoked with the new chain:
RetryAndFollowUpInterceptor.intercept(realChain(index = 1))
@Override public Response intercept(Chain chain) throws IOException {
  ...
  try {
    response = realChain.proceed(request, streamAllocation, null, null);
    releaseConnection = false;
  }
  ...
}
Inside intercept() the chain's proceed() is called again, this time with index already incremented by 1. This handoff repeats down the line until every interceptor has executed. The sketch below models the recursion in isolation.
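To make the recursion concrete, here is a simplified, self-contained model of the pattern (hypothetical MiniInterceptor/MiniChain types, not OkHttp's classes): each chain node carries the interceptor list and an index, and proceed() builds the next node with index + 1 before invoking the current interceptor:

```java
import java.util.Arrays;
import java.util.List;

public class MiniChainDemo {
    interface MiniInterceptor { String intercept(MiniChain chain); }

    static class MiniChain {
        final List<MiniInterceptor> interceptors;
        final int index;
        MiniChain(List<MiniInterceptor> interceptors, int index) {
            this.interceptors = interceptors;
            this.index = index;
        }
        String proceed() {
            if (index >= interceptors.size()) return "response";
            // The next node points one interceptor further, like RealInterceptorChain does.
            MiniChain next = new MiniChain(interceptors, index + 1);
            return interceptors.get(index).intercept(next);
        }
    }

    public static void main(String[] args) {
        List<MiniInterceptor> list = Arrays.asList(
                chain -> "A(" + chain.proceed() + ")",
                chain -> "B(" + chain.proceed() + ")");
        System.out.println(new MiniChain(list, 0).proceed()); // prints A(B(response))
    }
}
```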
The Five Interceptors
RetryAndFollowUpInterceptor: retry and redirect
- retry
- followUp (redirect)
When a request throws an exception, RetryAndFollowUpInterceptor looks at the kind of exception to decide whether the request should be retried.

RetryAndFollowUpInterceptor.intercept
@Override public Response intercept(Chain chain) throws IOException {
...
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) { // A route exception: recover() decides whether to retry
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) { // An I/O exception: recover() decides whether to retry
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
}
...
}
Inside the while loop, RouteException and IOException are caught and recover() decides whether the exception is worth retrying: if it is, continue restarts the loop; if not, the exception is rethrown and the loop ends.
RetryAndFollowUpInterceptor.recover()
private boolean recover(IOException e, StreamAllocation streamAllocation,
    boolean requestSendStarted, Request userRequest) {
  streamAllocation.streamFailed(e);

  // The application layer has forbidden retries (retryOnConnectionFailure() is true by default).
  if (!client.retryOnConnectionFailure()) return false;

  // We can't send the request body again.
  if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;

  // This exception is fatal: check whether it is a kind we can recover from.
  if (!isRecoverable(e, requestSendStarted)) return false;

  // No more routes to attempt (e.g. the host resolves to only one IP and it already failed).
  if (!streamAllocation.hasMoreRoutes()) return false;

  // For failure recovery, use the same route selector with a new connection.
  return true;
}
Checking whether the exception is fatal:
private boolean isRecoverable(IOException e, boolean requestSendStarted) {
// If there was a protocol problem, don't recover.
if (e instanceof ProtocolException) { // a protocol error is never retried
return false;
}
// If there was an interruption don't recover, but if there was a timeout connecting to a route
// we should try the next route (if there is one).
if (e instanceof InterruptedIOException) {
return e instanceof SocketTimeoutException && !requestSendStarted; // retry only a connect timeout before the request was sent; other interruptions are not retried
}
// Look for known client-side or negotiation errors that are unlikely to be fixed by trying
// again with a different route.
if (e instanceof SSLHandshakeException) {
// If the problem was a CertificateException from the X509TrustManager,
// do not retry.
if (e.getCause() instanceof CertificateException) { // a certificate error is never retried
return false;
}
}
if (e instanceof SSLPeerUnverifiedException) {
// e.g. a certificate pinning error.
return false;
}
// An example of one we might want to retry with a different route is a problem connecting to a
// proxy and would manifest as a standard IOException. Unless it is one we know we should not
// retry, we return true and try a new route.
return true;
}
In short: failures that retrying cannot fix (certificate errors, protocol errors) are never retried, because the result would be the same, whereas a timeout caused by a flaky network does trigger a retry (provided the other conditions in recover() are met). If this automatic retry is unwanted, it can be switched off, as sketched below.
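A minimal sketch (OkHttp 3.x builder API) of switching the automatic retry off, which makes recover() bail out immediately:

```java
import okhttp3.OkHttpClient;

public class NoRetryClient {
    public static OkHttpClient build() {
        return new OkHttpClient.Builder()
                .retryOnConnectionFailure(false) // the first check in recover() then returns false
                .build();
    }
}
```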
Once a response comes back without a retriable exception, the same loop moves on to the redirect (followUp) phase. This is not a key point; a general understanding is enough.
Redirects:
the client sends a request, the server replies with a new URL in the Location response header, and the client then requests that new URL — that round trip is a redirect.
while (true) {
  ...
  try {
    // Each iteration sends the (possibly redirected) request down the chain.
    response = realChain.proceed(request, streamAllocation, null, null);
    releaseConnection = false;
  }
  ...
  // Decide from the response code whether a follow-up (redirect) request is needed.
  Request followUp = followUpRequest(response, streamAllocation.route());

  // If there is no follow-up, return the response as-is.
  if (followUp == null) {
    if (!forWebSocket) {
      streamAllocation.release();
    }
    return response;
  }

  // Cap the number of follow-ups.
  if (++followUpCount > MAX_FOLLOW_UPS) {
    streamAllocation.release();
    throw new ProtocolException("Too many follow-up requests: " + followUpCount);
  }

  // Loop again with the redirected request.
  request = followUp;
  priorResponse = response;
  ...
}
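Redirect following is also configurable. A minimal sketch (OkHttp 3.x builder API) of turning it off, in which case followUpRequest() produces no follow-up for 3xx responses:

```java
import okhttp3.OkHttpClient;

public class NoRedirectClient {
    public static OkHttpClient build() {
        return new OkHttpClient.Builder()
                .followRedirects(false)    // return 3xx responses to the caller as-is
                .followSslRedirects(false) // also refuse redirects that switch between http and https
                .build();
    }
}
```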
BridgeInterceptor: bridging
The simplest interceptor. Its main jobs (a rough sketch of such a bridge follows this list):
- filling in request headers
- writing and reading cookies
- handling gzip compression and decompression (Accept-Encoding / Content-Encoding)
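As an illustration of what "bridging" means, here is a hypothetical, simplified interceptor (not OkHttp's actual BridgeInterceptor) that fills in a few standard headers on the way out and transparently gunzips the body on the way back:

```java
import java.io.IOException;

import okhttp3.Interceptor;
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import okio.GzipSource;
import okio.Okio;

public class SimpleBridgeInterceptor implements Interceptor {
    @Override public Response intercept(Chain chain) throws IOException {
        Request userRequest = chain.request();
        // Complete the request headers on behalf of the caller.
        Request networkRequest = userRequest.newBuilder()
                .header("Host", userRequest.url().host())
                .header("Connection", "Keep-Alive")
                .header("Accept-Encoding", "gzip") // ask the server for a compressed body
                .build();

        Response networkResponse = chain.proceed(networkRequest);

        // Transparently decompress if the server answered with gzip.
        if ("gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
                && networkResponse.body() != null) {
            GzipSource gzipSource = new GzipSource(networkResponse.body().source());
            String contentType = networkResponse.header("Content-Type");
            ResponseBody unzipped = ResponseBody.create(
                    contentType != null ? MediaType.parse(contentType) : null,
                    -1L, Okio.buffer(gzipSource));
            return networkResponse.newBuilder()
                    .removeHeader("Content-Encoding")
                    .removeHeader("Content-Length")
                    .body(unzipped)
                    .build();
        }
        return networkResponse;
    }
}
```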
CacheInterceptor: caching
The cache interceptor is driven by a cache-strategy class (CacheStrategy) with two important member variables:
/** The request to send on the network, or null if this call doesn't use the network. */
public final @Nullable Request networkRequest;
/** The cached response to return or validate; or null if this call doesn't use a cache. */
public final @Nullable Response cacheResponse;
networkRequest describes the network request to send (null if the network should not be used), and cacheResponse is the cached response for this call (null if there is none). That gives four cases:
1. cacheResponse == null && networkRequest == null: no cache and the network is forbidden — fail and return an empty 504 response.
2. cacheResponse == null && networkRequest != null: no cache — go to the server.
3. cacheResponse != null && networkRequest == null: use the cache directly.
4. cacheResponse != null && networkRequest != null: go to the server and validate the cache (a 304 response means the cache is still good).
@Override public Response intercept(Chain chain) throws IOException {
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
// Ask CacheStrategy for the networkRequest / cacheResponse pair
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
// Case 1: no cache and no network request.
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// Case 3: networkRequest is null, so use the cache directly (the both-null case was already handled above).
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
// Cases 2 and 4: networkRequest is not null, so send it down the chain to get a networkResponse.
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
// Case 4: both networkRequest and cacheResponse are non-null, so validate the cache.
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
return response;
}
How the cache strategy is implemented:
let's first look at how the cache interceptor obtains and uses it.
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
The CacheStrategy object is obtained through the get() method of its Factory inner class.
CacheStrategy contains this Factory inner class; when a Factory is constructed, its member variables record the cached response's basic information for later use: Date, Expires, Last-Modified, ETag, and Age.
public static class Factory {
final long nowMillis;
final Request request;
final Response cacheResponse;
/** The server's time when the cached response was served, if known. */
private Date servedDate;
private String servedDateString;
/** The last modified date of the cached response, if known. */
private Date lastModified;
private String lastModifiedString;
/**
* The expiration date of the cached response, if known. If both this field and the max age are
* set, the max age is preferred.
*/
private Date expires;
/**
* Extension header set by OkHttp specifying the timestamp when the cached HTTP request was
* first initiated.
*/
private long sentRequestMillis;
/**
* Extension header set by OkHttp specifying the timestamp when the cached HTTP response was
* first received.
*/
private long receivedResponseMillis;
/** Etag of the cached response. */
private String etag;
/** Age of the cached response. */
private int ageSeconds = -1;
public Factory(long nowMillis, Request request, Response cacheResponse) {
this.nowMillis = nowMillis;
this.request = request;
this.cacheResponse = cacheResponse;
if (cacheResponse != null) {
this.sentRequestMillis = cacheResponse.sentRequestAtMillis();
this.receivedResponseMillis = cacheResponse.receivedResponseAtMillis();
Headers headers = cacheResponse.headers();
for (int i = 0, size = headers.size(); i < size; i++) {
String fieldName = headers.name(i);
String value = headers.value(i);
if ("Date".equalsIgnoreCase(fieldName)) {
servedDate = HttpDate.parse(value);
servedDateString = value;
} else if ("Expires".equalsIgnoreCase(fieldName)) {
expires = HttpDate.parse(value);
} else if ("Last-Modified".equalsIgnoreCase(fieldName)) {
lastModified = HttpDate.parse(value);
lastModifiedString = value;
} else if ("ETag".equalsIgnoreCase(fieldName)) {
etag = value;
} else if ("Age".equalsIgnoreCase(fieldName)) {
ageSeconds = HttpHeaders.parseSeconds(value, -1);
}
}
}
}
}
get():
public CacheStrategy get() {
CacheStrategy candidate = getCandidate();
if (candidate.networkRequest != null && request.cacheControl().onlyIfCached()) {
// We're forbidden from using the network and the cache is insufficient.
return new CacheStrategy(null, null);
}
return candidate;
}
getCandidate():
private CacheStrategy getCandidate() {
// No cached response.
if (cacheResponse == null) {
return new CacheStrategy(request, null);
}
// Drop the cached response if it's missing a required handshake.
if (request.isHttps() && cacheResponse.handshake() == null) {
return new CacheStrategy(request, null);
}
// If this response shouldn't have been stored, it should never be used
// as a response source. This check should be redundant as long as the
// persistence store is well-behaved and the rules are constant.
if (!isCacheable(cacheResponse, request)) {
return new CacheStrategy(request, null);
}
CacheControl requestCaching = request.cacheControl();
if (requestCaching.noCache() || hasConditions(request)) {
return new CacheStrategy(request, null);
}
CacheControl responseCaching = cacheResponse.cacheControl();
if (responseCaching.immutable()) {
return new CacheStrategy(null, cacheResponse);
}
long ageMillis = cacheResponseAge();
long freshMillis = computeFreshnessLifetime();
if (requestCaching.maxAgeSeconds() != -1) {
freshMillis = Math.min(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds()));
}
long minFreshMillis = 0;
if (requestCaching.minFreshSeconds() != -1) {
minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds());
}
long maxStaleMillis = 0;
if (!responseCaching.mustRevalidate() && requestCaching.maxStaleSeconds() != -1) {
maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds());
}
if (!responseCaching.noCache() && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
Response.Builder builder = cacheResponse.newBuilder();
if (ageMillis + minFreshMillis >= freshMillis) {
builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"");
}
long oneDayMillis = 24 * 60 * 60 * 1000L;
if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"");
}
return new CacheStrategy(null, builder.build());
}
// Find a condition to add to the request. If the condition is satisfied, the response body
// will not be transmitted.
String conditionName;
String conditionValue;
if (etag != null) {
conditionName = "If-None-Match";
conditionValue = etag;
} else if (lastModified != null) {
conditionName = "If-Modified-Since";
conditionValue = lastModifiedString;
} else if (servedDate != null) {
conditionName = "If-Modified-Since";
conditionValue = servedDateString;
} else {
return new CacheStrategy(request, null); // No condition! Make a regular request.
}
Headers.Builder conditionalRequestHeaders = request.headers().newBuilder();
Internal.instance.addLenient(conditionalRequestHeaders, conditionName, conditionValue);
Request conditionalRequest = request.newBuilder()
.headers(conditionalRequestHeaders.build())
.build();
return new CacheStrategy(conditionalRequest, cacheResponse);
}
getCandidate() is the core of building the CacheStrategy: with no usable cached response it returns a network-only strategy; if the cached response is still fresh it returns a cache-only strategy; otherwise it builds a conditional request (If-None-Match / If-Modified-Since) and pairs it with the cached response. Request-side Cache-Control directives can steer this decision, as sketched below.
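A minimal sketch (OkHttp 3.x; it assumes a Cache has been configured on the client via OkHttpClient.Builder.cache()) using the built-in FORCE_CACHE and FORCE_NETWORK constants:

```java
import okhttp3.CacheControl;
import okhttp3.Request;

public class CacheControlExamples {
    // only-if-cached: if nothing usable is cached, the interceptor answers with the empty 504 above.
    public static Request cacheOnly(String url) {
        return new Request.Builder()
                .url(url)
                .cacheControl(CacheControl.FORCE_CACHE)
                .build();
    }

    // no-cache: skip the cached response and always go to the network.
    public static Request networkOnly(String url) {
        return new Request.Builder()
                .url(url)
                .cacheControl(CacheControl.FORCE_NETWORK)
                .build();
    }
}
```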

ConnectInterceptor: connection
The connect interceptor establishes the socket connection and reuses sockets through the connection pool.
Related class | Role |
---|---|
StreamAllocation | Coordinates the request, the connection and the stream: it finds a connection for a request and then obtains a stream for the actual I/O. In short it manages the connection; it is created in the retry/redirect interceptor and used by the connect interceptor. |
RealConnection | Wraps the underlying socket and establishes the socket connection to the server. |
ConnectionPool | The socket connection pool: it reuses socket connections and closes connections that stay idle beyond a set time. |
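The pool's limits can be tuned when the client is built. A minimal sketch assuming the OkHttp 3.x API (the 3.x defaults are 5 idle connections kept alive for 5 minutes):

```java
import java.util.concurrent.TimeUnit;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;

public class PooledClient {
    public static OkHttpClient build() {
        // Keep up to 10 idle connections, each for at most 5 minutes.
        return new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(10, 5, TimeUnit.MINUTES))
                .build();
    }
}
```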
Establishing the socket connection:

- No proxy: the socket connects straight to the target server.
- SOCKS proxy: a Proxy object is passed to the Socket; the proxy server is named there, and the socket still addresses the target server.
- HTTP proxy: the socket connects to the proxy server, and the target host is written into the request line (the absolute-URL form).
- HTTPS proxy: the socket connects to the proxy server and first sends a CONNECT request; once the proxy confirms the tunnel, the connection is wrapped in an SSLSocket to talk to the target server.

The connect interceptor class itself contains little code; most of the logic is encapsulated in RealConnection and StreamAllocation, starting from connect(). For an HTTPS request through an HTTP proxy a tunnel is created; with a tunnel the proxy can issue an authentication challenge or close the connection, which preserves HTTPS security. A proxy-configuration sketch follows.
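Which of these modes applies depends on the Proxy configured on the client. A minimal sketch using the standard java.net Proxy type (the host and port are placeholders); with Proxy.Type.HTTP and an https:// URL, the connect step builds the CONNECT tunnel described above:

```java
import java.net.InetSocketAddress;
import java.net.Proxy;

import okhttp3.OkHttpClient;

public class ProxiedClient {
    public static OkHttpClient build() {
        // "proxy.example.com" and 8080 are placeholder values.
        return new OkHttpClient.Builder()
                .proxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxy.example.com", 8080)))
                .build();
    }
}
```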

CallServerInterceptor: talking to the server
It serializes the Request into an HTTP request message and turns the server's response message back into a Response.
A special case is the Expect: 100-continue request header, typically used when uploading a large request body or when authorization is required: the client first asks whether the server is willing to receive the body.
OkHttp's handling:
if the server agrees, it answers 100 and the client goes on to send the request body;
if the server refuses, the response is returned to the caller as-is.
The server may also simply ignore the header, so that no interim answer can be read; in that case a timeout exception is thrown. A request sketch follows.
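A caller can opt into this handshake by adding the header itself. A minimal sketch (the URL and payload are placeholders):

```java
import okhttp3.MediaType;
import okhttp3.Request;
import okhttp3.RequestBody;

public class ExpectContinueRequest {
    public static Request build(byte[] largePayload) {
        RequestBody body = RequestBody.create(
                MediaType.parse("application/octet-stream"), largePayload);
        return new Request.Builder()
                .url("https://example.com/upload")
                .header("Expect", "100-continue") // ask the server before sending the body
                .post(body)
                .build();
    }
}
```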
