一:简介
ThreadPoolExecutor继承AbstractExecutorService,AbstractExecutorService实现了ExecutorService接口,ExecutorService接口主要包含以下方法:
image.png
submit方法返回一个future类型的对象,future表示任务的执行结果,可以调用future.get返回实际的结果。
下面来看下ThreadPoolExecutor的主要变量:
//一个阻塞队列用于存放任务
private final BlockingQueue<Runnable> workQueue;
//可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
//存放任务的集合
private final HashSet<Worker> workers = new HashSet<Worker>();
//锁的状态
private final Condition termination = mainLock.newCondition();
//最大线程数量
private int largestPoolSize;
private long completedTaskCount;
//线程工厂,用来生产线程
private volatile ThreadFactory threadFactory;
//
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
//核心线程池大小
private volatile int corePoolSize;
//最大线程池大小
private volatile int maximumPoolSize;
//拒绝策略
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
还有一个原子变量AtomicInteger,其高三位用来存储线程池状态信息,低三位表示线程池中任务的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
线程池一共分五种状态,从小到大依次为RUNNING,SHUTDOWN,STOP,TIDYING和TERMINATED。
RUNNING:线程池接受新任务,对新添加的任务进行处理
SHUTDOWN:线程池不接受新任务,但对已添加的任务进行处理
STOP:线程池不接受新任务,不对添加的任务进行处理,并且会中断正在处理的任务
TIDYING:当所有任务已终止,ctl低29位记录的任务数量为0,线程池会变成TIDYING状态,线程池变成TIDYING状态时会执行terminated方法,terminated方法在ThreadpoolExecutor中是空的,用户可以重载该函数在线程池处于TIDYING时做某些特定的处理。
TERMINATED:线程池彻底终止。
二.主要方法:
通过线程池执行一个任务时,调用的是ExecutorService的submit方法,该方法定义在AbstractExecutorService中,如下:
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
由此可见最终是调用execute方法来执行,如下为execute方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果任务数量小于核心线程池大小,则调用addworker方法,addworker方法会创建一个新线程来执行这个任务。该任务为这个线程的第一个任务。addworker方法会检查线程池的状态和worker的数量以防止线程池在不能添加线程的情况下添加线程。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
这里反复提到worker,到底什么时worker呢?worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//rs >= SHUTDOWN: 如果线程池的状态为ruuning则允许线程加入,如果是shutdown及更之后的状态,则继续看看其他条件
// ! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()): 如果线程池当前状态不是running,但是满足当前状态为shutdown,提交的任务又是null,而且workQueue又是非空的这三个条件,也允许线程加入
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
首先是一个判段
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
这段代码的含义是只有两种情况线程池允许添加一个线程。
1.线程池状态为running
2.线程池状态为shutdown,任务task为null,且workqueue不为空。添加这样的线程目的是为了消耗workQueue中的线程。
网友评论