https://juejin.cn/post/7095315542202351623
这篇文章分析了,Request如何构建以及加入调度队列的。
源码分析基于 3.14.4
发起请求
//RealCall类中
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
client.dispatcher().enqueue(new AsyncCall(responseCallback));//1
}
//Dispatcher类中
void enqueue(AsyncCall call) {
synchronized (this) {
readyAsyncCalls.add(call);//2
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());//3
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();//4
}
- 注释1:将Call封装成AsyncCall,然后加入分发器等待被执行;
- 注释2:将AsyncCall加入等待队列;
- 注释3:findExistingCallWithHost查找是否有相同host的请求,这是为统计同一个主机的请求有多少,便于做负载控制;
- 注释4:promoteAndExecute,判断是否开启线程执行;
private boolean promoteAndExecute() {
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // 1
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // 2
i.remove();
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);//3
runningAsyncCalls.add(asyncCall);//4
}
isRunning = runningCallsCount() > 0;
}
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());//5 调用下面的executeOn函数
}
return isRunning;
}
//AsyncCall类中
void executeOn(ExecutorService executorService) {
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
......
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
- 注释1:判断当前执行的任务数,默认最大值为64;
- 注释2:判断同一个Host的任务数,默认最大值为5;
- 注释3、4:如果当前执行的任务数小于64而且每个host的任务数小于5,则将任务从等待队列中移除,加入执行队列;我理解executableCalls是等待执行队列与执行中队列间的过渡变量,记录该次要被调度的任务;
- 注释5:执行刚才从等待队列移除的任务,线程池开始调度任务;executorService()返回线程池对象;AsyncCall实现了Runnable,所以可以被线程池调度;
- 为了保证,等待队列的任务能及时被调度,所以调用enqueue、setMaxRequests、setMaxRequestsPerHost、finished函数,promoteAndExecute会被触发;
OKHttp的线程池
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
- executorService()返回线程池,单例模式;
- 这个线程池有点意思,核心线程数为0,线程数最大是 Integer.MAX_VALUE,SynchronousQueue是一个没有容量的队列;所以当添加任务时,因为核心线程为0,那么任务将添加SynchronousQueue中,由于它没有容量,所以线程池会创建线程来执行任务;
- 补充知识,任务往SynchronousQueue添加时,会有两个情况:1.线程A调用了take或poll取任务,线程B调用offer添加任务将成功,但是里面被线程A取走了并执行,2.没有线程调用take或poll,线程B调用offer添加任务将失败,线程池新建线程来执行任务;利用没有容量的队列,可以使任务能马上得到响应,没必要等队列满才新建线程执行任务;
//AsyncCall类中
@Override protected void execute() {
......
try {
Response response = getResponseWithInterceptorChain();//1
......
responseCallback.onResponse(RealCall.this, response);//2
} catch (IOException e) {
......
responseCallback.onFailure(RealCall.this, e);
} catch (Throwable t) {
......
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
- 注释1:前面提到AsyncCall实现Runnable接口,当线程池执行AsyncCall.run,最后会执行AsyncCall.execute;获取结果;
- 注释2:回调结果;
- 至此任务被调度了。
疑问1:线程数最大可以是Integer.MAX_VALUE,会不会撑爆虚拟机?
其实不会的,因为在Dispatcher.promoteAndExecute添加任务时,已经限制了数量;
疑问2:为什么核心线程数为0?
默认情况下,核心线程是不会被回收,除非线程池shutdown了,而其他线程可以设置超时时间,后来我想了想,核心线程数不为0,通过allowCoreThreadTimeOut设置true,也可以达到目的,只不过直接设置0比较简单;
总结
- 执行call.enquque,只是把任务加入等待队列,等待被执行;
- 当前执行中的任务数小于64,且所属host任务数小于5,则将任务移入正在执行队列,触发线程池执行;
- 线程池核心线程数为0,线程数最大为Integer.MAX_VALUE,利用没有容量的SynchronousQueue,让请求都能马上得到响应;
以上分析有不对的地方,请指出,互相学习,谢谢哦!
网友评论