OkHttp阻塞式同步请求
- 1.创建一个okHttpClient对象
- 2.创建request对象,和response对象
- 3.通过call对象来操作请求
- 4.call是个接口,所以我们需要使用它的实现类realCall来完成实际的请求工作
在realCall中需要注意:
1.一个call只能被请求一次。如果被请求过了,那么再次请求就会抛出异常。
2.okHttp内部调度器会将同步请求添加到一个队列当中。
3.okHttp会通过拦截器链来做实际的网络请求工作。
- 5.执行完毕之后为了让我们Dispatcher负荷降低,会把同步请求从Dispatcher中移除。
OkHttp非阻塞式异步请求
- 1.生产一个okHttpClient对象
- 2.创建request对象。里面会包含一些网络信息:url等
- 3.通过Call对象的enqueue方法进行异步网络请求,这里他会创建一个call对象加入调度当中,利用callback回调
- 4.不管同步异步,网络请求都是通过拦截器链来实现的。通过依次执行拦截器链中的每一个拦截器,得到服务器的数据返回。
OkHttp内部线程池创建
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
这个线程池是在Dispatcher中创建的。第一个参数“0”指的是核心线程数的数量,为0表示线程空闲后不会被保留。第二个参数表示线程池中可以容纳的最大线程数量。第三个是keep-alive-time,就说当我们线程池中线程数量大于我们的核心线程数时,他会等待60秒后才会被终止。第四个是一个创建好的等待线程队列,他也是个同步队列,先来先服务。最后一个是一个线程工厂,他会创建一个守护线程用来处理我们的Dispatcher类。需要注意的是 SynchronousQueue 这个线程等待队列,说是说队列,但是其实它中的每一个插入操作必须等待另一个线程移除完才能操作。所以说这个队列内部其实是没有任何一个元素的
接下来我们看下executorService().execute(call);这个execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
源码在ThreadPoolExecutor中。他分为三种状态:
- 新过来一个任务,这时候如果核心数为0,那么这时候不需要创建核心线程,直接加到队列中。
- 2.如果这时候有线程刚好空闲,那么他就可以接收任务。
- 3.如果这时候线程池中工作饱和了。他就会入队失败,如果他数量为0的时候他就会调用addworker方法来创建并启动新线程。
Question:如果我一次发起大量的请求就会造成大量的线程冗余。所以光有线程池控制是不够的。线程池只起了一个辅助作用,用来缓存线程,而真正对线程数量进行控制以及他们的执行时机进行调度的是我们的Dispatcher。
Dispatcher的作用:
-
1.维护请求状态
-
2.在维护请求状态的前提下,他又会去维护一个线程池来进行同步或异步请求操作。
/** Ready async calls in the order they'll be run. */ private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>(); /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
我们看到Dispatcher内部维护了3个队列
-
1.第一个是异步的等待队列,
-
2.第二个是异步的执行队列,
-
3.第三个是同步的执行队列。
synchronized void enqueue(AsyncCall call) { if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else { readyAsyncCalls.add(call); } }
他会判断当前的最大请求数是否在允许范围内,以及最大的主机请求数是否在允许范围内。如果是的话他就会将这个请求扔到异步请求队列中执行,然后开启相应线程开启网络请求,如果不符合,他会把这个请求放到异步等待队列当中。等到异步执行队列当中有空闲的线程,他会从异步等待队列中去获取想要执行的网络请求。
我们在看AsyncCall这个类中的execute方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
我们可以看到不管成功还是失败他最后都会调用finished方法。那我们看看这个方法内部是做了什么。
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
我们看见他会调用一个promoteCalls方法。
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
我们来看一下这个方法,他首先会判断这个正在运行的异步请求数量是否超过他最大的允许请求数量。如果超过了就返回。
我们再看下一句,如果异步等待队列中是空的话。这时候也直接返回。
之后他就会通过迭代器去循环遍历异步等待队列。判断所有的运行的主机是否小于最大限制。他会将call从等待队列中移除,然后添加到正在执行的队列当中。最后开启线程池去执行我们的请求。
综上所述
我们的Dispatcher配置器 默认情况下会给我们每个域名有一个快速响应的请求数量,但是他还是限制了每个域名的并发数和总体的并发数。线程池做的 仅仅只是缓存线程的功能,真正调度策略还是靠我们的Dispatcher。我们可以根据自己的应用场景去配置我们的Dispatcher。
网友评论