核心线程池的内部实现
对于核心的几个线程池,newFixedThreadPool
、newSingleThreadExecutor
和newCachedThreadPool
,其内部都使用了ThreadPoolExecutor
实现。
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue());
}
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue())
);
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
ThreadPoolExecutor构造函数:
public ThreadPoolExecutor( int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数含义如下:
- corePoolSize:线程池中的线程数量。
- maximumPoolSize:线程池中的最大线程数量。
- keepAliveTime:当线程池中线程数量超过corePoolSize,多余空闲线程的存活时间。
- unit:keepAliveTime的单位。
- workQueue:任务队列,被提交但是尚未执行的任务。
- threadFactory:线程工厂,用于创建线程,一般使用默认。
- handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。
以上参数重点:workQueue和handler。
workQueue
workQueue
指被提交但尚未被执行的任务队列,它是一个BlockingQueue
接口的对象,用于存放Runnable
对象。
ThreadPoolExecutor
构造函数可使用的几种BlockingQueue
:
- 直接提交的队列:该功能由
SynchronousQueue
对象提供。SynchronousQueue
没有容量,每个插入操作都要等待一个相应的删除操作,每个删除操作都要等待一个对应的插入操作。SynchronousQueue
并不会保存提交的任务,而总是将新提交的任务马上执行(进队马上出队,像通道),如果没有空闲线程,则尝试新建线程,如果线程数量达到最大值,则执行拒绝策略。因此使用SynchronousQueue
时,maximumPoolSize
要设得很大,否则很容易执行拒绝策略。 - 有界的任务队列:有界的任务队列可使用
ArrayBlockingQueue
实现。ArrayBlockingQueue
的构造函数接受一个容量参数,指定队列的容量。若有新任务需要执行,如果线程池的实际线程小于corePoolSize
,则优先创建新的线程,若大于corePoolSize
,则将新任务加入到等待队列。如果队列已满导致无法加入,若总线程数小于maximumPoolSize
,则创建新的线程执行任务,若总线程数大于maximumPoolSize
,则执行拒绝策略。 - 无界的任务队列:无界任务队列可通过
LinkedBlockingQueue
类实现。与有界队列不同,除非系统资源耗尽,否则不存在任务入队失败的情况。当新任务提交,若线程数小于corePoolSize
,线程池会创建新的线程执行任务,若线程数达到corePoolSize
,不再会创建新线程,新任务会被加入到队列。 - 优先任务队列:优先任务队列是带有执行优先级的队列。通过
PriorityBlockingQueue
实现,可以控制任务的执行先后顺序,是一个特殊的无界队列(非FIFO)。
几种线程池使用的workQueue
:
-
newFixedThreadPool
返回一个corePoolSize
和maximumPoolSize
相同,workQueue
为LinkedBlockingQueue
的线程池。因为线程数固定,不存在动态变化,因此maximumPoolSize
可以随便设定。因为使用了无界任务队列,如果任务提交频繁处理不过来,任务队列会膨胀,从而耗尽系统资源。 -
newSingleThreadExecutor
返回一个单个线程的线程池。它和newFixedThreadPool
基本一样,只不过将线程数设置为1
而已。 -
newCachedThreadPool
返回一个corePoolSize
为0,maximumPoolSize
为无穷大,workQueue
为SynchronousQueue
的线程池。该线程池若无任务提交,池中则无线程,若有任务提交,则创建线程执行任务。SynchronousQueue
是直接提交队列,线程池总是会创建线程来执行新提交的任务。当所有的任务都执行完,由于corePoolSize = 0
,也就是池中所有线程都是多余的空闲线程了,那么线程池会在指定时间内(60s)销毁所有线程。
ThreadPoolExecutor线程池核心调度
public void execute(Runnable var1) {
if (var1 == null) {
throw new NullPointerException();
} else {
int var2 = this.ctl.get();
//线程总数小于corePoolSize,创建线程执行任务
if (workerCountOf(var2) < this.corePoolSize) {
if (this.addWorker(var1, true)) {
return;
}
var2 = this.ctl.get();
}
//线程总数大于corePoolSize,将任务加入队列
if (isRunning(var2) && this.workQueue.offer(var1)) {
int var3 = this.ctl.get();
if (!isRunning(var3) && this.remove(var1)) {
this.reject(var1);
} else if (workerCountOf(var3) == 0) {
this.addWorker((Runnable)null, false);
}
//入队失败(有界队列已满,或使用了SynchronousQueue)
} else if (!this.addWorker(var1, false)) {
//线程数达到最大,执行拒绝策略
this.reject(var1);
}
}
}
addWorker(var1, true/false)
的第二个参数为true
时表示线程总数小于corePoolSize
,否则大于corePoolSize
。
拒绝策略
当线程池的线程用完了,同时等待队列也塞满了线程,无法再创建新的线程来执行任务时,需要有一套机制合理地处理这个问题。
JDK内置提供了四种拒绝策略:
-
AbortPolicy
策略:该策略直接抛出异常,阻止系统正常工作。 -
CallerRunsPolicy
策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。这样做不会真正的丢弃任务,但任务提交线程的性能极有可能急剧下降。 -
DiscardOldestPolicy
策略:该策略将丢弃最老的请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。 -
DiscardPolicy
策略:该策略默默地丢弃无法处理的任务,不予任何处理。
以上内置的策略均实现了RejectedExecutionHandler
接口,若以上策略无法满足实际应用,可自己扩展该接口。
网友评论