private boolean addWorker(Runnable firstTask, boolean core)
方法说明
此方法用于新增线程池的Worker对象。由于一些条件的限制,此方法并不总是能执行成功。具体的执行情况如下:
- 如果当前线程池处于STOP、TIDYING和TERMINATED中的任一状态,不执行新增Worker操作,返回false;
- 以下描述的是线程池处于SHUTDOWN状态的情况:
- 参数firstTask不为null,也就是说有新任务的提交,不执行新增Worker操作,返回false;
- 参数firstTask为null,并且当前任务队列为空,不执行新增Worker操作,返回false;
- 以下描述的是线程池处于RUNNING状态的情况:
- 线程池中Worker的数量已到达能够容纳的最大值CAPACITY,不执行新增Worker操作,返回false;
- 准备新增的为核心Worker,并且线程池中Worker的数量已到达最大值corePoolSize,不执行新增Worker操作,返回false;
- 准备新增的为非核心Worker,并且线程池中Worker的数量已到达最大值
maximumPoolSize,不执行新增Worker操作,返回false。
成功将新建的Worker放入Worker集合后,会启动Worker对象内部的线程,此方法执行是否成功以这个线程是否启动为准,若线程启动的标志为false,则说明新增Worker的操作失败,要做新增Worker失败处理。
addWorkerFailed方法用于处理新增Worker失败的情况,会执行下面三步操作:
- 新增Worker逻辑已经创建了一个Worker对象,那么此方法就要负责从Worker集合中移除这个Worker对象;
- 将线程池Worker集合大小减1,通过CAS进行操作,自旋直到减成功;
- 调用tryTerminate方法,尝试停止线程池,能否停止成功以tryTerminate的执行为准(尝试性得停止)。
执行流程图
ThreadPoolExecutor - addWorker - 执行流程.jpgprivate Runnable getTask()
方法说明
getTask是个比较重要的方法,它负责从任务队列中获取任务。
getTask方法有两种返回数据:
- null
返回null,则预示着执行此方法的Worker将会被从Worker集合中移除; - 队列任务
timed变量
此方法中有个变量timed,此变量控制获取队列任务的线程的等待方式:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
这里涉及到阻塞队列的知识,从阻塞队列中获取元素,有两种方式:限时阻塞和无限时阻塞。限时阻塞会在指定的等待时限到达后被唤醒,不必须等到队列有元素才会被唤醒;而无限时阻塞必须等队列有元素后被通知唤醒,也就是说如果队列中一直没有元素,那么线程就会一直阻塞。
ThreadPoolExecutor通过参数的配置来确定采用哪种方式:
- 若allowCoreThreadTimeOut参数配置为true,那么的线程都会采用限时阻塞方式来获取队列元素,等待超时时限为keepAliveTime属性值;
- 若allowCoreThreadTimeOut参数配置为false,那么当线程池中Worker总数到达corePoolSize后,观察到这种情况的Worker采用限时阻塞方式。这里要特别注意:到达corePoolSize之前的Worker采用的无限时阻塞方式。通过这样保证当线程池无处理的任务时,这部分采用限时阻塞方式的Worker可以被从Worker集合中移除,使得线程池只保留corePoolSize数量的Worker,而这些Worker正是在未到达corePoolSize之前的那部分无限时阻塞方式的Worker。
执行流程图
ThreadPoolExecutor - getTask() - 流程分析.jpgprivate void processWorkerExit(Worker w, boolean completedAbruptly)
方法说明
这个方法的作用是在Worker退出后做相关的处理。
Worker退出是什么意思?
ThreadPoolExecutor代码中调用processWorkerExit方法的只有一处,就是runWorker方法的finally语句,runWorker的代码如下:
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
while循环体可以说就是Worker的工作区,它会不断地通过getTask()从队列中获取取任务来执行。Worker退出就是指从这个工作区退出来。要从工作区退出来,通过while的条件可以知道:
task != null || (task = getTask()) != null
当task == null && getTask() == null的时候,循环就会结束。task == null这种情况其实在我们这里可以认为总是是成立的,因为task!=null只会出现一次,就是Worker处理新提交的用户任务的时候。
总结一下processWorkerExit可以执行的两种情况:
- while循环体执行出现异常;
completedAbruptly标记用来指示是否是异常退出,如果为true则表示是异常退出,反之则不是; - getTask() == null
getTask() == null
出现getTask() == null的情况如下:
- 线程池中Worker的总数大于maximumPoolSize;
- 线程池已停止(STOP、TIDYIING和TERMINATEDR任意一种);
- 线程池已关闭(SHUTDOWN),并且任务队列为空;
- 线程池Worker的总数已经超过corePoolSize或者allowCoreThreadTimeOut设置为true,上一次获取任务已经超时,当前的Worker不是最后一个或者是最后一个Worker并且任务队列为空。
执行流程图
ThreadPoolExecutor - processWorkerExit - 执行流程.jpgpublic void execute(Runnable command)
方法说明
此方法为线程池任务提交处理(非执行)主方法,根据具体情况决定是否新增Worker,并且在线程池无法接纳新任务时根据具体的拒绝策略执行任务拒绝处理。
以下是处理可能的处理情况:
- 若当前线程池核心Worker数小于corePoolSize,则调用addWorker方法新增Worker来处理提交的任务;
- 若当前线程池核心Worker数等于或大于corePoolSize,并且线程池处于RUNNING状态则将提交的任务放入任务队列中。操作成功,执行recheck操作,来处理任务放入任务队列成功后线程池状态的改变:
- 若此时线程池已处于非RUNNING状态,则从任务队列中移除刚才放入任务队列的任务(并且尝试终止线程池),移除成功,执行任务拒绝处理;移除失败,执行下面所述;
- 若线程池仍然处于RUNNING状态或非RUNNING状态移除入队列的任务失败,则判断当前线程池是否有Worker(可能之前线程池中的Worker们已经死亡),没有Worker则调用addWorker新增Worker。
- 若当前线程池核心Worker数等于或大于corePoolSize,线程池处于RUNNING状态并且因为队列已满等原因无法将新提交的任务放入任务队列中,则调用addWorker新增Worker来处理新提交的任务,操作失败,执行任务拒绝处理。
任务队列为无界队列,正常情况下总是能将任务放入到队列中,那么就算
corePoolSize之内的Worker处于繁忙状态,也不会新增Worker来处理新提交的任务。也就是说这种情况下maximumPoolSize这个配置是无用的。
执行流程图
ThreadPoolExecutor - execute - 流程分析.jpgfinal void tryTerminate()
方法说明
此方法用来停止线程池,将线程池的状态转移到TERMINATED,但不是做强制性的停止,而是尝试式的停止,处理情形如下:
- 线程池当前为RUNNING状态,不符合停止条件,方法返回;
- 线程池当前为TIDYING或TERMINATED,不符合停止条件,方法返回;
- 线程池当前为SHUTDOWN状态,任务队列不为空,不符合停止条件,方法返回;
- 上面三个条件不满足,如果池中还存在Worker,则尝试通过中断某一个Worker内部线程的方式,利用传播性来继续执行tryTerminate()方法,方法返回。
中断线程后,会做Worker退出处理,退出会再次调用tryTerminate()方法。
- 符合了停止条件,则先通过CAS操作将线程池状态转移到TIDYING状态,表明线程池中已无需要处理的任务和无Worker存在,状态转移不成功,则继续tryTerminate的下一轮执行;状态转移成功,则调用terminated(),terminated()调用结束(正常结束或非正常结束),则将线程池状态改为TERMINATED,此状态就表明线程池已经被停止了,然后执行通知操作,通知那些在等待线程池停止的线程。
调用awaitTermination方法的线程会等待线程池停止后的通知。
执行流程图
ThreadPoolExecutor - tryTerminate() - 流程分析.jpgpublic void shutdown()
方法说明
外部通过调用shutdown方法来关闭线程池,此方法执行的步骤为:
- 第一步:检查是否有关闭权限;
- 第二步:将线程池状态转移为SHUTDOWN(循环通过CAS更新,直至成功);
- 第三步:中断空闲的Worker;
- 第四步:调用onShutdown方法;
- 第五步:调用tryTerminate尝试终止线程池。
执行流程图
ThreadPoolExecutor - shutdown() - 流程分析.jpgpublic List<Runnable> shutdownNow()
方法说明
外部通过调用shutdownNow方法来立即关闭线程池,此方法执行的步骤为:
- 第一步:检查是否有关闭权限;
- 第二步:将线程池状态转移为STOP(循环通过CAS更新,直至成功);
- 第三步:中断所有的Worker;
- 第四步:将任务队列中的任务转移出来;
- 第五步:调用tryTerminate尝试终止线程池。
- 第六步:返回转移出来的任务列表。
网友评论