OkHttp是一款非常常用的网络框架,本文试图对其源码进行解析,为方便大家阅读,先列目录如下:
1.基本用法
2.Dispatcher解析
3.拦截器执行流程
4.RetryAndFollowUpInterceptor、BridgeInterceptor和CacheInterceptor
5.ConnectInterceptor
6.CallServerInterceptor
首先构造RealCall对象,若是异步请求则由Dispatcher分发,同步则直接进行请求,最后依次执行五个拦截器的intercept
方法完成请求。
1. 基本用法首先是创建OkHttpClient对象。
OkHttpClient client = new OkHttpClient.Builder().readTimeout(5, TimeUnit.SECONDS).build();
这里采用建造者模式,我们可以很方便的设置各个参数。下面我详细看下OkHttpClient.Builder()
这个内部类的构造方法里有哪些参数。
public Builder() { dispatcher = new Dispatcher(); protocols = DEFAULT_PROTOCOLS; connectionSpecs = DEFAULT_CONNECTION_SPECS; eventListenerFactory = EventListener.factory(EventListener.NONE); proxySelector = ProxySelector.getDefault(); cookieJar = CookieJar.NO_COOKIES; socketFactory = SocketFactory.getDefault(); hostnameVerifier = OkHostnameVerifier.INSTANCE; certificatePinner = CertificatePinner.DEFAULT; proxyAuthenticator = Authenticator.NONE; authenticator = Authenticator.NONE; connectionPool = new ConnectionPool(); dns = Dns.SYSTEM; followSslRedirects = true; followRedirects = true; retryOnConnectionFailure = true; connectTimeout = 10_000; readTimeout = 10_000; writeTimeout = 10_000; pingInterval = 0; }
这里有两个参数比较重要,一个是dispatcher,由它来决定异步请求是直接执行还是放入等待队列;另一个是connectionPool,OkHttp把请求都抽象成Connection,而connectionPool就是来管理这些Connection的。这两者后文都会详细讲解。 第二步就是创建Request对象。
Request request = new Request.Builder().url("http://www.baidu.com").get().build();
同样采用了建造者模式。这里封装了请求的URL、请求方式等信息。 第三步是创建Call对象。
Call call = client.newCall(request);
由于Call是个接口,具体的操作都是在其实现类RealCall里完成的,来看RealCall的构造方法。
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) { final EventListener.Factory eventListenerFactory = client.eventListenerFactory(); this.client = client; this.originalRequest = originalRequest; this.forWebSocket = forWebSocket; this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);// TODO(jwilson): this is unsafe publication and not threadsafe. this.eventListener = eventListenerFactory.create(this); }
可以看到它将前面创建的OkHttpClient和Request对象都传给了RealCall。 第四步,根据是同步请求还是异步请求调用不同的方法。 同步请求:
Response response = call.execute();
我们来具体看下execute()
方法。
@Override public Response execute() throws IOException { synchronized (this) { if (executed) throw new IllegalStateException("Already Executed"); executed = true; } captureCallStackTrace(); try { client.dispatcher().executed(this); Response result = getResponseWithInterceptorChain(); if (result == null) throw new IOException("Canceled"); return result; } finally { client.dispatcher().finished(this); } }
我们直接看重点代码,首先看client.dispatcher().executed(this);
,client.dispatcher()
是获取到Dispatcher对象,然后执行其executed()
方法,我们看下这个方法的实现。
/** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
就是将当前的请求call添加到runningSyncCalls这个队列中去,而runningSyncCalls就是同步请求的队列。 下面一行Response result = getResponseWithInterceptorChain();
就是具体的网络请求操作,这个后文会细讲。然后我们再来看finally里面的代码client.dispatcher().finished(this);
,finished()
方法代码如下:
privatevoid finished(Dequecalls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } }
重点看这一行if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
,将call从runningSyncCalls移除了,如果不能移除就抛出异常。 所以总结一下,同步方法所做的工作就是先将call添加到Dispatcher的runningSyncCalls队列中去,完成请求后再将其移除。 异步请求:
call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { System.out.println("Response:onFailure"); } @Override public void onResponse(Call call, Response response) throws IOException { System.out.println("Response:" + response.body().string()); } });
这里enqueue最终会调用Dispatcher的enqueue方法,代码如下:
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
判断进行中的异步请求队列runningAsyncCalls的size是否小于最大请求数及进行中的请求数是否小于每个主机最大请求数,若都满足,则将call添加到异步请求队列runningAsyncCalls,并放入线程池中执行;若不满足,则将call添加到就绪异步请求队列readyAsyncCalls中。 接下来我们具体看下executorService().execute(call);
,也就是将call放入线程池中执行时具体做了哪些操作。
@Override protected void execute() { boolean signalledCallback = false; try { Response response = getResponseWithInterceptorChain(); if (retryAndFollowUpInterceptor.isCanceled()) { signalledCallback = true; responseCallback.onFailure(RealCall.this, new IOException("Canceled")); } else { signalledCallback = true; responseCallback.onResponse(RealCall.this, response); } } catch (IOException e) { if (signalledCallback) { // Do not signal the callback twice! Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e); } else { responseCallback.onFailure(RealCall.this, e); } } finally { client.dispatcher().finished(this); } }
首先在Response response = getResponseWithInterceptorChain();
这一行进行实际的网络请求,这与同步请求是一样的,在finally中执行client.dispatcher().finished(this);
,也与同步请求一样。通过判断retryAndFollowUpInterceptor.isCanceled()
分别回调onFailure和onResponse,这两个回调方法正是我们传入的callback。 ###2. Dispatcher解析 上面我们已经反复提到Dispatcher了,现在我们来具体分析Dispatcher这个类,总的来说,它的作用就是管理请求队列,并用线程池执行请求。
/** Ready async calls in the order they'll be run. */ private final DequereadyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final DequerunningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final DequerunningSyncCalls = new ArrayDeque<>();
首先Dispatcher里维护了这三个队列,readyAsyncCalls是异步请求就绪队列,也就是等待执行的异步请求队列。runningAsyncCalls是进行中的异步请求队列。runningSyncCalls是进行中的同步请求队列。 当执行同步请求时比较简单,首先调用synchronized void executed(RealCall call) { runningSyncCalls.add(call); }
添加到同步请求队列中,执行完后调用if (!calls.remove(call))
将call移除。 当执行异步请求时,方法如下
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
如上文所述,通过判断runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost
是否满足来决定将call添加到runningAsyncCalls还是readyAsyncCalls,那么问题来了,如果条件不满足,call被添加到readyAsyncCalls中等待了,待条件线程池中有空余的线程时,如何执行call呢。 上文也有提到,上述方法中的executorService().execute(call);
执行时,最终会调用client.dispatcher().finished(this);
方法,方法如下
privatevoid finished(Dequecalls, T call, boolean promoteCalls) { int runningCallsCount; Runnable idleCallback; synchronized (this) { if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; } if (runningCallsCount == 0 && idleCallback != null) { idleCallback.run(); } }
在if (!calls.remove(call))
runningAsyncCalls进行中的异步请求队列从中将call移除掉,然后执行if (promoteCalls) promoteCalls();
,这里promoteCalls这个标志位,同步为false,即不执行promoteCalls()
,异步为true,会执行,下面我们来看下这个方法的具体实现。
private void promoteCalls() { if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iteratori = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }
对就绪异步请求队列进行了遍历,只要满足if (runningCallsForHost(call) < maxRequestsPerHost)
,就将其从就绪队列中remove掉,然后添加到进行中的队列,并放入线程池中执行。 最后我们看下线程池的实例化,
public synchronized ExecutorService executorService() { if (executorService == null) { executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; }
核心线程数为0,这意味着线程空闲超过等待时间时,所以线程都会被杀掉;最大线程数为int最大值,但实际上Dispatcher对最大请求数做了限制为64个,所以最多只会有64个线程;等待时间为60s,即线程空闲一分钟后会被杀掉。 ###3. 拦截器执行流程 上文我们提到过,不管同步还是异步,最终进行网络请求的都是这一行Response response = getResponseWithInterceptorChain();
,来看其具体实现,
Response getResponseWithInterceptorChain() throws IOException { // Build a full stack of interceptors. Listinterceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
OkHttp进行网络请求的过程就是五个拦截器依次执行的过程,这五个拦截器分别是:
a. 负责失败重试以及重定向的RetryAndFollowUpInterceptor
;
b. 负责把封装一些头部信息的BridgeInterceptor
;
c. 负责读取缓存直接返回、更新缓存的CacheInterceptor
;
d. 负责和服务器建立连接的ConnectInterceptor
;
e. 负责向服务器发送请求数据、从服务器读取响应数据的CallServerInterceptor
.
下面我们理一下这些拦截器是如何依次执行的,上面的代码首先将这些拦截器存入一个list保存起来,然后传入RealInterceptorChain
的构造方法里,然后调用RealInterceptorChain
的proceed
方法,在proceed
里有如下代码
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
这里调用了拦截器的intercept
方法,注意,这里将index+1了,而在每个intercept
又会调用RealInterceptorChain
的proceed
方法,而每次index都会+1,这样就将list里的拦截器的intercept
方法都执行了。接下来我们来看这五个拦截器。
4. RetryAndFollowUpInterceptor、BridgeInterceptor和CacheInterceptor
RetryAndFollowUpInterceptor是用来进行重试和重定向的,在这个拦截器中创建了StreamAllocation对象,但并未使用,一直传到ConnectInterceptor。
BridgeInterceptor是用来封装一些请求头部信息的。
CacheInterceptor是用于处理缓存的,这三个拦截器这里不做过多延伸,重点来看后两个拦截器。
5. ConnectInterceptor
这个拦截器主要的作用是
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
首先获取在RetryAndFollowUpInterceptor
中创建的StreamAllocation
对象,然后获取HttpCodec
对象和RealConnection
对象,HttpCodec
用于编码Request和解码Response,RealConnection
用于实际进行网络IO传输。然后将这两个对象传递给下一个拦截器。接下来我们进入streamAllocation.newStream(client, doExtensiveHealthChecks);
方法详细看看是如何获取HttpCodec对象的。
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, this);
继续跟进到findHealthyConnection
里;
* Finds a connection and returns it if it is healthy. If it is unhealthy the process is repeated
* until a healthy connection is found.
*/
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}.
这是个死循环,获取到candidate后首先判断if (candidate.successCount == 0)
,满足即代表这是个全新的RealConnection,直接返回;不满足则检查是否健康可用,不可用则跳出此次循环继续获取下个RealConnection对象,直到找到可用的。
findConnection
里有如下核心代码,
// Attempt to use an already-allocated connection.
RealConnection allocatedConnection = this.connection;
if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
return allocatedConnection;
}
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
return connection;
}
首先尝试复用RealConnection
,如果不为空则直接将其返回;若为空则从connectionPool
里获取一个。如果获取不到,就new一个并放入connectionPool
。最终调用connect
方法进行打开一个socket链接。
6. CallServerInterceptor
具体的网络请求就是在这个拦截器的intercept
方法中执行的,重点的代码有以下几步:
a. httpCodec.writeRequestHeaders(request);
向socket中写入头部信息。
b. request.body().writeTo(bufferedRequestBody);
向socket中写入body信息。
c. httpCodec.finishRequest();
写入完毕。
d. responseBuilder = httpCodec.readResponseHeaders(false);
读取头部信息。
e. response = response.newBuilder().body(httpCodec.openResponseBody(response)).build();
读取body信息。
至此本文对OkHttp的分析已全部完毕,限于篇幅,部分环节的深度还显不够,待日后再补充。如有错漏之处还请指出。
网友评论