8.3 配置 ThreadPoolExecutor
ThreadPoolExecutor
为一些 Executor
提供了基本的实现,这些 Executor
是由 Executors
中的 newCachedThreadPool
、newFixedThreadPool
和 newScheduledThreadExecutor
等工厂方法返回的。
如果默认的执行策略不能满足需求,那么可以通过 ThreadPoolExecutor
的构造函数来实例化一个对象,并根据自己的需求来定制。
// 程序清单 8-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {}
线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,记载没有任务执行时①线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
① 在创建
ThreadPoolExecutor
初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用prestartAllCoreThreads
newFixedThreadPool
工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
newCachedThreadPool
工厂方法将线程池的最大大小设置为 Integer.MAX_VALUE
,而将基本大小设置为零,并将超时设置为 1 分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。
8.3.2 管理队列任务
ThreadPoolExecutor
允许提供一个 BlockingQueue
来保存等待执行的任务。基本的任务排队方法有 3 种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool
和 newSingleThreadExecutor
在默认情况下将使用一个无界的 LinkedBlockingQueue
。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如 ArrayBlockingQueue
、有界的 LinkedBlockingQueue
、PriorityBlockingQueue
。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?下一节的饱和策略将讨论解决这个问题。
对于非常大或者无界的线程池,可以通过使用 SynchronousQueue
来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue
不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入 SynchronousQueue
中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么 ThreadPoolExecutor
将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是首先放在队列中,然后由工作者线程从队列中提取任务。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue
才有实际价值。在 newCachedThreadPool
工厂方法中就使用了 SynchronousQueue
。
8.3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor
的饱和策略可以通过调用 setRejectedExecutionHandler
来修改。(如果某个任务被提交到一个已被关闭的 Executor
时,也会用到饱和策略)JDK 提供了几种不同的 RejectedExecutionHandler
实现,每种实现都包含由不同的饱和策略:AbortPolicy
、CallerRunsPolicy
、DiscardPolicy
和 DiscardOldestPolicy
。
“中止(Abort)” 策略是默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException
。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。
当新提交的任务无法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。
“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。
“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者。它不会再线程池的某个线程中执行新提交的任务,而是在一个调用了 execute
的线程中执行该任务(就是调用者线程)。由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任务任务,从而使得工作者线程有时间来处理完正在执行的任务。
当工作队列被填满后,饱和策略并不能阻塞 execute
的提交。此时,我们可以通过使用 Semaphore
来限制任务的到达率。下面的 程序清单 8-4 使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界,设置为线程池的大小加上可排队任务的数量。
// 程序清单 8-4
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command) throws InterruptedException {
semaphore.acquire(); // 到达上限后,任务提交将在这里被阻塞
try {
exec.execute(() -> {
try {
command.run();
} finally {
semaphore.release();
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
网友评论