1.1 简单对比
image.png1.2 简单使用
// new方式实例化
OkHttpClient okHttpClient = new OkHttpClient();
// 构建者模式实例化
okHttpClient = new OkHttpClient.Builder().build();
// 实例化Request对象
Request request = new Request.Builder().url("...").tag("测试").build();
// 实例化Call对象
Call call = okHttpClient.newCall(request);
try {
// 同步请求,阻塞当前线程并等待结果返回
Response response = call.execute();
// 异步请求,在子线程执行与回调
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
// 可关闭请求
call.cancel();
} catch (IOException e) {
e.printStackTrace();
}
推荐让 OkHttpClient
保持单例,用同一个 OkHttpClient
实例来执行你的所有请求,因为每一个 OkHttpClient
实例都拥有自己的连接池和线程池,重用这些资源可以减少延时和节省资源,如果为每个请求创建一个 OkHttpClient
实例,显然就是一种资源的浪费。当然,也可以使用如下的方式来创建一个新的 OkHttpClient
实例,它们共享连接池、线程池和配置信息。
OkHttpClient eagerClient = client.newBuilder()
.readTimeout(500, TimeUnit.MILLISECONDS)
.build();
Response response = eagerClient.newCall(request).execute();
3.1 RealCall:请求执行者
3.1.1 同步请求
@Override
public Response execute() throws IOException {
synchronized (this) {
// 说明一次请求只能执行一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
// dispatcher调度器默认已配置好,将RealCall加到队列 Deque<RealCall> runningSyncCalls
client.dispatcher().executed(this);
// 不管同步还是异步都交由拦截器链处理
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
//为了降低负荷,最后会将请求移除
client.dispatcher().finished(this);
}
}
3.1.2 异步请求
//异步请求
@Override
public void enqueue(Callback responseCallback) {
// 使用锁
synchronized (this) {
// 说明一次请求只能执行一次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//这里会涉及线程池
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
异步请求封装的AsyncCALL
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//不管同步还是异步都交由拦截器链处理
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
从以上代码可以看到,不管是同步请求还是异步请求,都涉及了拦截器链的处理,最终返回Response。
3.1.3 拦截链
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
// 配置 OkHttpClient 时设置的 interceptors 也就是我们的自定义拦截器
interceptors.addAll(client.interceptors());
// 负责失败重试以及重定向
interceptors.add(retryAndFollowUpInterceptor);
// 桥拦截器,添加一些Header和移除一些header,例如body的contentLength是-1,则移除Content-Length这个header
interceptors.add(new BridgeInterceptor(client.cookieJar()));
// 负责读取缓存直接返回、更新缓存
interceptors.add(new CacheInterceptor(client.internalCache()));
// 负责和服务器建立连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 配置 OkHttpClient 时设置的 networkInterceptors
interceptors.addAll(client.networkInterceptors());
}
// 负责向服务器发送请求数据、从服务器读取响应数据
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
// 使用责任链模式开启链式调用
return chain.proceed(originalRequest);
}
/**
* A concrete interceptor chain that carries the entire interceptor chain: all application
* interceptors, the OkHttp core, all network interceptors, and finally the network caller.
*/
public final class RealInterceptorChain implements Interceptor.Chain {
private final List<Interceptor> interceptors;
private final StreamAllocation streamAllocation;
private final HttpCodec httpCodec;
private final RealConnection connection;
private final int index;
private final Request request;
private final Call call;
private final EventListener eventListener;
private final int connectTimeout;
private final int readTimeout;
private final int writeTimeout;
private int calls;
public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
this.interceptors = interceptors;
this.connection = connection;
this.streamAllocation = streamAllocation;
this.httpCodec = httpCodec;
this.index = index;
this.request = request;
this.call = call;
this.eventListener = eventListener;
this.connectTimeout = connectTimeout;
this.readTimeout = readTimeout;
this.writeTimeout = writeTimeout;
}
}
3.1.4 拦截器
3.1.4.1 RetryAndFollowUpInterceptor(重试重定向拦截器)
/**
* This interceptor recovers from failures and follows redirects as necessary. It may throw an
* {@link IOException} if the call was canceled.
*/
public final class RetryAndFollowUpInterceptor implements Interceptor {
/**
* How many redirects and auth challenges should we attempt? Chrome follows 21 redirects; Firefox,
* curl, and wget follow 20; Safari follows 16; and HTTP/1.0 recommends 5.
*/
private static final int MAX_FOLLOW_UPS = 20;
@Override
public Response intercept(Chain chain) throws IOException {
while (true) {
...
//交给下一个拦截器处理并获得最终的response
response = realChain.proceed(request, streamAllocation, null, null);
...
// 从返回的response中判断响应中是否有失败或者重定向(Location)标志
// 失败的话返回response.request,重定向的话构造新的Request返回
Request followUp = followUpRequest(response, streamAllocation.route());
//如果没有失败或重定向,返回response
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
// 检测重试次数
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
...
// 用新重定向的request继续while循环走拦截链
request = followUp;
priorResponse = response;
}
}
}
3.1.2.2 BridgeInterceptor(桥拦截器):header配置
/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
*/
public final class BridgeInterceptor implements Interceptor {
public Response intercept(Chain chain) throws IOException {
...
RequestBody body = userRequest.body();
// contentType不为空就加Content-Type这个header
if (body != null) {
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}
long contentLength = body.contentLength();
//contentLength不是-1就加Content-Length这个header
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
//移除Content-Length 这个header
requestBuilder.removeHeader("Content-Length");
}
}
// 后面还有响应的header
...
// 交由下一个拦截器处理
Response networkResponse = chain.proceed(requestBuilder.build());
}
}
3.1.2.3 CacheInterceptor(缓存拦截器):缓存复用
/** Serves requests from the cache and writes responses to the cache. */
public final class CacheInterceptor implements Interceptor {
public Response intercept(Chain chain) throws IOException {
//缓存策略
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
// If we're forbidden from using the network and the cache is insufficient, fail.
// 如果被禁止试用网络,同时缓存为空,则构造一个失败的Response返回
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// If we don't need the network, we're done.
// 如果不需要走网络,直接返回缓存数据
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
//通过继续拦截链走网络
Response networkResponse = null;
try {
networkResponse = chain.proceed(networkRequest);
}
// 网络返回之后的策略
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
// 缓存不为空而且网络返回304的情况,会使用缓存,
// 304意味着服务端已经执行了GET,但文件未变化。
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
// 更新缓存标识
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
// 返回缓存
return response;
}else {
closeQuietly(cacheResponse.body());
}
}
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
// 更新本地缓存
if (cache != null) {
if (HttpHeaders.hasBody(response) && CacheStrategy.isCacheable(response, networkRequest)) {
// Offer this request to the cache.
CacheRequest cacheRequest = cache.put(response);
return cacheWritingResponse(cacheRequest, response);
}
if (HttpMethod.invalidatesCache(networkRequest.method())) {
try {
cache.remove(networkRequest);
} catch (IOException ignored) {
// The cache cannot be written.
}
}
}
// 返回请求结果
return response;
}
}
3.1.2.4 ConnectInterceptor(连接池拦截器):连接复用
/** Opens a connection to the target server and proceeds to the next interceptor. */
public final class ConnectInterceptor implements Interceptor {
public final OkHttpClient client;
public ConnectInterceptor(OkHttpClient client) {
this.client = client;
}
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
// 获取HttpCodec,会从连接池获取连接
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
// 交由下一个拦截器处理
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
}
public final class StreamAllocation {
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
// 查找一个可用的连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
// 找到为止
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
}
ConnectInterceptor主要是从连接池去取连接,http请求要先3次握手才能建立连接,而复用连接可以免去握手的时间。
3.1.2.5 CallServerInterceptor(呼叫服务器拦截器):数据交互
/** This is the last interceptor in the chain. It makes a network call to the server. */
public final class CallServerInterceptor implements Interceptor {
public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
HttpCodec httpCodec = realChain.httpStream();
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request();
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
...
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
// 最终获取response方法 httpCodec.openResponseBody(response)
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
...
//拦截链已经到底部,直接返回 response,将response回传给上一个拦截器
return response;
}
}
}
最终获取response方法 httpCodec.openResponseBody(response)。httpCodec的实现有两个,Http1Codec 和 Http2Codec。Http1Codec,Http2Codec原理一样,内部就是通过Okio.buffer去请求网络。
public final class Http1Codec implements HttpCodec {
@Override
public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
if (!HttpHeaders.hasBody(response)) {
Source source = newFixedLengthSource(0);
return new RealResponseBody(contentType, 0, Okio.buffer(source));
}
if ("chunked".equalsIgnoreCase(response.header("Transfer-Encoding"))) {
Source source = newChunkedSource(response.request().url());
return new RealResponseBody(contentType, -1L, Okio.buffer(source));
}
long contentLength = HttpHeaders.contentLength(response);
if (contentLength != -1) {
Source source = newFixedLengthSource(contentLength);
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
return new RealResponseBody(contentType, -1L, Okio.buffer(newUnknownLengthSource()));
}
}
public final class Http2Codec implements HttpCodec {
@Override
public ResponseBody openResponseBody(Response response) throws IOException {
streamAllocation.eventListener.responseBodyStart(streamAllocation.call);
String contentType = response.header("Content-Type");
long contentLength = HttpHeaders.contentLength(response);
Source source = new StreamFinishingSource(stream.getSource());
return new RealResponseBody(contentType, contentLength, Okio.buffer(source));
}
}
3.2 Dispatcher:请求调度器
public final class Dispatcher {
// 最大请求数
private int maxRequests = 64;
// 每个主机最大请求数
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
//缓存等待队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
//正在请求的异步请求队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
//正在请求的同步请求队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
synchronized void enqueue(AsyncCall call) {
//当运行的队列中的数值小于64, 并且同时访问同一个机器目标HOST请求数小于5
//满足条件则加入正在请求的异步请求队列
//这里也就起到了对线程池的维护作用,不会导致无限制线程的创建
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);//线程池执行操作
} else {
//否则加入等待请求的异步请求队列
readyAsyncCalls.add(call);
}
}
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
// RealCall请求返回后无论成功还是失败,将会调用该方法,达到管理请求池的目的
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
// 移除失败,抛出错误
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// 异步任务的移除
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
//从这里会发现最终还是通过调度器来判断当前情况进而达到维护线程池、维护请求的作用
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
public synchronized ExecutorService executorService() {
//SynchronousQueue是一个有一个入队列就有一个必须出队列,而后立刻执行,非常适合于高频请求
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
链接:https://juejin.cn/post/7064596735795920904
作者:复制粘贴改改改
网友评论