线程池,关注如何缩短频繁线程创建和销毁的时间,通过线程复用技术,减少非核心任务的时间损耗(创建和销毁的时间),提高程序性能。它的主要原理是采用阻塞任务队列实现线程复用的方案。ThreadPoolExecutor是线程池的具体实现类,它的继承关系图。
ThreadPoolExecutor继承关系图.jpg线程池类型
源码提供了三种类型的线程池,通过不同的参数设置,缓存线程池、单线程线程池、固定数量线程的线程池。ThreadPoolExecutor的构造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
这几个参数决定线程池类型,核心参数。
corePoolSize,核心线程数。
maximumPoolSize,线程池的允许的最大线程,超过会报异常。
keepAliveTime,非核心线程允许存留时间,(保持活跃性的时间)。
TimeUnit,时间度量参数。
BlockingQueue<Runnable>,任务队列,可以使配置成无限或有限队列或栈。
ThreadFactory,线程工厂,创建线程优先级以及统计线程数量等。
线程池的对任务的控制流程
- 线程数量<corePoolSize,新建线程,处理任务。
- 线程数量>=corePoolSize,将任务放入workQueue队列,若有核心线程空闲,从workQueue队列取任务处理。
- 当任务队列有限且已满,再次新建线程处理任务,这时,要保证总量不超过最大允许值,否则,导致RejectedExecutionHandler异常。
- 再次新建的线程是非核心线程,空闲时最大存留keepAliveTime时间。
三种线程池是Executors的静态方法创建。
缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
核心线程数是0,允许的最大线程无限,任务队列SynchronousQueue不做具体存储,线程活跃时间60秒。
每个任务先到SynchronousQueue队列,它其实一个管道,不保存,若不存在空闲线程则新建,新建线程数量永远不会超过允许的最大值。
若一开始并发任务较多,会创建不少线程,每个线程任务完成后,变空闲线程,空闲时间未达到60s,可重用空闲线程处理新进任务,线程最大数量不限。即可以保障任务繁重时,空闲线程可复用,又能办证在没有任务时,保持一定时间后消失。
单线程线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
核心线程是1,允许的最大线程是1,任务队列LinkedBlockingQueue,不设空闲时间,无界队列。
只有一个核心线程处理任务,新任务入队列。若队列是有界的,多并发任务状态下队列总有满的时候,若队列满了就得新建临时线程,肯定会超允许的最大线程,报异常,因此,队列必须无界。
固定数量线程的线程池
Executors#newFixedThreadPool方法创建固定数量线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory
threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
核心线程与最大线程数量自己配置,任务队列LinkedBlockingQueue,不设空闲时间,无界队列。
任务队列无界,因此,不会创建临时工线程,只有核心线程工作,keepAlieveTime无需设置。
综上所述
允许的最大线程数量与队列必须保证一个是无界的,否则,在高并发条件下导致异常。根据特定的业务场景灵活配置。
工作原理
当线程池ThreadPoolExecutor创建后,我们一般调用它的execute方法,向线程池派发任务,将任务交给线程池,不需要自己再管理,会自动执行。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//小于核心线程
if (addWorker(command, true))//创建核心线程
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//加入队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//队列满了,创建非核心临时线程
reject(command);
}
首先,workerCountOf(c)方法,返回工作线程数量,若小于核心线程数量,调用addWorker方法,创建核心线程(设标志位参数),该任务将由新建的核心线程处理。
然后,若工作线程数量>=核心线程,isRunning(c)方法,表示c<0,将任务加入队列,offer方法,加入后返回成功标志。
因为offer是非阻塞方法,也就是说,如果队列已满,将直接返回失败,这时,将调用addWorker方法,创建非核心线程(不设标志位参数)。该任务将由新建的非核心线程处理。
以下是上面代码用到的一些变量与方法。
//AtomicInteger确保高并发整型值自增时线程安全。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;//32-3=29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;//536870911
private static final int RUNNING = -1 << COUNT_BITS;//-536870912
private static final int SHUTDOWN = 0 << COUNT_BITS;//0
private static final int STOP = 1 << COUNT_BITS;//536870912
private static final int TIDYING = 2 << COUNT_BITS;//1073741824
private static final int TERMINATED = 3 << COUNT_BITS;//1610612736
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
ctl的初始值与RUNNING相同,值是-536870912,即11100000 00000000 00000000 00000000。
-1存储的补码是1111...(32个1),向左移动29位,即右边加29个0。
workerCountOf方法将c与CAPACITY进行与操作。
runStateOf方法将c与CAPACITY的取反进行与操作。
CAPACITY是536870911,即00011111 11111111 11111111 11111111。
CAPACITY的取反是11100000 00000000 00000000 00000000。
workerCountOf方法,第一次调用时,ctl初始值与CAPACITY与操作结果是0,若ctl不断自增,与CAPACITY操作的值不断自增1,工作线程初始是0,每增加一个工作线程,ctl自增一次。
runStateOf方法,第一次调用时,ctl初始值与反CAPACITY与操作结果为反CAPACITY。
总之,workerCountOf方法返回工作线程数量,runStateOf方法返回ctl的初始值-536870912。
五个运行状态,RUNNING状态<0,isRunning方法,判断c是否<0,每次新建线程后c值自增1,c初始值-536870912,一般情况下,不足以使达到c>=0,因此,isRunning(c)方法一般返回true。
下面分析一下addWorker创建线程方法。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...
for (;;) {
int wc = workerCountOf(c);//工作线程数量
if (wc >= CAPACITY ||// 已经等于最大值。
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建线程,绑定任务。
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//上锁
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);//任务放入集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true; //后续根据此标志启动线程。
}
} finally {
mainLock.unlock();//解锁
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
两层死循环,若工作线程>CAPACITY容量,或>=允许的最大值(创建核心线程>=核心线程数量),返回失败。自增ctl,跳出循环。创建一个Worker对象,构造方法。
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
它是Runnable类型,封装派发给线程池的Runnable任务,ThreadFactory的newThread方法,创建一个新线程,将Worker类作为任务主体。该类的结构关系图。
线程池的Worker任务关系图.jpg 我们再回到addWorker方法,runStateOf(ctl.get())<0时(一般<0),将Worker任务放入HashSet集合,设置workerAdded标志。然后,根据该标志,调用Thread的start方法,启动线程,设置启动标志workerStarted,表示线程已启动执行任务。最后,返回启动标志。新线程执行Worker任务,触发Worker#run方法。该方法调用runWorker方法。它是外部类ThreadPoolExecutor的方法,入参就是该Worker。
final void runWorker(Worker w) {//处理Worker内的派发任务,在循环中进一步访问任务队列。
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
....
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
该方法中,新建线程借助Worker任务,开始为线程池工作,内部Runnable是用户需要完成的任务。在while循环中,第一个优先处理execute派送,Worker内部的线程池任务,完成后,线程也不会结束,而是getTask方法,继续从任务队列中获取务,如果getTask返回空,结束线程,否则,继续执行,该方法可能会阻塞。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
...//队列空时,返回null。
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
如果工作线程数量wc>核心线程,设置timed标志。从队列取任务采用阻塞等待一定时间的poll方式,等待时间设置线程存活时间keepAliveTime,因此,即使队列没有任务,线程仍然存活,(保证任务来到可立即开始工作)。
如果当工作线程数量wc<核心线程,采用一直阻塞的take方案,队列是空时线程一直阻塞,核心线程不会死亡。
如果队列是空,poll时间已到,设置timeOut超时标志,进入下次循环,这时,如果再发生工作线程数量wc>核心线程,会使得timed和timedOut标志同时存在,此时,工作线程数量自减,返回空,退出while循环,线程结束。如果工作线程数量wc<核心线程(仅有核心线程),工作线程不会自减,for循环继续,阻塞在take查询任务。
如果设置核心线程TimeOut,也会采用poll方式,存活时间一到,队列无任务,即使wc数量<核心线程,线程也会退出,允许核心线程死亡。
每一个新建的线程在该方法的执行逻辑是相同。根据当前线程数量和超时标志决定从任务队列的获取方法是否阻塞。
总结
线程池的本质,每一个线程在完成派发任务后,并未结束,继续访问任务队列。根据当前线程数量,利用任务队列的阻塞特性,实现线程的存留时间。通过留存工作线程消费,不新建线程,实现任务的线程复用。
任重而道远
网友评论