call.enqueue()
异步请求, Call是一个接口。他的实现类是RealCall
String url = "http://www.baidu.com";
OkHttpClient okHttpClient = new OkHttpClient();
OkHttpClient build = okHttpClient.newBuilder()
.readTimeout(30, TimeUnit.SECONDS)
.connectTimeout(30, TimeUnit.SECONDS)
.build();
RequestBody body = RequestBody.create( MediaType.parse("text/x-markdown; charset=utf-8"),"");
Request request = new Request.Builder()
.url(url)
.post(body)
.build();
Call call = build.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});
找到RealCall中的enqueue方法 client.dispatcher().enqueue(new AsyncCall(responseCallback));
从okhttpClient拿到分发器(Dispatcher,里面包括同步队列和异步队列),并且调用enqueue方法时传入了AsyncCall(这是一个Runnable),可以看作一个请求任务(里面包括一些请求参数什么的)
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
//最终会调用到Dispatcher的enqueue方法,AsyncCall看作一个请求任务
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
下面是Dispatcher的enqueue方法
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();//异步,准备执行或者叫等待队列
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();//异步,正在执行的队列
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();//正在执行的同步队列
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//加入到正在执行的队列中
runningAsyncCalls.add(call);
//使用线程池执行这个任务
executorService().execute(call);
} else {
//加入到等待队列中
readyAsyncCalls.add(call);
}
}
- readyAsyncCalls 异步,准备执行或者叫等待队列
- runningAsyncCalls 异步,正在执行的队列
-
runningSyncCalls 正在执行的同步队列
enqueue方法中有个if判断:正在执行的异步队列的大小有没有满(最大正在执行任务数64). 并且请求同一个服务器的地址的个数也不能大于5,满足以上条件的直接放入正在执行的队列中,不满足的则放入等待队列中,加入到正在执行的队列后通过executorService().execute(call);
来执行任务(线程池执行任务),上面说过AsyncCall 其实就是一个Runnable,所以会执行到run(),我们看一下他的父类NamedRunnable 中的run()方法
@Override
public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
//子类(AsyncCall)执行任务
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
ReaCall.java,
@Override protected void execute() {
boolean signalledCallback = false;
try {
//使用拦截器链获取response
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//不管请求失败还是成功,最后都会走到这
client.dispatcher().finished(this);
}
}
请求的拦截器先不看,先看看finally,这里是一个请求任务的最后一步了,调用了Dispatcher的finished方法
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//将执行完的这个任务从正在执行的队列中删除
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//这个方法很重要,将等待队列中的Call加入到正在执行的队列中
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
promoteCalls()方法,遍历readyAsyncCalls(等待队列),将任务加入到runningAsyncCalls(正在执行的队列)中并执行
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
上面就是Okhttp异步请求的大概过程,还有以下两个问题
- 为什么要使用Deque队列,而不使用LinkedList或者其他的
- 什么时候去请求的网络获取数据
先说第一个,去百度了一个ArrayDeque, Api中有这么一句话
This class is likely to be faster than [Stack
], when used as a stack, and faster than [LinkedList
]when used as a queue.
此类用作堆栈时比 [Stack
]更快,并且用作队列时比[LinkedList
] 更快
而且他还是一个双端队列,正常的堆栈和队列都是只有 一端是入口也是出口,或者一端是入口另一端是出口,而他不一样,两端都可以是入口和出口
然后是第二个什么时候做的网络请求获取的数据,这得看一下ReaCall.execute
方法中的getResponseWithInterceptorChain();
,这个方法是通过拦截器链获取响应数据
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
//用户自己添加拦截器
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
//缓存拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
//内部真正的建立连接
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
// 用户添加的network拦截器
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
originalRequest, this, eventListener, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
return chain.proceed(originalRequest);
}
- retryAndFollowUpInterceptor 重试,当发生IOException异常或者RuteException时进行重试
- BridgeInterceptor 设置一些 Content-type、Cookie(缓存)、 Connection(Keep-Alive)等,将用户没有设置 的初始化默认值,并构建一个新的Request(里面包括用户设置的和一些没有设置但又是必须的参数)
- CacheInterceptor 缓存
- ConnectInterceptor 真正建立连接的 ,下面是几个关键代码
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
//找到历史连接并且复用,这个方法里面调用了findConnection(),复用了Socket连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
完!!!
网友评论