任务提交
线程池的创建仅仅只是创建了线程池的对象,要想线程池运行起来需要我们从提交给线程池。本章只分析任务提交给work处理。work内部处理留在下一章。
回顾线程池如何处理任务
image前面说到一个线程需要运作起来需要任务。当一个订单被指派到工厂时是如何运作的呢?
1 当员工人数 < corePoolSize,每接到一个任务就会去雇佣一个新员工来完成这个任务.对于线程池来说就创建一个work标记为core work线程执行接收的任务
2 如果 (员工人数> corePoolSize) 且(员工人数 < maximumPoolSize),将任务放入流水线,不在雇佣员工。对于线程来说线程会将任务放入工作队列。不在创建新的work.
3 如果情况2中流水线容量满了,说明当前任务已经超负荷。需要雇佣新员工来处理新的任务。对于线程池来说就创建一个work来执行新任务
4 如果雇佣的员工已达到上线maximumPoolSize,且流水线容量也满,则新任务只好让售前拒绝。对于线程池来说就是交给RejectedExecutionHandler处理。
[图片上传失败...(image-533abc-1562078216202)]
execute 本质是提交一个任务给线程池执行。
public interface Executor {
void execute(Runnable command);
}
ThreadPoolExecutor源码实现:
执行任务 execute
/**
* 执行任务
*/
public void execute(Runnable command) {
/** 提交任务为null 抛出异常。 **/
if (command == null)
throw new NullPointerException();
/** 获取ctl **/
int c = ctl.get();
/** work线程数量少于corePoolSize **/
if (workerCountOf(c) < corePoolSize) {
/** 创建新work线程并设置为核心线程执行任务 addWorker(command, true) **/
if (addWorker(command, true))
return;
c = ctl.get();
}
/** 进入此逻辑表示work线程数量大于corePoolSize或者前一步执行失败 **/
/** 判断线程池是Running运行状态,将任务添加到workQueue尾部成功(队列满了返回false) **/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/** Double Check下当前线程状态是不是Running运行状态,不是就删除刚刚添加的任务,执行拒绝任务 **/
if (! isRunning(recheck) && remove(command))
reject(command);
/** 异常情况 前面workerCountOf(c) < corePoolSize说明当时还存在大量work,说明线程池突然停止,为保证任务都能处理,
* 创建一个临时work去处理当前workQueue中的任务 **/
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/** 队列满了,创建一个非核心work执行新添加任务 **/
else if (!addWorker(command, false))
/** 执行失败,执行拒绝任务 **/
reject(command);
}
创建work执行任务 addWorker
校验
addWorker 返回boolean说明addWorker存在校验的逻辑
校验线程池的状态
/**
* rs < SHUTDOWN 正常运行状态,通过校验
* rs > SHUTDOWN情况下,线程池状态处于,STOP、TIDYING、TERMINAT* ED状态都不接收新任务 退出
* 当rs == SHUTDOWN 需要 firstTask == null && !workQueue.isEmpty() 表示创建一个work执行空任务。就是去执行任务队列任务可以通过校验
* 其他情况 退出
* **/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
校验执行的线程数
/** 使用CASwork数量+1 **/
for (;;) {
/** 获取当前work数量 **/
int wc = workerCountOf(c);
/** 核心work数量大于corePoolSize,总work大于maximumPoolSize直接返回 **/
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/** worker + 1,成功跳出retry循环 **/
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
/** 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断 **/
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
创建work并执行任务
/** 创建work并执行任务 **/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
/** 实例化:Worker,并分配任务firstTask **/
w = new Worker(firstTask);
final Thread t = w.thread;
/** work中工作线程不为null **/
if (t != null) {
/** 获取主锁:mainLock **/
final ReentrantLock mainLock = this.mainLock;
/** 加锁 **/
mainLock.lock();
try {
/** 获取当前线程池状态 **/
int rs = runStateOf(ctl.get());
/** 当前线程池状态为运行,或当前线程池状态为SHUTDOWN,提交是null任务通过校验。
*
* **/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//将创建的work添加到workers集合中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
/** 释放锁 **/
mainLock.unlock();
}
/** 创建成功,启动work执行任务 **/
if (workerAdded) {
/** 启动work **/
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
/** 失败创建work只能当前线程池状态不是运行状态 **/
addWorkerFailed(w);
}
return workerStarted;
添加work失败
在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法:
/**
* 失败创建work只能当前线程池状态不是运行状态
*/
private void addWorkerFailed(Worker w) {
/** 获取主锁:mainLock **/
final ReentrantLock mainLock = this.mainLock;
/** 加锁 **/
mainLock.lock();
try {
/** 尝试从workers删除 **/
if (w != null)
workers.remove(w);
/** 将work数量-1 **/
decrementWorkerCount();
/** 尝试将线程池状态设置为Terminate **/
tryTerminate();
} finally {
/** 释放 **/
mainLock.unlock();
}
}
尝试将线程池状态设置为Terminate
/**
* 尝试将线程池状态设置为Terminate
*/
final void tryTerminate() {
for (;;) {
int c = ctl.get();
/**
* 判断线程池能否进入TERMINATED状态
* 如果以下3中情况任一为true,return,不进行终止
* 1、还在运行状态
* 2、状态是TIDYING、或 TERMINATED,已经终止过了
* 3、SHUTDOWN 且 workQueue不为空
* 4
*/
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
/** 线程池workQueue不为空 return,并中断workQueue其中一个work**/
/**
* 线程池为stop状态,且还存在work,中断唤醒一个正在等任务的空闲worker,
* 再次调用getTask(),线程池状态发生改变,返回null,work工作线程退出循环
*/
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
/** 获取主锁:mainLock **/
final ReentrantLock mainLock = this.mainLock;
/** 加锁 **/
mainLock.lock();
try {
/** 将线程池状态设置为TIDYING **/
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
/** 释放子类实现 **/
terminated();
} finally {
/** 将线程池状态设置为TERMINATED **/
ctl.set(ctlOf(TERMINATED, 0));
/** 释放锁 **/
termination.signalAll();
}
return;
}
} finally {
/** 释放锁 **/
mainLock.unlock();
}
// else retry on failed CAS
}
}
网友评论