文章只针对异步请求做分析,同步在此忽略,如有不足错误之处恳请指正,整个流程图会在最后一篇赠上。
1.关于OKHttp的使用:
OkHttpClient client =new OkHttpClient.Builder().build();
Request request =new Request.Builder() .url(url).get().build();
client.newCall(request).enqueue(newCallback() {....});
2.OkHttpClient是以build模式将部分属性封装进去方便操作,我们以newCall()为入口一步步分析源码:
/**
* Prepares the {@coderequest} to be executed at some point in the future.
*/
@OverridepublicCall newCall(Request request) {
return new RealCall(this,request, false/* for web socket */);
}
3.可以看到上面new出来的是重要的类RealCall,调用的是它的enqueue()方法:
@Override public void enqueue(Callback responseCallback) {
synchronized(this) {
if(executed)throw newIllegalStateException("Already Executed");
executed =true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
4.RealCall的enqueue()方法真正调用的是Dispatcher类的enqueue方法,并创建了一个AsyncCall(这个类会在后面分析)。在此异步请求正式被分发,来看看Dispatcher类的enqueue源码:
synchronized void enqueue(AsyncCall call) {
if(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
}else{
readyAsyncCalls.add(call);
}}
maxRequests = 64: 最大并发请求数为64
maxRequestsPerHost = 5: 每个主机最大请求数为5
Deque runningAsyncCalls:正在运行的任务,判断并发数量
Deque readyAsyncCalls:缓存等待的请求队列
如果满足条件则将异步请求放入runningAsyncCalls()并添加到线程池进行处理,OKHttp的创建的线程池:
public synchronized ExecutorService executorService() {
if(executorService ==null) {
executorService =new ThreadPoolExecutor(0,Integer.MAX_VALUE,60,TimeUnit.SECONDS,
new SynchronousQueue(),Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
5.new ThreadPoolExecutor()参数如下:
int corePoolSize: 最小并发线程数,这里并发同时包括空闲与活动的线程,如果是0的话,空闲一段时间后所有线程将全部被销毁。
int maximumPoolSize: 最大线程数,当任务进来时可以扩充的线程最大值,当大于了这个值就会根据丢弃处理机制来处理
long keepAliveTime: 当线程数大于corePoolSize时,多余的空闲线程的最大存活时间,类似于HTTP中的Keep-alive
TimeUnit unit: 时间单位,一般用秒
BlockingQueue workQueue: 工作队列,先进先出。
ThreadFactory threadFactory: 单个线程的工厂,可以打Log,设置Daemon(即当JVM退出时,线程自动结束)等
6.线程池的execute(Runnable runnable)正常调用的是Runnable的run()方法,在此看看executorService().execute(call)中AsyncCall的源码,AsyncCall是RealCall的一个内部类:
final class AsyncCall extends NamedRunnable {
private finalCallback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s",redactedUrl());
this.responseCallback = responseCallback;
}
..部分代码省略
@Overrideprotected void execute() {
booleansignalledCallback =false;
try{
Response response = getResponseWithInterceptorChain();
if(retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback =true;
responseCallback.onFailure(RealCall.this, newIOException("Canceled"));
}else{
signalledCallback =true;
responseCallback.onResponse(RealCall.this,response);
}
}catch(IOException e) {
if(signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO,"Callback failure for "+ toLoggableString(),e);
}else{
responseCallback.onFailure(RealCall.this,e);
}}finally{client.dispatcher().finished(this);}}}
7.没有看到run(),继续往下找看看它父类的源码:
public abstract class NamedRunnable implements Runnable {
protected finalString name;
publicNamedRunnable(String format,Object... args) {
this.name = Util.format(format,args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try{
execute();
}finally{
Thread.currentThread().setName(oldName);
}}
protected abstract void execute();}
8.从上面代码可以看到AsyncCall的父类确实是继承Runnable,在run()里面调用的AsyncCall的execute()。回头看AysncCall的execute()方法,getResponseWithInterceptorChain()是设置Interceptor的入口,这个下一篇会分析。在finally里面调用了Dispatcher类的finished()方法,继续看源码:
/** Used by {@codeAsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls,call, true);//同步在此传入的是false
}
private void finished(Deque calls,T call, booleanpromoteCalls) {
intrunningCallsCount;
Runnable idleCallback;
synchronized(this) {
if(!calls.remove(call))throw newAssertionError("Call wasn't in-flight!");
if(promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback =this.idleCallback;
}
if(runningCallsCount ==0&& idleCallback !=null) {
idleCallback.run();
}
}
9.异步请求调用了Dispatcher类的promoteCalls()方法,继续:
private void promoteCalls() {
if(runningAsyncCalls.size() >= maxRequests)return;// Already running max capacity.
if(readyAsyncCalls.isEmpty())return;// No ready calls to promote.
for(Iterator i = readyAsyncCalls.iterator();i.hasNext();) {
AsyncCall call = i.next();
if(runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();//将缓存的call移除
runningAsyncCalls.add(call);
executorService().execute(call);
}
if(runningAsyncCalls.size() >= maxRequests)return;// Reached max capacity.
}
}
如果满足runningCallsForHost(call)<maxRequestsPerHost(每个主机最大请求数为5)条件,则又执行了executorService().execute(call)回到上面的步骤,并且将缓存队列的call移除添加到正在运行的队列中,继续这样循环将处理异步请求。
The End
通过分析,Dispatcher用了两个Deque(runningAsyncCalls和readyAsyncCalls)队列,一个作为等待请求的缓存,一个存储正在运行的请求。并在AsyncCall中execute()的finally()里面调用Dispatcher的finished()方法将等待的缓存进行手动出站以及添加到线程池进行处理和正在运行的队列,自绘流程图如下:
下一篇将对OkHttp的Interceptor做分析。
网友评论