发起请求流程
简单用例如下:
OkHttpClient client=new OkHttpClient();
final Request request=new Request.Builder()
.url("你的网络地址")
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
步骤可以分三步:一、创建OkHttpClient,(注意:因为OkHttpClient的每个实例都有自己的任务队列、线程池的,所以建议项目里使用单例减少资源开销),二、新建请求(Builder开发模式),三、创建Call并把它放入队列中(ArrayDeque)等待处理(上面是拿异步请求为例)
builder模式创建request,套路就那样,将url、请求方式method、请求头参数headers、请求参数body等传给Request对象,这些数据在发起请求的时候用到
Request(Builder builder) {
this.url = builder.url;
this.method = builder.method;
this.headers = builder.headers.build();
this.body = builder.body;
this.tag = builder.tag != null ? builder.tag : this;
}
client.newCall(request)将生成一个RealCall实例
@Override public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.eventListener = client.eventListenerFactory().create(call);
return call;
}
RealCall的构造函数如下
private RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
this.client = client;
this.originalRequest = originalRequest;
this.forWebSocket = forWebSocket;
this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
它继承Call接口,持有了okHttpClient、request,接着调用RealCall的enqueue(Callback responseCallback)
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
最重要的一行就是client.dispatcher().enqueue(new AsyncCall(responseCallback)),它新建的一个AsyncCall对象,然后传给了dispatcher。我们先分析下AsyncCall,它是RealCall的内部类,它可以直接使用RealCall中的originalRequest、client等变量,继承抽象类NamedRunnable,而NamedRunnable实现了Runnable接口且在run()中执行了抽象方法execute(),AsyncCall实现抽象方法execute(),也就是说execute()就是run()方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
对AsyncCall先分析到这里,接着看下dispatcher对AsyncCall到底做了什么?
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
maxRequests=64,maxRequestPerHost=5,这个判断条件是正在运行的call小于最大值64且相同主机的call小于最大值的话就将call加入正在运行队列runningAsyncCalls中并马上执行call,否则加到准备队列readyAsyncCalls.add中等待被执行。executorService().execute(call);这里使用了线程池来调用,前面说到AsyncCall实现的Runnable接口所以线程池可以执行它,这样就把AsyncCall的execute()方法放到了子线程中执行。现在我们在回到execute()中分析下它到底是做了什么?它做了两件事,一、调用getResponseWithInterceptorChain()获取网络数据response,并通过responseCallback回调;二、调用dispatcher.finish(this)
先分析RealCall中的getResponseWithInterceptorChain()
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
生成了一个Interceptor数组interceptors,然后用它创建一个拦截器链RealInterceptorChain,接着调用chain.proceed(originalRequest),让每个拦截器依次处理originalRequest,proceed()做了什么?
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
RealConnection connection) throws IOException {
//省略....
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
省略.....
return response;
}
重要的就上面几行,index指示要拿interceptors中的第几个,这里重新创建了一个RealInterceptorChain拦截器链next(它的拦截器指示index加1了,所以next拦截链会执行拦截器数组的下一个),然后取出第index个拦截器interceptor,然后调用它的intercept(next),一开始index的值是0,所以先执行interceptors中的第一个拦截器retryAndFollowUpInterceptor
拦截器retryAndFollowUpInterceptor
它主要是负责网络请求的重定向,分析请看代码注释
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//后面会用这个获取连接
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
//重定向的次数,记录它不能让它大于最大值MAX_FOLLOW_UPS
int followUpCount = 0;
Response priorResponse = null;//
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
//执行剩下的拦截器获取数据
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
} finally {
// We're throwing an unchecked exception. Release any resources.
if (releaseConnection) {
//当realChain.proceed(request, streamAllocation, null, null);异常才会释放streamAllocation
streamAllocation.streamFailed(null);
streamAllocation.release();
}
}
// Attach the prior response if it exists. Such responses never have a body.
//将前一次的response传给这一次的response,前一次的response的body置空,因为它只是重定向的东西没用了
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
//followUpRequest()作用是通过response的code判断是否需要重定向或超时重试请求
Request followUp = followUpRequest(response, streamAllocation.route());
//followUp为空的时候一切正常不需重定向,所以直接将response返回,否则往下处理重定向
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
closeQuietly(response.body());
//限制重定向的次数
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
if (!sameConnection(response, followUp.url())) {
//判断不是同一个连接需重新创建一个streamAllocation,因为url变了
streamAllocation.release();
streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(followUp.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
} else if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
//将重定向的follwUp赋值给request,然后进入下次循环,这样realChain.proceed(request, streamAllocation, null, null);跑的就是重定向的请求了
request = followUp;
priorResponse = response;
}
}
拦截器BridgeInterceptor
这个拦截器主要的作用就是添加一些header信息以及对数据的解压,分析请看代码注释
@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();
RequestBody body = userRequest.body();
//省略....添加header
//处理下一个拦截器
Response networkResponse = chain.proceed(requestBuilder.build());
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);
//如果header设置了Content-Encoding为gzip就结合okio进行解压缩
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
String contentType = networkResponse.header("Content-Type");
responseBuilder.body(new RealResponseBody(contentType, -1L, Okio.buffer(responseBody)));
}
return responseBuilder.build();
}
CacheInterceptor拦截器
分析如下
@Override public Response intercept(Chain chain) throws IOException {
//1. 读取候选缓存
Response cacheCandidate = cache != null
? cache.get(chain.request())
: null;
long now = System.currentTimeMillis();
//2. 创建缓存策略(强制缓存,对比缓存等策略);
CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
Request networkRequest = strategy.networkRequest;
Response cacheResponse = strategy.cacheResponse;
if (cache != null) {
cache.trackResponse(strategy);
}
if (cacheCandidate != null && cacheResponse == null) {
closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
}
//根据策略,不使用网络,缓存又没有直接报错;
// If we're forbidden from using the network and the cache is insufficient, fail.
if (networkRequest == null && cacheResponse == null) {
return new Response.Builder()
.request(chain.request())
.protocol(Protocol.HTTP_1_1)
.code(504)
.message("Unsatisfiable Request (only-if-cached)")
.body(Util.EMPTY_RESPONSE)
.sentRequestAtMillis(-1L)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
}
// 4. 根据策略,不使用网络,有缓存就直接返回;
// If we don't need the network, we're done.
if (networkRequest == null) {
return cacheResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.build();
}
Response networkResponse = null;
try {
// 5. 前面个都没有返回,读取网络结果(跑下一个拦截器);
networkResponse = chain.proceed(networkRequest);
} finally {
// If we're crashing on I/O or otherwise, don't leak the cache body.
if (networkResponse == null && cacheCandidate != null) {
closeQuietly(cacheCandidate.body());
}
}
//6. 接收到的网络结果,如果是code 304证明数据没有改变, 使用缓存,返回缓存结果并且更新缓存(对比缓存)
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
//7. 读取网络结果;
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
//8. 对数据进行缓存;
if (HttpHeaders.hasBody(response)) {
CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
response = cacheWritingResponse(cacheRequest, response);
}
//9. 返回网络读取的结果。
return response;
}
缓存策略有两种,强制缓存和对比缓存,强制缓存就是有缓存就直接返回缓存不请求网络数据;对比缓存就是即使有缓存也要发起网络请求然后对比缓存进行缓存。
第一部读取候选缓存,在本例中cache肯定是null的,在realCall中创建缓存拦截器new CacheInterceptor(client.internalCache())传入的InternalCache参数来自client
InternalCache internalCache() {
return cache != null ? cache.internalCache : internalCache;
}
这个方法返回的是InternalCache接口(依赖倒置,依赖接口不依赖具体实现),我们寻找cache的赋值操作,发现它只在OkHttpClient的Builder中有赋值,但是本例中我们是通过new OkHttpClient()来获取client的,它使用了一个无参构造函数生成的Builder,这个builder并没有设置cache这个参数,所以本例cache是null的,同样的方式分析internalCache也是null的。这里我们得出了结论是倘若你想让OkHttpClient支持缓存那么你得调用它的builder如下两个方法
/** Sets the response cache to be used to read and write cached responses. */
void setInternalCache(@Nullable InternalCache internalCache) {
this.internalCache = internalCache;
this.cache = null;
}
/** Sets the response cache to be used to read and write cached responses. */
public Builder cache(@Nullable Cache cache) {
this.cache = cache;
this.internalCache = null;
return this;
}
我们可以实现InternalCache接口,然后调用setInternalCache()设置就能实现自己的缓存了。OkHttp中给提供了一个缓存方式,就是Cache类,想开启缓存直接向builder传个Cache实例进去就可以了,那么这个Cache是如何实现缓存的呢?从前面的internalCache()方法我们知道当cache不空的时候会返回cache的内部变量internalCache,这个internalCache就是InternalCache的简单实现,初略一看你就会发现,这个internalCache都是转为调用Cache中对应的方法的,等价于Cache类直接实现接口InternalCache接口。Cache的构造函数如下:
Cache(File directory, long maxSize, FileSystem fileSystem) {
this.cache = DiskLruCache.create(fileSystem, directory, VERSION, ENTRY_COUNT, maxSize);
}
重点就是这个DiskLruCache了,它是开源的一个工具类,OkHttp直接拷来用了,是一个最近最少使用的磁盘缓存,它是支持Okio的,缓存的读写也是基于okio的,okio简化io操作且提高性能,不用原生的outStream、inStream(太多派生类了乱)之类的了.
ConnectInterceptor拦截器
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
从拦截器的名字我们就可以知道它是负责发起连接的,代码只有几行,但是这里是复杂的。streamAllocation是在retryAndFollowUpInterceptor里创建的,streamAllocation.connection()里只是返回的RealConnection实例,真正发起连接的是streamAllocation.newStream(client, chain, doExtensiveHealthChecks),
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
这个方法负责两件事,一、寻找一个可用的RealConnection;二、创建HttpCodec,用于http的编解码,它有两种实现Http2Codec和Http1Codec。
/**
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
//判断candidate.successCount==0说明它是新创建的连接,可以直接返回,不需后面的检查了
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
//检查这个连接是否可用,不可的话需要使用onNewStreams()清理
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
这里一个死循环调用findConnection寻找RealConnection,直到找到为止。
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
// 同步线程池,来获取里面的连接
synchronized (connectionPool) {
// 做些判断,是否已经释放,是否编解码类为空,是否用户已经取消
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
// Attempt to use an already-allocated connection. We need to be careful here because our
// already-allocated connection may have been restricted from creating new streams.
// 尝试用一下现在的连接,判断一下,是否有可用的连接
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
if (result == null) {
// Attempt to get a connection from the pool.
// 尝试在连接池中获取一个连接,get方法中会直接调用,注意最后一个参数为空
// 里面是一个for循环,在连接池里面,寻找合格的连接
// 而合格的连接会通过,StreamAllocation中的acquire方法,更新connection的值。
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
return result;
}
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
// 寻找一个可用的线路选择器
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
//根据路由从连接池中获取可用连接
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//实在从连接池中拿不到就创建一个
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
//真正连接
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
//将连接放入连接池
// Pool the connection.
Internal.instance.put(connectionPool, result);
// 多路复用的判断,这个是http2才有的
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
大体流程:
1、尝试使用StreamAllocation当前的连接,如果可用直接返回
2、从连接池中寻找可用的连接,找到返回
3、根据路由选择器的路由信息取连接池的查找,找到返回
4、实在找不到就创建一个RealConnection,并开启连接(socket与okio配合使用)
5、将新创建的连接加入连接池,连接池去重(多路复用同一个地址只需一个连接),返回连接result
关于连接池的分析感谢这位同学的分享(https://www.jianshu.com/p/a2fcf1dad6b5)
CallServerInterceptor
最后一个拦截器,这个主要是负责跟服务器数据交换,结合okio与socket来实现
我有点敲不下去了--》》感谢这位同学的总结(https://www.jianshu.com/p/1c4978046bd5)
总结
对OkHttp的的分析就是对每个拦截器的分析,通过源码的分析知道OkHttp为什么这么多人使用的,最重要的一个原因就是多路复用,减少了Tcp创建的次数。如果你熟悉glide的话,我们可以将glide的网络请求换成用OkHttp,glide中默认是用urlConnect的方式来下载图片的,这样每下一张图就需要创建新的连接,如果改用okHttp的话性能会提升不少。(哪里写错了的,多多指教)
网友评论