首先,为什么需要读这段源码呢?其实主要就一点,“知其然,知其所以然”。当理解其中的实现方式,就更加明白该如何去使用。
为什么需要使用线程池?其原因正如像众所周知的,当我们需要不断的执行各种小型的任务时,而创建与销毁线程所带来的成本将影响系统的性能,因此使用线程池来减少线程的创建、销毁。
状态
在此,状态是指线程池的状态。其中状态转换如下图
状态转换当然,在程序中,每个状态都会有一个值来代替。
RUNNING:-1;SHUTDOWN:0;STOP:1;TIDYING:2;TERMINATED:3
并将所有的值左移29位。
在程序中用ctl来表示当前线程池的状态。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是一个原子变量,后续的每一个操作都会使用CAS来进行原子操作。当然我们还需要记录当前线程池中启动的线程数,这个线程数也是通过ctl来记录的。
这儿有一个巧妙的设计方式,将ctl的32位比特位分开,将其中前3位用来表示线程池的状态,而后面的29位用来记录当前线程池的线程数。其中每次获取线程数或者获取线程的状态的操作都使用位操作来获得。
构造方法
public ThreadPoolExecutor(int corePoolSize,//核心线程上限数量
int maximumPoolSize,//最大线程数
long keepAliveTime,//允许线程空闲等待时间(允许setKeepAliveTime()修改)
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//任务等待队列
ThreadFactory threadFactory,//线程构造工厂
RejectedExecutionHandler handler) {//任务拒绝方案
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)//校验参数
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();//初始化上下文
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中需要注意的有两点。第一点,最大线程数代表的是在线程池中执行任务所允许的最大线程数。在线程池中,分为两种线程,一种是“固定工作人员”线程,另一种是“临时工”,只是在线程池有压力的情况下才会请用“临时工”。而最大线程数代表的是所有允许工作的线程数,核心线程上限数量为最多允许的“固定工作人员”线程数。
第二点,在构造方法中,有允许默认线程构造方式,以及默认拒绝方案等。默认拒绝方案是接受任务时,不做处理,直接抛出异常。拒绝方案总共:CallerRunsPolicy、AbortPolicy、DiscardPolicy、DiscardOldestPolicy。
CallerRunsPolicy:默认创建线程执行任务,除非执行器关闭
AbortPolicy:默认执行方案;不执行任务,直接抛出异常
DiscardPolicy:什么也不做。Does nothing
DiscardOldestPolicy:将任务队列的队头任务丢弃,再执行尝试加入队列。除非执行器关闭
execute
execute方法为线程池接收任务的方式。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
其中整体思路也是比较简单的明了的。
1、首先,校验运行线程数小于核心数,尝试增加新的核心线程直接运行任务,结束;
2、如果线程池在运行状态,尝试加入等待队列;加入成功后再次校验运行状态:非运行状态则移除任务并拒绝任务(拒绝任务时会执行拒绝方式);如果线程池没有运行的线程,则加入一个新的非核心线程(运行任务将从队列中去获取);
3、对于上一点开始的判断不满足,则再次尝试加入一个非核心线程运行当前任务,失败则拒绝任务。
在第2点中为什么需要再次校验线程池中没有线程,并加入线程呢?其中在于并发情况下,当我在加入任务到等待队列时,刚好前面运行的线程都执行完任务,并队列为空,如果没有等待时间或者等待时间很短,那么这些线程都会消亡,那么在加入任务到线程池等待队列后,线程池没有消费任务的线程。
为什么添加“非核心”线程呢?如果添加的是核心线程会有什么不一样吗?其实换个角度想就明白了。核心线程与非核心线程在线程池中有明显的区分吗?并没有。只是有表示核心线程数上限与总线程数上限。加入线程时判断是否加入核心线程,只是判断当前线程总数是否到达核心线程数的限制。而这儿,线程总数是空的,因此加入一个新的线程,不管核心与否都行,但是必须要传这个参数。
当然有一种情况,如果在此处加入非核心线程的前,其他允许核心数的线程加入线程,那么当前线程的加入没有达到最大线程数限制则不会失败,在我看来这儿传true/false本质上是相同的,ThreadPoolExecutor本身具有一定的自适应调节能力。重点是必须要传一个参数。
...
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
...
在第3点中,如果任务加入队列失败且创建了一个新的非核心线程。这种情况代表着当前情况下,目前的生产者的生产数量大于消费者数量,需要增加消费者的数量,等到不需要时,再释放临时工。
其中addWorker函数,参数firstTask代表初始化任务,core代表创建的线程是否核心线程,而返回结果true/false分别代表新建线程加入并启动成功/失败。
addWorker()函数流程shutdown
shutdown函数逻辑如下:
1、先获得自旋锁,并加锁
2、对每个worker检查是否有修改线程权限(需要加锁)
3、循环尝试修改执行状态为SHUTDOWN
4、尝试阻塞每个线程(修改阻塞标志)
5、关闭锁后并调用tryTerminate()
函数
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
tryTerminate方法将在循环中执行下叙流程:
1、判断线程池状态,运行中,TIDYING,或者SHUTDOWN同时等待队列不为空则结束;
2、判断当前线程执行数不为0,阻塞一个线程,结束(只是标识为阻塞);
3、获取锁并加锁;
4、尝试状态位设置为按位或上TIDYING;
5、执行terminated,预留可被重载;
6、状态位设置为按位或上TERMINATED;
7、signalAll,唤醒此lock上等待的所有线程;
8、关闭锁;
shutdownNow
shutdownNow()函数与shutdown()函数的主要区别是,shutdownNow会马上停止线程池,而shutdown不会马上停止线程池,会待目前线程池中的任务执行完成后再停止线程池。
在代码上体现为三点区别:
- shutdownNow修改线程池的状态为STOP状态;
- shutdownNow调用阻塞线程是调用的是interruptWorkers()方法,而shutdown调用的是interruptIdleWorkers()方法。
这两个线程方法具有本质的区别,interruptWorkers会直接将循环所有的线程进行阻塞线程,而interruptIdleWorkers会阻塞没有运行的任务的线程,其中在于后者会拿到Worker的的锁后再调用阻塞; - shutdownNow会直接清空任务队列里没有运行的任务,并返回。
Worker::run
首先,Worker是对于线程的一个封装,其中有当前线程,初始化任务,完成任务数等信息。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
其中逻辑如下:
1、尝试从队列中拿出一个任务;(初始化时任务可能为空,后续将在此不断循环)
2、加锁;
3、判断线程池是否停止,如果停止则中断线程;
4、执行任务;
5、task置空,并将当前线程完成任务数自增,并关闭锁;
6、清理。
在清理中,会判断当前执行任务异常而导致线程结束,则将worker数减1,并执行一些清理性的工作。
getTask在本处也是一个很有趣的方法。其实也很简单,就是循环尝试从任务队列中拿出一个新的任务,如果拿出了就返回,这是一个消费者。
1、如果当前线程池的状态是STOP或者将STOP,则将worker数减1并结束;
2、计算是否有存活时间,以及线程数是否大于允许核心数;
3、根据2的计算结果判断线程是够需要结束,尝试将线程数减1;
4、尝试从队列中拿出任务,poll方法可以超时,并返回(会阻塞等待);
5、没有拿出任务则把timeOut置为true;
其实在这儿就可以明白,keepAliveTime所代表的当线程为空时,线程死亡前的等待时间,其实是设置为线程等待任务队列中拿到任务的超时时间。
总结
在ThreadPoolExecutor通过充分的利用锁以及CAS操作,保证了线程安全性,并且也不会有性能上的限制。并且使用一个变量来直接代表两个意义的使用方式挺惊艳。以及在核心线程与非核心线程上的区分都设计得恰到好处。整体设计方案值得学习。
网友评论