前言: OKHttp源码基于3.10.0版本进行跟踪,因为后期版本源码基于Kotlin写,不好跟踪。
implementation "com.squareup.okhttp3:okhttp:3.10.0"
流程图.png
一、OKHttp执行请求的流程
var okhttpClient = OkHttpClient.Builder().build();
var request: Request? = Request.Builder().url("www.baidu.com").build()
var call = okhttpClient.newCall(request)
call?.enqueue(object: Callback {
override fun onFailure(call: Call, e: IOException) {
TODO("Not yet implemented")
}
override fun onResponse(call: Call, response: Response) {
TODO("Not yet implemented")
}
})
用法我们都知道:
- OKHttpClient.newCall()实际上返回的RealCall对象
- 在RealCall中调用enqueue()方法中放入队列中,在放入队列时,先判断是否重复添加了该请求,如果重复就报错,不然就调用dispatcher.enqueue方法
- dispatcher.enqueue()方法继续判断,如果当前运行队列中的数值小于64,并且同时访问同一台服务器的请求小于5,就直接运行,不然就加入等待队列中。
- 当进入运行队列时,就可以直接运行,这时候调用AsyncCall.execute()方法,然后通过Response response = getResponseWithInterceptorChain()方法使用责任链模式开始执行请求响应
二、OKHttp中关键类及其作用
-
OKHttpClient:OkHttpClient.Builder().build()建造者模式创建OKHttpClient实例,其实OKHttpClient并不是一个单例,可以在App中创建多个,但是还是建议只创建一个。因为每个OKHttpClient有自己的请求池,线程池和连接池等,创建太多,对内存消耗不太友好。
-
Request: 请求的封装类,包含了url、method、header、RequestBody等属性
-
RealCall: 它继承至Call,实现了同步请求执行和异步请求入队等。
-
AsyncCall: 它是RealCall的内部类,他实现了Runnable接口,是一个异步任务。分发器Dispatcher负责管理这些异步请求,在可请求数量满足的情况下会交给线程池执行它的execute方法。
-
Dispatcher: 它是请求的分发器,内部有线程池负责执行请求,同时有运行队列和等待队列来管理请求执行、等待和取消。
-
StreamAllocation:它协调三个entities的关系
Connections: 与远程服务器端额物理Socket连接,这些连接建立起来可能很慢,所有有必要能够Cancel正在建立连接的Connection
Streams: 它是在connections这层上的Http请求和响应对。每个Connection都有自己的分配限制,它定义了自身可以承载的并发数量。HTTP1.x链接一次可以承载1个,而Http2可以多个。
Calls: a logical sequence of streams, typically an initial request and its follow up requests. We prefer to keep all streams of a single call on the same connection for better behavior and locality. -
ConnectionPool:连接池,管理RealConnection,最多可容纳5个空闲连接,这些连接如果5分钟内不活动,将会被清除。
-
RealConnection:
-
HttpCodec: 接口类,给Http请求编码,并解码response;他有两个子类Http1Codec和Http2Codec分别对应Http1.1和Http2.0
责任链拦截器:
- RetryAndFollowUpInterceptor:重试和请求重定向转发
- BridgeInterceptor:桥拦截器 -- 封装Header属性,压缩和解压缩等
- CacheInterceptor:缓存拦截器
- ConnectInterceptor:连接拦截器 socket通信,数据库连接池的概念,从连接池拿出socket,准备好
- CallServerInterceptor:跟服务器连接之后,对响应内容,并根据header content-type字段进行封装
三、OKHttp发送request流程源码跟踪
1. OkHttpClient.Builder().build()建造者模式创建OKHttpClient实例
其实OKHttpClient并不是一个单例,可以在App中创建多个,但是还是建议只创建一个。因为每个OKHttpClient有自己的请求池,线程池和连接池等,创建太多,对内存消耗不太友好。
这个没啥好说的建造者模式本质可以设置很多属性,但也可以不设置任何属性,也可以创建出一个复杂的对象来。
2. 通过Request.Builder().build()也是建造者模式构建Request实例
作为请求信息的封装,内部包含了请求url,请求方法method,请求头headers,请求体RequestBody,tag标签等
这里也比较简单,没啥好跟踪的。
3. OKhttpClient实例对象调用newCall(request)方法实例化一个Call对象
Call实际上是一个接口,实际上返回的是RealCall对象
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
4. 调用RealCall对象的enqueue方法开始请求数据
而enqueue()方法中又调用了dispatcher的enqueue()方法,而dispatcher.enqueue 又创建了个AsyncCall对象传入。
这里要注意的是它有个判断request是否被执行,如果被执行,重复请求将抛出异常。
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
5. Dispatcher中的enqueue方法
- 在Dispatcher类中有三个双端队列:
- Deque<AsyncCall> runningAsyncCalls: 运行中的异步Call队列
- Deque<AsyncCall> readyAsyncCalls: 异步等待队列
- Deque<RealCall> runningSyncCalls:同步运行队列
if中的判断条件是如果异步运行队列的size > 64或者运行队列中同一host的异步任务大于5 的话,不能加入运行队列,放入等待队列中,满足条件加入运行队列,并通过线程池执行AsyncCall。
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
- 它有一个线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- 0:设置了核心线程数为0
- Integer.MAX_VALUE: 线程池中最大可容纳额线程数
- 60, TimeUnit.SECONDS:当线程池的线程数量大于核心线程数,空闲线程就会等待60s才会被终止,如果小于就会立刻停止。
- new SynchronousQueue<Runnable>():线程池等待队列
- Util.threadFactory("OkHttp Dispatcher", false): 线程池需要新增线程时的生产方法
SynchronousQueue队列里要不为0,要不为1。该队列最大只能同时容纳一个任务,之所以用这个队列,相当于把任务提交个线程池,就会立即执行,不会等待。
因为Dispatcher中已经维护了等待队列和运行队列了,所以线程池使用了SynchronousQueue,不需要线程池内部再去维护等待队列了,提交线程池就能够执行。
6. 因为AsyncCall继承只NameRunnable,实际就是一个异步任务,接下来就执行AsyncCall.execute()方法了
@Override protected void execute() {
boolean signalledCallback = false;
try {
147行 Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
看到147行:Response response = getResponseWithInterceptorChain();就进入到责任链模式的拦截器执行真正的请求了。
7. 这里责任链拦截器请求的先不看,看到 client.dispatcher().finished(this);
最后Response响应回来后,又调用了dispatcher的finished()方法。
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
...
synchronized (this) {
...
if (promoteCalls) promoteCalls();
...
}
...
}
它最终调用了它的另一个finish()的重载方法,并去调用了promoteCalls(),它在这里边就会去进行队列的管理并执行下一个请求。
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
- 先判断运行队列的数量有没有大与最大值64
- 在判断等待队列是否还有任务
- 然后遍历等待队列,读取等待队列中的任务。在条件不满足数量大于64,同一host的不大于5的条件下,将等待队列中的任务提取到运行队列中,并开始执行
四、OKHttp真正发送请求时的责任链拦截器源码跟踪
4.1 上边我们跟踪到了AsyncCall的execute方法了,然后调用了getResponseWithInterceptorChain()方法返回Response
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//1. 添加自己设置的拦截器
interceptors.addAll(client.interceptors());
//2. 添加重试重定向拦截器
interceptors.add(retryAndFollowUpInterceptor);
//3. 添加桥拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//4. 添加缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//5. 添加连接拦截器
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) { interceptors.addAll(client.networkInterceptors());
}
//6. 添加CallServer拦截器
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
4.1 首先RealInterceptorChain实现了Interceptor.Chain,Chain链条的意思,相当于一个责任链连点。
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
这一句将我们的拦截器集合interceptors,原始请求originalRequest,index=0传入构造出一个Chain对象,并调用了chain.proceed方法
4.2 RealInterceptorChain的proceed方法会返回Response对象,在它里面实际会生成下一个Chain,并从拦截器集合interceptors中取出对应的拦截器interceptor,并把nextChain,交给interceptor.intercept(next)执行。
这实际上就是责任链将任务一层一层往下传,只处理本拦截器该干的事儿。
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
...
// Call the next interceptor in the chain.
143行 RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
...
}
4.3 这里看下RetryAndFollowUpInterceptor重试重定向拦截器的intercept()方法
@Override public Response intercept(Chain chain) throws IOException {
...
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
...
126行 response = realChain.proceed(request, streamAllocation, null, null);
...
}
这里126行,实际执行了上一个RealInterceptorChain传过来的chain的proceed()方法,然后将request等有传入进去,这里有循环进入了RealInterceptorChain的proceed方法,将request请求责任链一层层往下传递。
直到最后一个拦截器执行完真正的请求后,返回response,在一层层往上传递,并进行相应的处理。
4.4 RetryAndFollowUpInterceptor重试重定向拦截器做了啥
@Override public Response intercept(Chain chain) throws IOException {
...
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
...
117行 while (true) {
...
126行 response = realChain.proceed(request, streamAllocation, null, null);
...
158行 Request followUp = followUpRequest(response, streamAllocation.route());
...
169行 if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
}
}
-
126行 通过realChain.proceed责任链将request传递给下面的拦截器执行
-
158行 返回数据后需要,判断是否需要重试或者重定向
-
169行 当重试次数大于最大值MAX_FOLLOW_UPS(20)时,抛出异常执行失败。
followUpRequest()方法
private Request followUpRequest(Response userResponse, Route route) throws IOException {
...
// fall-through
case HTTP_MULT_CHOICE:
case HTTP_MOVED_PERM:
case HTTP_MOVED_TEMP:
case HTTP_SEE_OTHER:
// Does the client allow redirects?
if (!client.followRedirects()) return null;
305行 String location = userResponse.header("Location");
if (location == null) return null;
HttpUrl url = userResponse.request().url().resolve(location);
...
}
305行 从Response的响应头中取出location字段标识的重定向url,并拼装重定向request
它还将SSLFactory传入Address对象中
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
194行 private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {
sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();
}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
}
- 创建StreamAllocation实例的时候,通过createAddress()方法传入了Address,而在createAddress()方法中我们看到了OKHttp在处理Https请求时的操作。
- 它会获取OKHttpClient里的sslSocketFactory,这里又有两种,一种是客户端传入的,一种是默认的;默认的就是信任所有证书。
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
} else {
this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
}
private SSLSocketFactory systemDefaultSslSocketFactory(X509TrustManager trustManager) {
try {
SSLContext sslContext = Platform.get().getSSLContext();
sslContext.init(null, new TrustManager[] { trustManager }, null);
return sslContext.getSocketFactory();
} catch (GeneralSecurityException e) {
throw assertionError("No System TLS", e); // The system has no TLS. Just give up.
}
}
4.5 BridgeInterceptor桥拦截器做了啥
@Override public Response intercept(Chain chain) throws IOException {
...
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}
requestBuilder.header("Accept-Encoding", "gzip");
...
97行Response networkResponse = chain.proceed(requestBuilder.build());
100行 if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
}
- 桥拦截器主要对request的header进行封装赋值,如链接属性Connection(保持长链接)、Accept-Encoding(使用gzip压缩数据)等等,
- 然后同样执行chain.proceed(requestBuilder.build())传递request给下一个拦截器。
- 100行 收到Response后,如果使用了gzip压缩,还需要对其进行解压缩
4.6 CacheInterceptor缓存拦截器做了啥
- 首先,它会去从Cache中获取Response,并通过CacheStrategy.Factory缓存策略工厂得到缓存策略,实际上就是各种判定条件,看是否有缓存,缓存是否有效,是否需要进行网络请求等,这里就不细看,跟进去看看挺清晰的。
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
- 如果没有缓存的话,就传递request给下一个拦截器执行
networkResponse = chain.proceed(networkRequest);
- 最后,将response更新或者缓存起来
cache.update(cacheResponse, response);
4.7 ConnectInterceptor连接拦截器做了啥
连接拦截器其实做的事情不是很多:
1.尝试当前连接是否可以复用。
2.尝试连接池中找可以复用的连接
3.切换路由,继续在连接中尝试找可以复用的连接
4.以上都没有则new一个新的。
5.新的连接放入连接池
6.建立连接,开始握手
- 通过前面传过来的StreamAllocation来获得一个HttpCodec和一个RealConnection实例,并传递给CallServerInterceptor拦截器
@Override public Response intercept(Chain chain) throws IOException {
。。。
StreamAllocation streamAllocation = realChain.streamAllocation();
。。。。
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
缓存策略有两种:
-
强制缓存:直接向缓存数据库请求数据,如果找到了对应的缓存数据,并且是有效的,就直接返回缓存数据。如果没有找到或失效了,则向服务器请求数据,返回数据和缓存规则,同时将数据和缓存规则保存到缓存数据库中。
-
对比缓存:是先向缓存数据库获取缓存数据的标识,然后用该标识去服务器请求该标识对应的数据是否失效,如果没有失效,服务器会返回304未失效响应,则客户端使用该标识对应的缓存。如果失效了,服务器会返回最新的数据和缓存规则,客户端使用返回的最新数据,同时将数据和缓存规则保存到缓存数据库中。
4.8 CallServerInterceptor拦截器做了啥
这里就是最后一个拦截器了,它不会调用realChain.proceed把request再往下传递了,它将真正执行http请求:
1.先写入请求Header
httpCodec.writeRequestHeaders(request);
2.如果请求头的Expect: 100-continue时,只发送请求头,执行3,不然执行4
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
3.根据后台返回的结果判断是否继续请求流程
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
4.写入请求体,完成请求
httpCodec.finishRequest();
5.得到响应头,构建初步响应
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
6.构建响应体,完成最终响应
7.返回响应
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
网友评论