1.OkHttpClient
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
...
@Override
public Call newCall(Request request) {
return RealCall.newRealCall(this, request, false /* for web socket */);
}
@Override
public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
webSocket.connect(this);
return webSocket;
}
}
public interface Call extends Cloneable {
...
interface Factory {
Call newCall(Request request);
}
}
public interface WebSocket {
...
interface Factory {
/**
* Creates a new web socket and immediately returns it. Creating a web socket initiates an
* asynchronous process to connect the socket. Once that succeeds or fails, {@code listener}
* will be notified. The caller must either close or cancel the returned web socket when it is
* no longer in use.
*/
WebSocket newWebSocket(Request request, WebSocketListener listener);
}
}
OkHttpClient
实现了Call.Factory, WebSocket.Factory
,做为创建Call
和WebSocket
的工厂。这里使用了抽象工厂设计模式。
2.RealCall
2.1 设计模式
final class RealCall implements Call {
final OkHttpClient client;
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
/**
* There is a cycle between the {@link Call} and {@link EventListener} that makes this awkward.
* This will be set after we create the call instance then create the event listener instance.
*/
private EventListener eventListener;
...
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
eventListener.callFailed(this, e);
throw e;
} finally {
client.dispatcher().finished(this);
}
}
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
@Override
public void cancel() {
retryAndFollowUpInterceptor.cancel();
}
}
从上面的代码中可以看到RealCall
提供了一个对外的接口,内部调用各个子系统,通信是单向的(RealCall--->子系统)所以这是一个Facade模式。
2.2 同步请求代码分析
final class RealCall implements Call {
final OkHttpClient client;
final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;
/**
* There is a cycle between the {@link Call} and {@link EventListener} that makes this awkward.
* This will be set after we create the call instance then create the event listener instance.
*/
private EventListener eventListener;
...
// Guarded by this.
private boolean executed;
...
@Override
public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
// eventListener回调
eventListener.callStart(this);
try {
// 2.2.1对于同步请求,直接把Call对象加入到队列中
client.dispatcher().executed(this);
// 执行请求
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} catch (IOException e) {
// eventListener回调
eventListener.callFailed(this, e);
throw e;
} finally {
// 2.2.1对于同步请求,请求完毕后把Call对象从队列中移出
client.dispatcher().finished(this);
}
}
}
2.2.1 同步请求Call管理
/**
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
* own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
* of calls concurrently.
*/
public final class Dispatcher {
...
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
...
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
/** Used by {@code Call#execute} to signal completion. */
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
}
2.3 异步请求代码分析
final class RealCall implements Call {
...
@Override
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
// eventListener回调
eventListener.callStart(this);
// 2.3.1加入请求队列,等待调度
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
}
/**
* Policy on when async requests are executed.
*
* <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
* own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
* of calls concurrently.
*/
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
...
// 同步调用enqueue
synchronized void enqueue(AsyncCall call) {
// 2.3.1 异步请求入队列
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
/** Returns the number of running calls that share a host with {@code call}. */
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningAsyncCalls) {
if (c.get().forWebSocket) continue;
if (c.host().equals(call.host())) result++;
}
return result;
}
}
2.3.1 异步请求入队列
runningAsyncCalls
定义为private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
,private int maxRequests = 64;
。
从判断条件可以直到,OkHttp
把异步请求分为执行中和待执行。
什么时候将任务直接放到runningAsyncCalls
执行队列中?
首先看下runningCallsForHost(call)
方法,根据它的实现可以知道,这个方法是遍历执行队列runningAsyncCalls
中所有的任务AsyncCall
,统计和当前的AsyncCall
域名相同的AsyncCall
数量。
然后在看判断条件if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)
就可以知道任务可以直接放到runningAsyncCalls
并行的条件是:
- 执行队列中任务数量小于64
- 在执行的所有请求中,和当前要执行的请求的host相同的请求不能大于5
如果不满足上面的条件,任务就会被放入到readyAsyncCalls
等待请求队列中。
那么readyAsyncCalls
队列总任务什么时候执行呢?
在回答这个问题钱我们先看下异步任务怎么执行的。当异步请求被放入runningAsyncCalls
队列后,就会调用executorService().execute(call);
执行这个请求。
public final class Dispatcher {
...
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
}
executorService()
创建了一个线程池,然后把这个异步请求AsyncCall
放入到线程池中执行。
再看下AsyncCall
的实现
final class RealCall implements Call {
...
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
@Override
protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
}
在execute()
方法中和同步请求执行过程一样都是通过InterceptorChain
执行请求。不同的是,异步请求通过responseCallback
回调将结果告诉调用者。
这里我们需要重点看下finaly
块中的代码。这里调用了Dispatcher.finished(AsyncCall)
方法通知Dispatcher
请求执行结束。
public final class Dispatcher {
...
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
//同步请求执行玩之后,也会调用这个方法,不同的是promoteCalls参数传的是false
finished(runningAsyncCalls, call, true);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
}
最终调用了private <T> void finished(Deque<T> calls, T call, boolean promoteCalls)
方法结束请求。同步请求执行玩之后,也会调用这个方法,不同的是promoteCalls
参数传的是false
。
那我们看下这个promoteCalls
参数有什么作用。根据上面代码我们看到如果promoteCalls
为true
,就会执行promoteCalls()
,根据这个方法实现,可以知道它就是把
readyAsyncCalls
队列中的任务取出,放入到runningAsyncCalls
队列中,然后提交到线程池中执行。
所以到这里我们就知道readyAsyncCalls
中的任务什么时候执行了。
网友评论