okhttp分享2:okhttp的线程管理及任务分发
一、线程池
上文主要分析了同步请求的大致流程,现在我们来看一下异步请求的流程,异步请求涉及的线程管理及任务分发okhttp是使用线程池实现的,因此我们首先看下android线程池的使用。
1.1、线程池适合场景
假如一个服务器完成一项任务的时间为T= T1+T2+T3
- T1 创建线程的时间
- T2 在线程中执行任务的时间,包括线程同步所需要的时间
- T3 线程销毁的时间
为了尽量减少T1与T3的时间,因此我们使用线程池维护一些线程,当需要使用线程时直接从线程池中取出使用,从而节省了T1、T3。
1.2、线程池构造方法
一般我们会使用ThreadPoolExecutor来构建线程池,其最终构造方法如下
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
下面我们主要分析下其参数
- corePoolSize:该线程池中核心线程数最大值
核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程。核心线程默认情况下会一直存活在线程池中,即使这个核心线程处于闲置状态。非核心线程会在一定时间后被回收(keepAliveTime)。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(keepAliveTime),就会被销毁掉。
- maximumPoolSize: 该线程池中线程总数最大值
线程总数 = 核心线程数 + 非核心线程数。
- keepAliveTime:该线程池中非核心线程闲置超时时长
一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉,如果设置allowCoreThreadTimeOut = true,则也会作用于核心线程。
- TimeUnit unit:keepAliveTime的单位
TimeUnit是一个枚举类型,其包括:
NANOSECONDS : 1微毫秒 = 1微秒 / 1000
MICROSECONDS : 1微秒 = 1毫秒 / 1000
MILLISECONDS : 1毫秒 = 1秒 /1000
SECONDS : 秒
MINUTES : 分
HOURS : 小时
DAYS : 天
- workQueue:该线程池中的任务队列:维护着等待执行的Runnable对象
当所有的核心线程都在处于工作状态时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
常用的workQueue类型:
- SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它, 如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现线程数达到了maximumPoolSize而不能新建线程的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大。
- LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize。
- ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误。
- DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务。
- ThreadFactory threadFactory:线程工厂
自定义创建线程的方式,这是一个接口。
- RejectedExecutionHandler handler:饱和策略
AbortPolicy(默认):直接抛弃
CallerRunsPolicy:用调用者的线程执行任务
DiscardOldestPolicy:抛弃队列中最久的任务
DiscardPolicy:抛弃当前任务
1.3、线程池执行逻辑
线程池执行主要是在execute方法,由于篇幅有限,本文不做过多叙述,仅介绍大致流程。当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。 如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。 通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。 通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,可以允许池容纳任意数量的并发任务。大致流程图如下
线程池.jpg
以下用 currentSize 表示线程池中当前线程数量
- 当 currentSize < corePoolSize 时,将会直接启动一个核心线程来执行任务。
- 当 currentSize >= corePoolSize ,那么任务会被插入到任务队列中排队等待执行。
- 如果任务队列已满,但 currentSize < maximumPoolSize,那么会立刻启动一个非核心线程来执行任务
- 如果任务队列已满,并且 currentSize > maximumPoolSize,那么就拒绝执行任务,ThreadPoolExecutor 会调用 RejectedExecutionHandler 的 rejectedExecution 方法来通知调用者。
二、异步请求
直接上代码
RealCall中
public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
eventListener.callStart(this);
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
主要看最后一行,首先看下Dispatcher类的相关方法。
private int maxRequests = 64;// 最大并发请求数为64
private int maxRequestsPerHost = 5; //每个主机最大请求数为5
/** Executes calls. Created lazily. */
//消费者池(也就是线程池)
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
// 异步的任务中正在等待被消费的线程集合
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
//正在运行的 异步的任务集合
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
...
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
...
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
executorService()方法实现了懒加载的无数量限制的线程池,根据我们上文介绍可知在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
回到enqueue方法,其执行逻辑也很清晰,如果当前正在执行的call的数量大于maxRequest(64),或者该call的Host上的call超过maxRequestsPerHost(5),则加入readyAsyncCall排队等待,否则加入runningAsyncCalls并执行。
再看下enqueue方法的传入参数AsyncCall,他是RealCall的一个内部类,代码如下
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override
protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}
当线程池执行到call时,会调用execute()方法,我们发现该方法基本流程与同步请求基本一致,不做过多说明。最后调用dispatcer的finished方法,其代码如下
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
...
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
...
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
通过上面代码,大家可以知道finished先执行calls.remove(call)删除call,然后执行promoteCalls(),在promoteCalls()方法里面:如果当前线程大于maxRequest则不操作,如果小于maxRequest则遍历readyAsyncCalls,取出一个call,并把这个call放入runningAsyncCalls,然后执行execute。在遍历过程中如果runningAsyncCalls超过maxRequest则不再添加,否则一直添加。实际上promoteCalls()负责ready的Call到running的Call的转化。
具体流程图如下
okhttp线程分发.png
网友评论