线程池解决的问题
1.为执行大量数据提供改进的性能异步任务
2.提供了一种限制和管理资源的方法, 也包括执行任务集合时消耗的线程
3.每个ThreadPoolExecutor
还维护了一些基本的统计数据, 比如完成的任务的数量
当提交新任务[execute(Runnable)]
1.当前运行中的线程数少于 corePoolSize
, 会创建新线程处理任务[即使其他工作线程是空闲状态]
2.当前运行汇总的线程数大于 corePoolSize
且少于maximumPoolSize
, 并且任务队列[BlockingQueue<Runnable> workQueue
]满了, 才会创建新的线程
3.设置corePoolSize
和maximumPoolSize
相同, 则创建了一个固定大小的线程池
4.设置 maximumPoolSize=Integer.MAX_VALUE
, 则表示线程池容纳任意值并发任务的数量
5.corePoolSize
、maximumPoolSize
是在构造方法中设置的, 但是也可以通过 setCorePoolSize
,setMaximumPoolSize
动态调整
6.默认情况线程池核心线程创建和开始在有任务提交. 如果需要预先启动线程可以重写prestartCoreThread
, prestartAllCoreThreads
方法.
7.线程通过 ThreadFactory[Executors#defaultThreadFactory]
创建, 创建的线程在同一个线程组[ThreadGroup]
、同一个优先级、非守护状态。通过提供一个自定义的ThreadFactory
, 可以更改线程名、线程组、优先级、守护状态等等。
8.如果通过ThreadFactory.newThread()
方法创建线程失败得到null,Executor
会继续, 但是不会执行任何任务。
保活时间
1.如果池中当前线程超过 corePoolSize
, 超出corePoolSize
的线程的空闲时间超过 keepAliveTime
, 那么这些线程将会终结.
2.通过设置keepAliveTime=Long.MAX_VALUE
取消空闲线程的关闭
3.一般情况keepAliveTime
是控制超出corePoolSize
的线程保活时间, 如果设置allowCoreThreadTimeOut()
则keepAliveTime
也会作用在corePoolSize
上
任务队列[BlockingQueue]
1.如果超过corePoolSize
线程在运行, 当有新任务请求, 线程池更倾向将任务加到queue
中
2.如果当前运行的线程数超过 maximumPoolSize
, 并且队列满了, 则新提交的任务将会被拒绝
拒绝策略:
当提交的线程数过多, 并且线程数达到 maximumPoolSize
, 则会执行拒绝策略.通过调用RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)
方法
四种提供的拒绝策略:
1.ThreadPoolExecutor.AbortPolicy
: 抛出一个运行时异常 RejectedExecutionException
[默认拒绝策略]
2.ThreadPoolExecutor.CallerRunsPolicy
: 用执行task的线程执行超出的任务
3.ThreadPoolExecutor.DiscardPolicy
: 饱和task不会被执行, 直接遗弃
4.ThreadPoolExecutor.DiscardOldestPolicy
: 将队列头的任务遗弃[最老的任务抛弃]
查看ThreadPoolExecutor
源代码:
构造方法[单纯的属性赋值]:
其中默认 threadFactory = Executors.defaultThreadFactory()
, handler=new AbortPolicy()
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数解析:
// 11100000000000000000000000000000 [高3 位表示线程池状态, 低29 位表示worker数量]
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 536870911
// 0001 1111 1111 1111 1111 1111 1111 1111
// 允许的最大线程数, 即worker 数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 线程池的5种状态:
// 1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
// 0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 获取线程池状态, 按位与
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 获取worker 的数量
private static int workerCountOf(int c) { return c & CAPACITY; }
// 根据线程池状态和线程池worker 数量, 生成ctl 值
private static int ctlOf(int rs, int wc) { return rs | wc; }
任务提交方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 工作中的线程小于corePoolSize 时, 直接创建worker 执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// worker 数量超过核心线程数, 任务直接进入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 线程状态不是RUNNING, 说明执行过shutdown 命令, 需要对新加入的任务执行reject
// 这边为什么需要recheck, 因为任务入队列前后, 线程池的状态可能会发生变化
if (! isRunning(recheck) && remove(command))
reject(command);
// 判断0值, 主要是在线程池构造方法中, 核心线程数允许为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//
else if (!addWorker(command, false))
reject(command);
}
addWorker()
方法: java.util.concurrent.ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
// 外层自旋
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// (rs > SHUTDOWN) ||
// (rs >= SHUTDOWN && firstTask != null) ||
// (rs >= SHUTDOWN && workQueue.isEmpty())
// 1. 线程池状态大于SHUTDOWN, 直接返回false
// 2. 线程池状态 等于SHUTDOWN, 且firstTask 不为空, 直接返回false
// 3. 线程池状态等于SHUTDOWN, 且队列为空, 直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层自旋
for (;;) {
int wc = workerCountOf(c);
// worker 数量超过容量或超过指定线程容量, 直接返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 使用cas 方法增加worker 数量
if (compareAndIncrementWorkerCount(c))
// 执行成功跳出外层自旋
break retry;
c = ctl.get(); // Re-read ctl
// 线程池状态发生变化, 对外层循环进行自旋
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// worker 添加必须串行, 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
// 重新检查线程状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 线程已经运行
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// worker 创建并添加worker
workers.add(w);
int s = workers.size();
// 更新 largestPoolSize 变量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动worker 线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// worker 启动失败, 说明线程池状态发生了变化, 关闭操作被执行。
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
runWorker()
: java.util.concurrent.ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
网友评论