1.Constructor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
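For the meaning of each parameter, see the earlier post "Thread Pool: Execution Mechanism" (the official ThreadPoolExecutor documentation).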
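To see the argument checks at the top of the constructor in action, here is a small sketch. The class name ConstructorDemo and the concrete values (60 seconds, a queue of capacity 10) are arbitrary choices for illustration; only the seven-argument ThreadPoolExecutor constructor and the JDK types are real API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorDemo {
    // Builds a pool with the seven-argument constructor shown above.
    static ThreadPoolExecutor newPool(int core, int max) {
        return new ThreadPoolExecutor(
                core,                                  // corePoolSize
                max,                                   // maximumPoolSize
                60L, TimeUnit.SECONDS,                 // keepAliveTime + unit (stored as nanos)
                new ArrayBlockingQueue<>(10),          // bounded workQueue
                Executors.defaultThreadFactory(),      // threadFactory
                new ThreadPoolExecutor.AbortPolicy()); // handler: throw on rejection
    }

    // Returns true if the constructor rejected the sizes with IllegalArgumentException.
    static boolean rejects(int core, int max) {
        try {
            newPool(core, max).shutdown();
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(2, 4));  // false: 0 <= core <= max and max > 0
        System.out.println(rejects(4, 2));  // true: maximumPoolSize < corePoolSize
        System.out.println(rejects(0, 0));  // true: maximumPoolSize <= 0
        System.out.println(rejects(-1, 2)); // true: corePoolSize < 0
    }
}
```

No threads are started by the constructor itself; workers are only created later by execute() (or the prestart methods).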
2.ctl
The main pool control state, ctl, is an atomic integer packing
two conceptual fields
workerCount, indicating the effective number of threads
runState, indicating whether running, shutting down etc
In order to pack them into one int, we limit workerCount to
(2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
billion) otherwise representable. If this is ever an issue in
the future, the variable can be changed to be an AtomicLong,
and the shift/mask constants below adjusted. But until the need
arises, this code is a bit faster and simpler using an int.
The workerCount is the number of workers that have been
permitted to start and not permitted to stop. The value may be
transiently different from the actual number of live threads,
for example when a ThreadFactory fails to create a thread when
asked, and when exiting threads are still performing
bookkeeping before terminating. The user-visible pool size is
reported as the current size of the workers set.
The runState provides the main lifecycle control, taking on values:
RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP: Don't accept new tasks, don't process queued tasks,
and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero,
the thread transitioning to state TIDYING
will run the terminated() hook method
TERMINATED: terminated() has completed
The numerical order among these values matters, to allow
ordered comparisons. The runState monotonically increases over
time, but need not hit each state. The transitions are:
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue and pool are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
Threads waiting in awaitTermination() will return when the
state reaches TERMINATED.
Detecting the transition from SHUTDOWN to TIDYING is less
straightforward than you'd like because the queue may become
empty after non-empty and vice versa during SHUTDOWN state, but
we can only terminate if, after seeing that it is empty, we see
that workerCount is 0 (which sometimes entails a recheck -- see
below).
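In short, the atomic integer ctl packs two fields:
- workerCount: the number of effective threads
- runState: the lifecycle state of the pool
workerCount occupies the low 29 bits, so it is capped at (2^29)-1; if that ever becomes a limit, ctl could be switched to an AtomicLong.
workerCount is the number of workers that have been permitted to start and not permitted to stop. It can briefly differ from the number of live threads, for example when the ThreadFactory fails to create a thread, or while exiting threads are still doing bookkeeping before they terminate.
runState drives the lifecycle and takes these values:
- RUNNING: accepts new tasks and processes queued tasks
- SHUTDOWN: rejects new tasks but still processes queued tasks
- STOP: rejects new tasks, does not process queued tasks, and interrupts in-progress tasks
- TIDYING: all tasks have terminated and workerCount is 0; the thread that moves the pool to TIDYING runs the terminated() hook
- TERMINATED: terminated() has completed
The numeric order of these values matters, because states are compared with < and >=. runState only increases over time, but it does not have to pass through every state. The transitions are:
- RUNNING -> SHUTDOWN: on shutdown(), possibly called implicitly from finalize()
- (RUNNING or SHUTDOWN) -> STOP: on shutdownNow()
- SHUTDOWN -> TIDYING: when both the queue and the pool are empty
- STOP -> TIDYING: when the pool is empty
- TIDYING -> TERMINATED: when the terminated() hook has completed
Threads waiting in awaitTermination() return once the state reaches TERMINATED.
Detecting SHUTDOWN -> TIDYING is trickier than it looks: during SHUTDOWN the queue can go from non-empty to empty and back, so the pool may terminate only if, after seeing the queue empty, it also sees workerCount == 0 (which sometimes requires a recheck).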
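The lifecycle can be observed through the public API. This sketch assumes a hypothetical subclass, HookedPool, that overrides the protected terminated() hook; it queues five tasks, calls shutdown(), and waits in awaitTermination(). Following the rules above, the queued tasks should still run in SHUTDOWN, and terminated() should have run by the time awaitTermination() returns.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class LifecycleDemo {
    // Hypothetical subclass that records when the terminated() hook runs (TIDYING -> TERMINATED).
    static class HookedPool extends ThreadPoolExecutor {
        final AtomicBoolean terminatedHookRan = new AtomicBoolean(false);

        HookedPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        protected void terminated() {
            terminatedHookRan.set(true);
        }
    }

    // Queues several tasks, then calls shutdown(); in SHUTDOWN the queued tasks still run.
    static int runQueuedThenShutdown(HookedPool pool, int tasks) {
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();                                // RUNNING -> SHUTDOWN
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // returns once TERMINATED is reached
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        HookedPool pool = new HookedPool();
        int done = runQueuedThenShutdown(pool, 5);
        System.out.println(done + " " + pool.isTerminated() + " " + pool.terminatedHookRan.get());
    }
}
```

Calling execute() on the pool afterwards is rejected, since a pool at or beyond SHUTDOWN no longer accepts new tasks.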
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
/**
 * Attempts to CAS-increment the workerCount field of ctl.
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}
/**
 * Attempts to CAS-decrement the workerCount field of ctl.
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
/**
 * Decrements the workerCount field of ctl. This is called only on
 * abrupt termination of a thread (see processWorkerExit). Other
 * decrements are performed within getTask.
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
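The high 3 bits of ctl encode runState:
- 111: RUNNING
- 000: SHUTDOWN
- 001: STOP
- 010: TIDYING
- 011: TERMINATED
Because RUNNING has the sign bit set, ctl is negative exactly while the pool is running, which is why isRunning(c) is simply c < SHUTDOWN.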
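The packing can be checked outside the JDK by copying the constants into a standalone class. CtlLayout and highBits are hypothetical names for this sketch; the constants and the three pack/unpack functions are copied from the code above.

```java
class CtlLayout {
    // Same layout as ThreadPoolExecutor: high 3 bits = runState, low 29 bits = workerCount.
    static final int COUNT_BITS = Integer.SIZE - 3;    // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29 - 1
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    // Hypothetical helper: the top 3 bits of a value as a 3-character binary string.
    static String highBits(int rs) {
        String s = Integer.toBinaryString(rs >>> COUNT_BITS);
        return "000".substring(s.length()) + s;
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        System.out.println(runStateOf(c) == RUNNING);  // true
        System.out.println(workerCountOf(c));          // 5
        System.out.println(workerCountOf(c + 1));      // 6: "+1" only touches workerCount
        System.out.println(c < SHUTDOWN);              // true: this is isRunning(c)
        System.out.println(ctlOf(SHUTDOWN, 3) < STOP); // true: state comparison ignores the count
        System.out.println(highBits(RUNNING) + " " + highBits(SHUTDOWN) + " " + highBits(STOP)
                + " " + highBits(TIDYING) + " " + highBits(TERMINATED)); // 111 000 001 010 011
        System.out.println(CAPACITY);                  // 536870911
    }
}
```

Because workerCount lives in the low bits, compareAndIncrementWorkerCount can simply CAS ctl from expect to expect + 1, and runStateLessThan/runStateAtLeast can compare the packed value directly without unpacking it.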
3.execute
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
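- step1. If fewer than corePoolSize threads are running, start a new thread with the command as its first task. addWorker atomically checks runState and workerCount, so it returns false instead of adding a thread when it shouldn't.
- step2. If the task was queued successfully, double-check whether a thread should have been added (an existing one may have died since the last check) or whether the pool was shut down after entering the method. If the pool is no longer running, the enqueue is rolled back and the task rejected; if no threads are left, a new thread is started with no first task.
- step3. If the task cannot be queued, try to add a new (non-core) thread.
- step4. If that also fails, the pool is shut down or saturated, so the task is rejected.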
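The steps can be triggered one by one with a deliberately tiny pool. This is a sketch under assumed settings (corePoolSize 1, maximumPoolSize 2, an ArrayBlockingQueue of capacity 1, and tasks that block on a CountDownLatch so that no worker frees up); ExecuteStepsDemo is a hypothetical name.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {
    // Returns {poolSize, queueSize, rejected (0/1), completedTasks} for four submissions.
    static long[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await();          // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocking);           // step 1: workerCount 0 < core -> new core thread
        pool.execute(blocking);           // step 2: the core thread is busy -> task is queued
        pool.execute(blocking);           // step 3: queue full -> new non-core thread
        long rejected = 0;
        try {
            pool.execute(blocking);       // step 4: queue full and at maximum -> reject
        } catch (RejectedExecutionException e) {
            rejected = 1;                 // the default handler is AbortPolicy
        }
        long poolSize = pool.getPoolSize();
        long queueSize = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {poolSize, queueSize, rejected, pool.getCompletedTaskCount()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [2, 1, 1, 3]
    }
}
```

Note the order: the queue is tried before the pool grows beyond corePoolSize, so with an unbounded queue maximumPoolSize would never come into play.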
3.1 addWorker
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked. If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread.start()), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
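addWorker checks whether a new worker can be added given the current pool state and the given bound (core or maximum).
- If so, the worker count is adjusted accordingly, and a new worker is created and started with firstTask as its first task.
- If the pool is stopped or eligible to shut down, the method returns false.
- It also returns false if the thread factory fails to create a thread.
If thread creation fails, either because the thread factory returned null or because of an exception (typically OutOfMemoryError in Thread.start()), the method rolls back cleanly.
Overall flow:
- step1. Return false if runState > SHUTDOWN, or if runState == SHUTDOWN and either firstTask is non-null or the queue is empty. Also return false if the worker count has already reached the bound (CAPACITY, or corePoolSize/maximumPoolSize depending on core).
- step2. CAS-increment the worker count in ctl first. If the CAS fails because runState changed, restart the outer loop; if only the worker count changed, retry the inner loop.
- step3. Create a Worker with firstTask as its first task, recheck the state while holding mainLock, add the worker to workers, and start its thread.
- step4. If the worker could not be started, roll back via addWorkerFailed.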
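The first half of addWorker (the two nested retry loops) can be modeled in isolation. The sketch below is a hypothetical simplification: WorkerSlotReserver keeps only the packed ctl word, and the boolean parameters firstTaskIsNull and queueEmpty stand in for the real firstTask and workQueue checks. It never creates a thread.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of addWorker's slot reservation; not JDK code.
class WorkerSlotReserver {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;

    final AtomicInteger ctl = new AtomicInteger(RUNNING); // RUNNING, workerCount 0
    final int corePoolSize, maximumPoolSize;

    WorkerSlotReserver(int core, int max) { corePoolSize = core; maximumPoolSize = max; }

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    // Same outer/inner retry structure as addWorker, minus Worker creation.
    boolean tryReserve(boolean core, boolean firstTaskIsNull, boolean queueEmpty) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // In SHUTDOWN only a task-less worker may be added, and only to drain a non-empty queue.
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTaskIsNull && !queueEmpty))
                return false;
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (ctl.compareAndSet(c, c + 1))
                    return true;        // slot reserved; the real code now builds the Worker
                c = ctl.get();          // CAS lost: re-read
                if (runStateOf(c) != rs)
                    continue retry;     // state changed: redo the outer state check
            }
        }
    }

    // Simplified advanceRunState(SHUTDOWN): keep the worker count, replace the state bits.
    void shutdown() {
        for (;;) {
            int c = ctl.get();
            if (ctl.compareAndSet(c, SHUTDOWN | workerCountOf(c)))
                return;
        }
    }

    public static void main(String[] args) {
        WorkerSlotReserver r = new WorkerSlotReserver(2, 3);
        System.out.println(r.tryReserve(true, false, true));   // true: count 0 -> 1
        System.out.println(r.tryReserve(true, false, true));   // true: count 1 -> 2
        System.out.println(r.tryReserve(true, false, true));   // false: core bound reached
        System.out.println(r.tryReserve(false, false, true));  // true: count 2 -> 3 (max bound)
        r.shutdown();
        System.out.println(r.tryReserve(false, false, false)); // false: SHUTDOWN refuses a first task
    }
}
```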
/**
 * Lock held on access to workers set and related bookkeeping.
 * While we could use a concurrent set of some sort, it turns out
 * to be generally preferable to use a lock. Among the reasons is
 * that this serializes interruptIdleWorkers, which avoids
 * unnecessary interrupt storms, especially during shutdown.
 * Otherwise exiting threads would concurrently interrupt those
 * that have not yet interrupted. It also simplifies some of the
 * associated statistics bookkeeping of largestPoolSize etc. We
 * also hold mainLock on shutdown and shutdownNow, for the sake of
 * ensuring workers set is stable while separately checking
 * permission to interrupt and actually interrupting.
 */
private final ReentrantLock mainLock = new ReentrantLock();
/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
4.Worker
Class Worker mainly maintains interrupt control state for
threads running tasks, along with other minor bookkeeping.
This class opportunistically extends AbstractQueuedSynchronizer
to simplify acquiring and releasing a lock surrounding each
task execution. This protects against interrupts that are
intended to wake up a worker thread waiting for a task from
instead interrupting a task being run. We implement a simple
non-reentrant mutual exclusion lock rather than use
ReentrantLock because we do not want worker tasks to be able to
reacquire the lock when they invoke pool control methods like
setCorePoolSize. Additionally, to suppress interrupts until
the thread actually starts running tasks, we initialize lock
state to a negative value, and clear it upon start (in
runWorker).
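In short, Worker maintains the interrupt-control state of the threads that run tasks, plus some minor bookkeeping. It extends AQS so that a lock can be acquired and released around each task execution; this keeps interrupts meant to wake a worker that is waiting for a task from interrupting a task that is currently running. It implements a simple non-reentrant mutex instead of using ReentrantLock, because worker tasks must not be able to reacquire the lock when they call pool control methods such as setCorePoolSize. Finally, to suppress interrupts until the thread actually starts running tasks, the lock state is initialized to a negative value (-1) and cleared when the worker starts (in runWorker).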
/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
    return getState() != 0;
}
protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}
protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}
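Worker is a private nested class, so this sketch copies its lock methods into a hypothetical standalone AQS subclass, WorkerLikeLock, including the initial state of -1 that Worker's constructor sets. It shows the two properties described above: tryLock() fails before the worker has started (state -1), and the lock is not reentrant, unlike ReentrantLock.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical copy of Worker's lock logic only (the real Worker also holds a thread and
// a firstTask). State: -1 = not started, 0 = unlocked, 1 = locked.
class WorkerLikeLock extends AbstractQueuedSynchronizer {
    WorkerLikeLock() {
        setState(-1); // inhibit interrupts/locking until "runWorker" calls unlock()
    }

    @Override
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    @Override
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) { // only 0 -> 1 succeeds; -1 and 1 both fail
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    void lock() { acquire(1); }
    boolean tryLock() { return tryAcquire(1); }
    void unlock() { release(1); }

    public static void main(String[] args) {
        WorkerLikeLock w = new WorkerLikeLock();
        System.out.println(w.tryLock()); // false: state is -1, worker not started yet
        w.unlock();                      // what runWorker does first: state -> 0
        System.out.println(w.tryLock()); // true
        System.out.println(w.tryLock()); // false: the owner cannot re-acquire
        w.unlock();

        ReentrantLock r = new ReentrantLock();
        System.out.println(r.tryLock() && r.tryLock()); // true: ReentrantLock is reentrant
    }
}
```

This is also why interruptIdleWorkers can use tryLock() to mean "is this worker idle": a worker that is running a task, or has not started yet, makes tryLock() fail.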
4.1 Worker.run -> runWorker
/**
 * Main worker run loop. Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters. Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler). Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
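runWorker repeatedly takes tasks from the queue and executes them, while dealing with several issues:
- 1. The worker may start with an initial task, in which case it does not need to fetch the first one. Otherwise, as long as the pool is running, it gets tasks from getTask. If getTask returns null, the worker exits because the pool state or configuration changed. Any other exit is caused by an exception thrown in external code; completedAbruptly then stays true, which usually makes processWorkerExit replace this thread.
- 2. Before running a task, the worker acquires its lock so that other pool interrupts cannot hit the task while it executes, and it makes sure the thread's interrupt flag is not set unless the pool is stopping.
- 3. Each task run is preceded by a call to beforeExecute. If beforeExecute throws, the thread dies (the loop exits with completedAbruptly == true) without running the task.
- 4. If beforeExecute completes normally, the task runs, and any exception it throws is collected and passed to afterExecute. RuntimeException, Error, and arbitrary Throwables are handled separately; since Runnable.run cannot rethrow a checked Throwable, such Throwables are wrapped in an Error on the way out (to the thread's UncaughtExceptionHandler). Any thrown exception also conservatively kills the thread.
- 5. After task.run completes, afterExecute is called; it may also throw, which likewise kills the thread. Per JLS 14.20, an exception thrown by afterExecute is the one that takes effect even if task.run threw as well.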
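Points 4 and 5 can be observed by overriding afterExecute. This sketch assumes a hypothetical subclass, RecordingPool; its ThreadFactory installs an empty UncaughtExceptionHandler only to keep the dying worker from printing a stack trace. With execute(), the exception thrown by the task reaches afterExecute; with submit(), the task is wrapped in a FutureTask that catches the exception itself, so afterExecute is expected to receive null.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class AfterExecuteDemo {
    // Hypothetical pool whose afterExecute records the Throwable that runWorker passes in.
    static class RecordingPool extends ThreadPoolExecutor {
        final AtomicReference<Throwable> seen = new AtomicReference<>();
        final CountDownLatch afterCalled = new CountDownLatch(1);

        RecordingPool() {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> {
                Thread t = new Thread(r);
                t.setUncaughtExceptionHandler((th, ex) -> { }); // keep the dying worker quiet
                return t;
            });
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            seen.set(t);
            afterCalled.countDown();
        }
    }

    // Runs one failing task; returns the message afterExecute saw (null if it saw no Throwable).
    static String messageSeen(boolean useSubmit) {
        RecordingPool pool = new RecordingPool();
        Runnable task = () -> { throw new IllegalStateException("boom"); };
        if (useSubmit) {
            pool.submit(task);  // FutureTask catches the exception and stores it in the Future
        } else {
            pool.execute(task); // the exception escapes task.run and reaches afterExecute
        }
        try {
            pool.afterCalled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        Throwable t = pool.seen.get();
        return t == null ? null : t.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(messageSeen(false)); // boom
        System.out.println(messageSeen(true));  // null
    }
}
```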
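Note that w.lock() ignores interrupts while acquiring (it is AQS acquire, shown below). Because every task runs with the worker's lock held, interruptIdleWorkers(), which relies on tryLock(), cannot interrupt a task that is executing. shutdownNow() is different: interruptWorkers() interrupts every started worker without taking its lock.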
/**
 * Acquires in exclusive mode, ignoring interrupts. Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success. Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success. This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
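Worker is private, but ReentrantLock.lock() goes through the same AQS acquire(), so it can stand in for w.lock() here. In this hypothetical LockIgnoresInterrupt sketch, a thread that is interrupted while waiting for the lock keeps waiting instead of throwing, and its interrupt flag is still set once it holds the lock (in the JDK 8 code above, re-asserted via selfInterrupt()).

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

class LockIgnoresInterrupt {
    // Returns {waiter eventually acquired the lock, waiter's interrupt flag was still set}.
    static boolean[] run() {
        ReentrantLock lock = new ReentrantLock();
        boolean[] result = new boolean[2];
        lock.lock();                      // main thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                  // AQS acquire(): keeps waiting, does not throw
            try {
                result[0] = true;
                result[1] = Thread.currentThread().isInterrupted();
            } finally {
                lock.unlock();
            }
        });
        waiter.start();
        waiter.interrupt();               // always lands before the waiter can get the lock
        lock.unlock();
        try {
            waiter.join(5000);            // join() makes the waiter's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [true, true]
    }
}
```

By contrast, lockInterruptibly() would throw InterruptedException in the same situation.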
5.Taking tasks from the queue: getTask
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
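Depending on the current configuration, getTask performs a blocking or timed wait for a task, or returns null if this worker must exit because:
- 1. There are more than maximumPoolSize workers (because setMaximumPoolSize was called).
- 2. The pool is stopped.
- 3. The pool is shut down and the queue is empty.
- 4. The worker timed out waiting for a task, timed-out workers are subject to termination (allowCoreThreadTimeOut || workerCount > corePoolSize) both before and after the timed wait, and, if the queue is non-empty, this worker is not the last thread in the pool.
Whenever getTask returns null, workerCount has already been decremented.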
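Exit reason 4 (the keep-alive timeout) can be watched through getPoolSize(). This is a timing-based sketch under assumed settings: corePoolSize 1, maximumPoolSize 3, keepAliveTime 100 ms, and a SynchronousQueue so that extra tasks force non-core threads to be created. KeepAliveDemo and waitForPoolSize are hypothetical names; the helper polls with a generous deadline instead of relying on exact timing.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Polls getPoolSize() until it equals target or the timeout passes; returns the last value.
    static int waitForPoolSize(ThreadPoolExecutor pool, int target, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        int size = pool.getPoolSize();
        while (size != target && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            size = pool.getPoolSize();
        }
        return size;
    }

    // Returns {peak pool size, size after keepAlive, size after allowCoreThreadTimeOut(true)}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < 3; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int peak = pool.getPoolSize();  // 1 core thread + 2 non-core threads
        release.countDown();
        // wc > corePoolSize, so getTask() uses poll(keepAliveTime) and the extra workers exit.
        int afterKeepAlive = waitForPoolSize(pool, 1, 5000);
        // Now the last (core) worker is timed too; in the JDK 8 source the setter also
        // wakes idle workers via interruptIdleWorkers() so they re-check.
        pool.allowCoreThreadTimeOut(true);
        int afterCoreTimeout = waitForPoolSize(pool, 0, 5000);
        pool.shutdown();
        return new int[] {peak, afterKeepAlive, afterCoreTimeout};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [3, 1, 0]
    }
}
```

The last core worker stops at 1 because, with wc == corePoolSize and allowCoreThreadTimeOut false, timed is false and it blocks in take().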
6.shutdown and shutdownNow
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
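shutdown() calls the no-argument interruptIdleWorkers(), which delegates to interruptIdleWorkers(false). w.tryLock() is how it tells whether a worker is idle: a worker holds its own lock while running a task, so tryLock() succeeds only on workers that are waiting for a task, and only those are interrupted.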
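A sketch of the consequence: a task that is already running is not interrupted by shutdown(), and a task still in the queue is executed. ShutdownDemo and the 200 ms sleep are arbitrary choices for illustration; the CountDownLatch ensures shutdown() is called only after the first task is running, and therefore while its worker holds the lock.

```java
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownDemo {
    // Returns {running task was interrupted, queued task ran} after shutdown().
    static boolean[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        AtomicBoolean queuedRan = new AtomicBoolean(false);
        pool.execute(() -> {
            started.countDown();                 // the worker holds its lock while this runs
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        pool.execute(() -> queuedRan.set(true)); // waits in workQueue
        try {
            started.await();
            pool.shutdown();                     // w.tryLock() fails on the busy worker
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new boolean[] {interrupted.get(), queuedRan.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [false, true]
    }
}
```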
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
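shutdownNow is blunter: through interruptWorkers() and interruptIfStarted(), it sets the interrupt flag on every started worker thread that is not already interrupted, whether that worker is idle or running a task.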
private List<Runnable> drainQueue() {
    BlockingQueue<Runnable> q = workQueue;
    ArrayList<Runnable> taskList = new ArrayList<Runnable>();
    q.drainTo(taskList);
    if (!q.isEmpty()) {
        for (Runnable r : q.toArray(new Runnable[0])) {
            if (q.remove(r))
                taskList.add(r);
        }
    }
    return taskList;
}
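drainQueue removes the remaining tasks from the blocking queue and returns them in an ArrayList. drainTo does most of the work; for queues such as DelayQueue, where poll or drainTo may fail to remove some elements, the leftovers are removed one by one.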
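The contrasting sketch for shutdownNow(): the running task is interrupted, and the task still in the queue is handed back by drainQueue instead of being executed. ShutdownNowDemo is a hypothetical name; because the task is submitted with execute() rather than submit(), the returned list is expected to contain the very same Runnable instance.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownNowDemo {
    // Runs a long task plus a queued task, calls shutdownNow(), and returns the drained tasks.
    static List<Runnable> observe(Runnable queued, AtomicBoolean interrupted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);
        pool.execute(() -> {
            started.countDown();          // the worker holds its lock from here on
            try {
                Thread.sleep(10_000);     // would run for 10 s if nobody interrupted it
            } catch (InterruptedException e) {
                interrupted.set(true);    // interruptIfStarted() reached this busy worker
            }
        });
        pool.execute(queued);             // stays in workQueue behind the long task
        List<Runnable> pending = null;
        try {
            started.await();
            pending = pool.shutdownNow(); // STOP, interrupt started workers, drain the queue
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pending;
    }

    public static void main(String[] args) {
        Runnable queued = () -> System.out.println("never printed");
        AtomicBoolean interrupted = new AtomicBoolean(false);
        List<Runnable> pending = observe(queued, interrupted);
        System.out.println(pending.size() + " " + (pending.get(0) == queued) + " " + interrupted.get()); // 1 true true
    }
}
```

The queued task cannot slip through: runState is already STOP when the busy worker wakes up, so its next getTask() call returns null.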
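7.Summary
7.1 A thread pool has four core components
- The pool itself: ctl state management, task execution via execute and submit, growing and shrinking the set of threads, and the shutdown and shutdownNow operations
- The worker thread, Worker, which extends AQS and holds its lock while running a task, so that interrupts aimed at idle workers (such as those from shutdown) do not hit a running task
- The blocking queue, from which getTask takes tasks
- The tasks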
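7.2 The two-level task scheduling model
At the upper level, the application breaks its work into tasks and the Executor framework maps those tasks onto a limited number of threads; at the lower level, the operating system kernel schedules those threads onto the processors.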
7.3 The three parts of the Executor framework
- 1. Tasks: Runnable and Callable
- 2. Task execution: ThreadPoolExecutor and ScheduledThreadPoolExecutor
- 3. Results of asynchronous computation: Future (FutureTask)
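A minimal end-to-end sketch of the three parts: a Callable task, a ThreadPoolExecutor (created here via Executors.newFixedThreadPool), and the Future it returns. ExecutorFrameworkDemo is a hypothetical name. The second method checks that the returned Future is a FutureTask, which is what AbstractExecutorService.newTaskFor creates by default.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

class ExecutorFrameworkDemo {
    // Part 1 (task) + part 2 (executor) + part 3 (result), end to end.
    static int compute() {
        ExecutorService pool = Executors.newFixedThreadPool(2); // a ThreadPoolExecutor
        Callable<Integer> task = () -> 6 * 7;
        Future<Integer> future = pool.submit(task);
        try {
            return future.get();                                // blocks until the task completes
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // submit() wraps the task via newTaskFor, which by default creates a FutureTask.
    static boolean futureIsFutureTask() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<Integer> f = pool.submit(() -> 1);
            return f instanceof FutureTask;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(compute() + " " + futureIsFutureTask()); // 42 true
    }
}
```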