1 ThreadPoolExecutor中的状态
(1)RUNNING:可以接受新任务,可以处理阻塞队列中的任务。
(2)SHUTDOWN:不接受新任务,可以处理阻塞队列中的任务,中断空闲Worker。
(3)STOP:不接受新任务,不处理阻塞队列中的任务,中断所有Worker。
(4)TIDYING:所有任务执行完毕,workerCount等于0,开始执行terminated方法。
(5)TERMINATED:terminated方法执行完毕。
2 ThreadPoolExecutor中的状态转换
(1)RUNNING到SHUTDOWN:调用了shutdown方法。
(2)(RUNNING或SHUTDOWN)到STOP:调用了shutdownNow方法。
(3)SHUTDOWN到TIDYING:阻塞队列中和线程池中没有任务。
(4)STOP到TIDYING:线程池中没有任务。
(5)TIDYING到TERMINATED:terminated方法执行完毕。
3 ThreadPoolExecutor中的部分字段
(1)ctl的高3位代表线程池的状态,低29位代表workerCount。
(2)COUNT_BITS=32-3=29。
(3)CAPACITY=2^29-1=536870911,二进制为00011111111111111111111111111111。
(4)RUNNING的二进制为11100000000000000000000000000000。
(5)SHUTDOWN的二进制为00000000000000000000000000000000。
(6)STOP的二进制为00100000000000000000000000000000。
(7)TIDYING的二进制为01000000000000000000000000000000。
(8)TERMINATED的二进制为01100000000000000000000000000000。
(9)runStateOf方法可以获取线程池的状态。
(10)workerCountOf方法可以获取workerCount。
(11)ctlOf方法可以根据线程池的状态和workerCount获取ctl。
(12)runStateLessThan方法可以比较线程池的两个状态。
(13)runStateAtLeast方法可以比较线程池的两个状态。
(14)isRunning方法可以判断线程池是否处于Running状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
(15)workQueue:阻塞队列。
(16)mainLock:访问workers集合、访问largestPoolSize等线程池的统计型变量、执行shutdown方法、执行shutdownNow方法都需要获取该锁。
(17)workers:保存ThreadPoolExecutor.Worker对象的HashSet集合。只有获取mainLock,才能访问该集合。
(18)termination:和mainLock绑定的Condition对象,执行awaitTermination方法和tryTerminate方法时使用该Condition对象。
(19)threadFactory:线程工厂。
(20)handler:拒绝策略。
(21)keepAliveTime:空闲线程的存活时间。
(22)allowCoreThreadTimeOut:是否允许结束空闲的核心线程。如果allowCoreThreadTimeOut等于true,则keepAliveTime必须大于0。
(23)corePoolSize:核心线程数。
(24)maximumPoolSize:最大线程数。
(25)defaultHandler:默认的拒绝策略。
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet<Worker>();
private final Condition termination = mainLock.newCondition();
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
4 ThreadPoolExecutor中的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
4.1 Executors中的defaultThreadFactory方法
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
4.1.1 Executors.DefaultThreadFactory内部类
Executors.DefaultThreadFactory实现了ThreadFactory接口。
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
4.1.1.1 ThreadFactory接口
ThreadFactory接口中只定义了newThread方法。
public interface ThreadFactory {
Thread newThread(Runnable r);
}
5 ThreadPoolExecutor.Worker内部类
ThreadPoolExecutor.Worker继承了AbstractQueuedSynchronizer,实现了Runnable接口。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
// Worker中的线程对象
final Thread thread;
// 线程的第一个任务
Runnable firstTask;
volatile long completedTasks;
// 传入的firstTask可以等于null
Worker(Runnable firstTask) {
// 将state设为-1
// 执行runWorker方法之前,state一直等于-1
// 查看interruptIfStarted方法可以发现,如果state小于0,不允许中断线程对象
setState(-1);
this.firstTask = firstTask;
// 使用线程工厂创建线程对象
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// state等于0代表Worker锁未被占用,state等于1代表Worker锁已被占用
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
6 ThreadPoolExecutor中的execute方法
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取ctl的数值
int c = ctl.get();
// 如果workerCount小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 尝试启动一个新线程,并且将这个线程的第一个任务设为command
// 如果新线程启动成功,直接返回
if (addWorker(command, true))
return;
c = ctl.get();
}
// 运行到这里有两种可能,要么workerCount大于等于核心线程数,要么新线程启动失败
// 如果线程池处于RUNNING状态,并且成功将当前任务command添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 成功将当前任务command添加到阻塞队列中之前,其它线程可能尝试关闭线程池,因此需要重新获取线程池的状态
// 如果线程池已不处于RUNNING状态,并且成功从阻塞队列中移除了command
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
// 运行到这里有两种可能,要么线程池还处于RUNNING状态,要么线程池已不处于RUNNING状态但command已开始执行或执行完毕
// 如果workerCount等于0
else if (workerCountOf(recheck) == 0)
// 尝试启动一个新线程,并且将这个线程的第一个任务设为null
addWorker(null, false);
}
// 运行到这里有两种可能,要么线程池不处于RUNNING状态,要么阻塞队列已满
// 尝试启动一个新线程,并且将这个线程的第一个任务设为command
// 如果新线程启动失败,说明线程池不处于RUNNING状态或workerCount已达到最大线程数
else if (!addWorker(command, false))
// 执行拒绝策略
reject(command);
}
6.1 ThreadPoolExecutor中的addWorker方法
// firstTask:线程的第一个任务,可以传入null
// core:线程数量的界限。true代表以corePoolSize作为界限,false代表以maximumPoolSize作为界限
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池处于STOP或TIDYING或TERMINATED状态,直接返回false
// 如果线程池处于SHUTDOWN状态,firstTask不等于null,直接返回false
// 如果线程池处于SHUTDOWN状态,firstTask等于null,阻塞队列中没有任务,直接返回false
// 如果线程池处于SHUTDOWN状态,firstTask等于null,阻塞队列中还有任务,可以创建Worker
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
// 如果workerCount已达到线程数量的界限,直接返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 如果CAS操作成功,说明所有条件都已满足,可以开始创建Worker
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 如果CAS操作失败,说明还有其它线程正在创建Worker
// 此时需要重新获取线程池的状态。如果状态不变,继续内层循环;如果状态改变,回到外层循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 运行到这里,开始创建Worker
// workerStarted代表是否已启动新创建的Worker中的线程对象
boolean workerStarted = false;
// workerAdded代表是否已将新创建的Worker添加到workers这个HashSet中
boolean workerAdded = false;
Worker w = null;
try {
// 传入firstTask,创建Worker
w = new Worker(firstTask);
// 获取Worker中的线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 获取mainLock之前,其它线程可能尝试关闭线程池,因此需要重新获取线程池的状态
// 如果(线程池处于RUNNING状态)或(线程池处于SHUTDOWN状态并且firstTask等于null)
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果已启动Worker中的线程对象
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 更新largestPoolSize
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果已将新创建的Worker添加到workers这个HashSet中
if (workerAdded) {
// 启动新创建的Worker中的线程对象
t.start();
workerStarted = true;
}
}
} finally {
// 如果没有启动新创建的Worker中的线程对象
if (! workerStarted)
// 回滚操作
addWorkerFailed(w);
}
return workerStarted;
}
6.1.1 ThreadPoolExecutor中的addWorkerFailed方法
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
// workerCount减1
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
6.1.2 ThreadPoolExecutor.Worker中的run方法
启动新创建的Worker中的线程对象,JVM会自动调用Worker中的run方法。
public void run() {
runWorker(this);
}
6.1.2.1 ThreadPoolExecutor中的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 将Worker中的state设为0,允许中断线程对象
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 执行任务前,获取Worker锁
w.lock();
// 如果线程池处于STOP或TIDYING或TERMINATED状态,确保当前线程已被中断
// 如果线程池处于RUNNING或SHUTDOWN状态,确保当前线程未被中断
// interrupted方法会清除当前线程的中断状态
// 如果线程池处于RUNNING或SHUTDOWN状态,interrupted方法返回true,说明在此之前当前线程已被中断,现在当前线程的中断状态已被清除。其它线程可能调用shutdownNow方法,因此需要重新判断线程池是否处于RUNNING或SHUTDOWN状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 中断当前线程
wt.interrupt();
try {
// 空方法,子类中可以重写该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行Runnable对象中的run方法不会抛出Checked Exception
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 空方法,子类中可以重写该方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 执行任务后,释放Worker锁
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 运行到这里,需要结束当前线程
// 运行到这里有两种可能,要么getTask方法返回null,要么执行任务时抛出了异常或错误
// 第一种情况,completedAbruptly等于false,执行getTask方法时workerCount已减1
// 第二种情况,completedAbruptly等于true,workerCount未减1
processWorkerExit(w, completedAbruptly);
}
}
6.1.2.1.1 ThreadPoolExecutor中的getTask方法
private Runnable getTask() {
// timedOut代表上一次调用poll方法获取任务是否超时
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果(线程池处于STOP或TIDYING或TERMINATED状态)或(线程池处于SHUTDOWN状态并且阻塞队列中没有任务)
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// workerCount减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// timed代表如果获取任务超时是否允许结束Worker
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果workerCount大于最大线程数,尝试CAS操作
// 如果(上一次调用poll方法获取任务超时)并且(获取任务超时允许结束Worker)并且(workerCount大于1),尝试CAS操作
// 如果(上一次调用poll方法获取任务超时)并且(获取任务超时允许结束Worker)并且(阻塞队列中没有任务),尝试CAS操作
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 如果CAS操作成功,workerCount减1
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果获取任务超时允许结束Worker,调用poll方法获取任务
// 如果获取任务超时不允许结束Worker,调用take方法获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
// 如果成功获取任务
if (r != null)
return r;
// 运行到这里,说明调用poll方法获取任务超时
timedOut = true;
} catch (InterruptedException retry) {
// 调用poll方法或take方法获取任务时,当前线程可能被中断,此时需要捕获InterruptedException,重置timedOut,重新开始循环
timedOut = false;
}
}
}
6.1.2.1.2 ThreadPoolExecutor中的processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果completedAbruptly等于true
if (completedAbruptly)
// workerCount减1
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从workers这个HashSet中移除当前Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 如果线程池处于RUNNING或SHUTDOWN状态
if (runStateLessThan(c, STOP)) {
// 如果completedAbruptly等于false,说明getTask方法返回null,需要判断是否尝试启动一个新线程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果允许结束空闲的核心线程,并且阻塞队列中还有任务
if (min == 0 && ! workQueue.isEmpty())
// 至少保留一个线程
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 尝试启动一个新线程,并且将这个线程的第一个任务设为null
addWorker(null, false);
}
}
6.2 ThreadPoolExecutor中的reject方法
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
6.2.1 ThreadPoolExecutor中的四种拒绝策略
(1)CallerRunsPolicy:如果线程池处于Running状态,直接在提交任务的线程中执行任务;如果线程池不处于Running状态,直接丢弃任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
(2)AbortPolicy:抛出异常。ThreadPoolExecutor默认使用该拒绝策略。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
(3)DiscardPolicy:直接丢弃任务。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
(4)DiscardOldestPolicy:如果线程池处于Running状态,从阻塞队列中移除头部任务并重新调用execute方法。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
6.3 ThreadPoolExecutor中的remove方法
public boolean remove(Runnable task) {
// 从阻塞队列中移除指定任务
boolean removed = workQueue.remove(task);
tryTerminate();
return removed;
}
网友评论