线程池核心参数
- corePoolSize
核心线程数,当有任务提交的时候,便会创建一个线程,如果创建的线程数量达到核心线程数,后续任务便会放入阻塞队列中。可以使用prestartAllCoreThreads()提前创建所有核心线程数。 - maximumPoolSize
最大线程数。当阻塞队列已满,后续还有任务进入的时候,便继续创建线程,最大上限是maximumPoolSize。 - keepAliveTime
表示线程的存活时间。当空闲线程数大于corePoolSize时所存活的时间。 - unit
存活时间的单位 - workQueue
等待被执行任务的阻塞队列
ArrayBlockingQueue:数组形式的有界阻塞队列,FIFO
LinkedBlockingQueue:链表形式无界阻塞队列,FIFO
SynchronousQueue:不存储元素的阻塞队列,插入操作必须等待另外一个线程调用remove操作。
priorityBlockingQueue:优先队列 - ThreadFactory
线程工厂,可以自定义线程名。 - RejectedExecutionHandler
队列满之后的处理策略
AbortPolicy:直接抛异常
CallerRunsPolicy:使用调用线程来执行任务,也就是主线程
DiscardOldestPolicy:抛弃最前的任务并执行当前任务
DiscardPolicy:直接丢弃任务
线程池种类
- newFixedThreadPool
使用LinkedBlockingQueue作为阻塞队列,corePoolSize与maximumPoolSize相等。 - newSingleThreadExecutor
线程池只有一个线程,使用LinkedBlockingQueue。 - newCacheThreadPool
corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,使用SynchronousQueue作为阻塞队列。 - newScheduledThreadPool
通过new ScheduledThreadPoolExecutor(corePoolSize)的方式进行创建。maximunPoolSize为Integer.MAX_VALUE,使用DelayedWorkQueue。
这里先说说几种线程池的状态
- RUNNING:能够接受新任务,线程池一旦创建就处于running状态。
- SHUTDOWN:不接受新任务,但会处理已添加的任务。调用shutdown接口时,由running状态变成shutdown。
- STOP:不接受新任务,不处理已经被添加的任务。并且会中断正在处理的任务。调用shutdownnow后状态为stop
- TIDYING:所有任务终止。在处于tidying状态后会执行terminated。
- TERMINATED
在ThreadPoolExecutor中,可以看到使用3位表示线程池状态,29位表示线程数。
看下下面的实例
public class ExecutorTest {
public static void main(String[] args) {
Executor executor = Executors.newFixedThreadPool(15);
for(int i=0;i < 30;i++){
executor.execute(new Test());
}
}
static class Test implements Runnable{
public void run() {
System.out.println(Thread.currentThread().getId()+" "+Thread.currentThread().getName());
}
}
}
跟进execute看看。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 1. 如果少于corePoolSize的线程在运行,则尝试对第一个任务开启新
* 线程。对addWorker的调用会自动检测runState和workerCount,以防
* 止在不该添加线程的时候添加线程出现错误警告。
* 2. 如果一个任务能够成功入队,仍然需要对是否增加线程做双重检查
* 或者可能从进入该方法后线程池就shutdown了。所以需要对state做
* 再次检查,如果有必要的话一旦pool停止,就需要回滚入队操作,或
* 者当没有在没有任何线程的情况下去创建一个新的。
* 3. 如果我们不能让任务入队,我们将尝试增加一个线程。如果失败
* 了,将尝试新增一个线程。如果失败了,便知道pool处于shut down
* 或者饱和状态,必须要拒绝任务。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
- 首先计算当前的线程数,如果小于核心线程数,则执行addWorker增加线程数。如果没有则进入下面一个判断。
- 判断线程是否处于running状态,如果是则尝试offer这个任务,offer是会直接返回true或者false。如果入队成功则做二次检查,并继续判断如果不处于running状态就需要删除刚才加入的任务,并调用reject方法。如果处于running,并且线程数为0则进行addWorker操作。这里为什么addWorker后面的参数是false到具体方法再讲。
- 如果不是running状态或者入队失败,则尝试添加任务,如果再失败则采取拒绝策略。
那么来看看addWorker代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 先判断pool的状态,如果大于等于shutdown,或者等于shutdown的时候有任务传入或是workQueue为空都会直接返回fasle。shutdown不接受新任务,如果队列为空肯定也是要返回false,不为空的话shutdown状态需要把剩余的任务处理完。
- 接着一个循环判断线程数,capacity是2的29次方-1,也就是最大线程数。注意到core参数,会传入true或者是false,这里用来选择使用core还是maximum来判断。如果core是true,wc大于等于核心数,则添加失败。如果core是false,则wc与maximum做判断。所以如果wc小于核心线程则跳出循环去创建线程。
- 创建线程通过可重入锁和Worker类配合进行。Worker类继承AbstractQueuedSynchronizer并实现了Runnable接口。
代码中有new Worker(Task),先看看构造
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
这里使用newThread,将worker自己传入,因为其实现了Runnable,所以启动线程的时候会调用其run方法,而其run方法中调用了runWoker。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里通过Worker加锁的方式,在run方法之前做一个自定义的beforeExecute处理,接着执行任务的run方法,最后执行afterExecute。
再看上述判断中的getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
这里有个allowCoreThreadTimeout,这是线程池提供是否允许核心线程Timeout的一个设置属性。如果允许或者wc大于核心线程数,则time为true,使用workQueue.poll来获取任务,如果队列为空则会返回null,因为是允许超时所以会等待keepalivetime。如果是false,则使用workQueue.take,如果为空则会阻塞。
网友评论