
执行一次异步(开启工作线程,不会阻塞当前线程,结果通常采用接口回调方式回传)请求,大概需要经历如下:
//1,构建 client
OkHttpClient client = new OkHttpClient.Builder().readTimeout(10, TimeUnit.SECONDS).build();//可以通过添加各种属性
//2,构建request请求
Request request = new Request.Builder().url("www.baidu.com").build();//可以继续添加各种数据
//3,获得call对象
Call call = client.newCall(request);
//4,执行异步步请求
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
//子线程
}
@Override
public void onResponse(Call call, Response response) throws IOException {
//子线程
}
});
这里前三步,跟同步请求差不多,分析一下第四步,异步请求:
执行异步调用的是Call.enqueue()
方法:
//RealCall.java
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));//重点
}
上面可以看出,executed
保证了只能执行一次,enqueue(AsyncCall call)
中需要一个AsyncCall
,源码可以看出,AsyncCall
是RealCall
的一个内部类,并且继承自NamedRunnable
具体如下:
final class AsyncCall extends NamedRunnable {
//.....省略部分代码
//execute() 运行在run()方法中,从下面的NamedRunnable中可以看出
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//拦截器链,后续具体分析
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);//将结果回调返回,responseCallback就是执行enqueue方法,传入的callback
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
这里可以大概猜出,它其实是一个Runnable
,接着看一下NamedRunnable
:
public abstract class NamedRunnable implements Runnable {//实现了Runnable接口
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();//在run方法中,执行了抽象方法 execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();//抽象方法,具体在上面的 AsyncCall 实现
}
分析完 AsyncCall
,可以继续看client.dispatcher().enqueue(new AsyncCall(responseCallback)); 也就是 Dispatcher.enqueue()
方法
//Dispatcher.java
private int maxRequests = 64; //最大请求数量 64
private int maxRequestsPerHost = 5;
synchronized void enqueue(AsyncCall call) {
// 下面逻辑,为了判断不能超过最大请求数量
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);//将本次请求call,添加到运行队列
executorService().execute(call);//线程池,执行
} else {
readyAsyncCalls.add(call);// 添加到准备队列
}
}
Dispatcher内部维护了一个线程池来执行请求,上面代码executorService().execute(call);
,也就是线程池执行了call:
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
executorService().execute(call);其实也就是执行,run()
方法,也就回到了上面NamedRunnable
的run()
方法,run()
方法调用了AsyncCall.execute()
接着看一下,AsyncCall execute()-->client.dispatcher().finished(this);最后的时候,调用了Dispacher.finished()
方法
//Dispatcher.java
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);//异步,这里三个参数为 true
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");//将现在执行的call,从对应队列中删除
if (promoteCalls) promoteCalls(); // promoteCalls = true ,调用 promoteCalls()
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
promoteCalls()
方法:
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // 当前运行异步call队列大小,不能超过 maxRequests(maxRequests = 64)
if (readyAsyncCalls.isEmpty()) return; // 异步,准备队列如果没有数据,return
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {//遍历,异步准备Call队列
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);//将ready --> running状态
executorService().execute(call);//线程池,继续执行队列中的call
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
RealCall # getResponseWithInterceptorChain()
拦截器链分析:
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());//自定义拦截器,定义在最初的地方
interceptors.add(retryAndFollowUpInterceptor);//重试拦截器
interceptors.add(new BridgeInterceptor(client.cookieJar()));//桥接拦截器
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
-
retryAndFollowUpInterceptor
失败重连拦截器,主要对网络请求出现异常的时候重连操作(最多重连20次),完成后,继续调用下个拦截器对response进行处理 -
BridgeInterceptor
桥接拦截器,其中主要是对 请求参数,如Header的处理、body中的类型Content-Type
、长度Content-Length
、User-Agent
等等的处理,在进行这些处理完成,也会去链式调用chain.proceed(requestBuilder.build());
继续递归处理剩余拦截器。 -
CacheInterceptor
缓存拦截器,主要对请求进行缓存处理,使用的时候,只需要在创建Client的时候,添加Cache类即可。 -
ConnectInterceptor
连接拦截器,底层通过TCP进行网络连接。 -
CallServerInterceptor
连接服务拦截器,也是最终的拦截器,内部没有chain.proceed
方法,通过 读写来进行Response的返回,将结果返回给前一个拦截器,依次进行下去。
拦截器链工作流程.png
网友评论