前言:
ThreadPoolExecutor类是Executor框架的核心,负责创建线程池,执行线程池任务,下图为ThreadPoolExecutor整体架构!
1:ThreadPoolExecutor类:
这个类我们常用来创建线程池,我们来深八一下底层到底是怎么实现的。要想了解此类的实现我们首先需要了解一下ThreadPoolExecutor,而且实际上《阿里编程规约》里面要求我们用ThreadPoolExecutor而不是Executors类来创建线程池以做到更精确的配置。
ThreadPoolExecutor类的成员变量:
//高三位表示状态,低29位表示有效线程数量,所以线程池最多可容纳线程为2的29次方-1
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程数量的最大比特位数
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
//线程池5种状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 打包线程状态和线程数量,以及解包还原两个数值
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
//阻塞队列用于存放线程执行的任务
private final BlockingQueue<Runnable> workQueue;
//用于保存线程池里的线程,访问该Set需要mainLock锁
private final HashSet workers = new HashSet();
//用于创建线程的线程工厂,该工厂调用addWorker生产线程
private volatile ThreadFactory threadFactory;
//线程池保存的最小存活线程数量
private volatile int corePoolSize;
//线程池最大存活的线程数量
private volatile int maximumPoolSize;
//线程池饱和或者线程池关闭时的处理策略,默认为拒绝接受任务
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
两个关键字段:workerCount和runState,前者表示当前线程池里有效线程数量,后者表示当前线程池的状态。其他字段注解里已经说明。我们来重点关注下worker,即真正干活的线程长什么样:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
private static final long serialVersionUID = 6138294804551838833L;
//这里的Thread就是真正干活儿的工人了
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//调用Thread里面Run方法的方法
public void run() {
runWorker(this);
}
// 0代表非锁状态,1代表锁定状态,这个在熟悉不过了,AQS框架里的资源
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
上述代码就是Worker类的全貌了,他实际上是AQS类的儿子,里面自定义实现了lock和unlock方法,设置独占AQS的state资源。最重要的就是里面组合了Thread,该Worker的Thread有ThreadFactory工厂生产,并通过run方法实现线程的执行。
认识了这些worker了我们就得考虑怎么把这些worker放到池子里:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//我是分割线,我的上面是判断当前线程池是否满足添加线程的条件,不满足当然加不了喽
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
不要被上面冗长的代码吓坏了,分割线以上是判断当前线程池的状态是否满足添加新线程的条件。下面才是真正添加线程的代码,而且下面的代码也很好理解:首先调用Worker构造方法利用其内部的ThreadFactory创建一个线程并把任务塞给这个新线程;然后把新线程的模型worker加入到内部的worker的Set中、改变线程池的一些属性,由于这个过程不是线程安全的(workers实际是HashSet)所以采用了锁;最后调用这个线程的start方法真正启动该线程,线程启动肯定是执行Runnable的run方法啊,run 方法在哪?好好看看Worker类(实现了Runnable)。如果添加失败了没关系,还是锁定操作worker的set集合。
好了,上面就是我们的线程池模型,现在工人也有了,工人也被我们添加到池子里去了,下面应该让工人干活儿了,工人干活儿的方法是:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
DougLea真是烦人呐,又搞了这么长的代码,代码随长,还是很清晰的。线程start之后执行了Worker类中的run方法,而run方法内部实际上调用的就是runWorker方法。该方法的关键在于while循环,循环条件是firstTask或者从BlockingQueue中取的task不为空的时候执行,该方法给我们留了两个钩子,即beforeExecute和afterExecute,如果我们实现一个子类可以实现这两个方法给自己定制。真正执行的部分在于task.run()。
上面代码我们关注getTask()方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这个方法的关键在于这句话Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();这句话就是阻塞队列取元素的方法,所以该方法是线程安全的。
OK,上面就是ThreadPoolExecutor的具体实现,然而该类对外部暴露的api最常用的则是构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法分为很多中,每种可以实现不同的线程池,《阿里巴巴编程规约》里要求我们直接用构造方法定制一定数目的线程池。
OK,以上我们解决了构造函数的问题,现在我们看一下线程池是怎么执行的:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
*如果池子里线程数量小于最小线程数,那么新起一个线程并把command当作第 一个任务,
*如果任务成功加入队列中,我们仍需要检测是否有线程加入workers里了
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
以上代码最重要的部分就是addworker,实际上首先保证有足够线程,如果没有就创建线程执行任务(不是创建core个线程哦)如果有就把任务放到BlockingQueue中等待线程去拿任务执行。
最后看看线程池的关闭:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
//中断线程池里没有执行任务并且等待任务的线程,调用的是t.interupt方法
interruptIdleWorkers();
//给定时线程池留的钩子
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public ListshutdownNow() { List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
//直接中断所有线程,调用的是interrupt方法
interruptWorkers();
//把没执行完的任务放到List里面返回
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
两个关闭线程池方法略有区别,shutdown方法只改变线程池状态为SHUTDOWN,这个时候线程池拒绝接受任务但是会执行完原有的任务。shutdownNow直接关闭线程池,即不接受任务也不执行原来的任务。停止线程调用的都是Thread.interruput方法!!!!
本篇实际上漏掉了AbstractExecutorService里的一些submit方法,没关系,我们在下一篇ScheduledThreadPoolExecutor中补回来!
网友评论