零:相关类UML图

一:Executor框架
public interface Executor{
void execute(Runable command);
}
Executor 是个简单的接口,它为灵活且强大的异步任务执行框架提供了基础,该框架能够支持多种不同类型的任务执行策略。它提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用 Runable 表示任务。
Executor 基于生产者 - 消费者模式,提交任务的模式相当于生产者(生成待完成的任务单元),执行任务的线程相当于消费者(执行完这些任务单元)。
1.1 Executor 的生命周期
Executor 的实现通常会创建线程来执行任务,但 JVM 只有在所有非守护线程全部终结后才会退出,因此,如果无法正确的关闭 Executor,那么 JVM 将无法结束。
由于 Executor 以异步的方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即可见的,有的任务可能已经完成,有的可能正在运行,而其他的任务甚至可能在队列中等待执行。当关闭应用程序时,可能采用最平缓的关闭形式(完成所有已经启动的任务,并且不再接受新的任务),也可能采用最粗暴的关闭形式(直接关掉机房的电源),以及其他可能的形式。既然 Executor 是为应用程序提供服务的,不管是采用平缓或者粗暴的方式,他们也应该都是可关闭的,并将在关闭过程中受影响的任务的状态反馈给应用程序。
为了解决执行服务的生命周期的问题, 有了 Executor 的扩展接口 ExecutorService 接口, ExecutorService 接口添加了一些用于生命周期的方法,以及一些用于任务提交的便利方法。
public interface ExecutorService extends Executor{
//平缓的关闭过程
void shutdown();
//粗暴的关闭过程
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
//用于任务提交的一些便利方法,未列全
Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
}
ExecutorService 的生命周期有3中状态:运行、关闭和已终止
。ExecutorService 在创建初期处于运行状态。shutdown 方法将执行平缓的关闭过程:不再接受 新的任务提交,同时等待已提交的任务执行完成,包含那些还未开始执行的任务。 shutdownNow 方法将执行粗暴的关闭过程:他将尝试取消所有运行中的任务,并将不再启动队列中尚未开始执行的任务。
在 ExecutorService 关闭后提交的任务将由“拒绝执行处理器(Rejected Exception Handler)”来处理(详见3.3节),他会抛弃任务,或者使得 execute 方法抛出一个未检查的 RejectedExecutionException。等所有任务都完成后, ExecutorService 将进入终止状态。
ExecutorService 接口与 Executor 相比,提供了返回 Future 对象,终止、关闭线程池的方法。Executor 接口定义的 execute() 方法只能接收 Runable 接口的对象,而 ExecutorService 接口中的 submit() 方法则可以接受 Runable 和 Callable 接口的对象。
1.2 Callable 和 Future
Executor 框架使用 Runable 作为其基本的任务表达形式。Runable 是一种有很大局限的抽象,虽然 run() 方法能够将相应的信息写入日志或者放入共享的数据结构,但他不能返回值或者抛出一个受检查的异常。
许多任务实际上都是存在延迟的计算 -- 执行数据库查询、从网络上获取资源。对于这些任务,Callable 是一种更好的抽象:它认为 call() 方法将返回一个值或者抛出一个异常。
ExecutorService 中的所有submit 方法都将返回一个Future,从而将一个Runable或者Callable提交给Executor,并得到一个Future用来获取任务的执行结果或者取消任务。Future 代表了一个异步运算的结果,它提供了判断运算是否完成、等待运算的进行以及获得运算结果等
,该接口包含的方法如下所示:
public interface Future{
/**
* 试图取消对此任务的执行
* 如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。
* 当调用 cancel 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。
* 如果任务已经启动,则 mayInterruptIfRunning 参数确定是否应该以试图停止任务的方式来中断执行此任务的线程
*/
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
/**
* 如有必要,等待计算完成,然后获取其结果
*/
V get() throws InterruptedException, ExecutionException;
/**
* 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
其中,get 方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已完成,那么get会立即返回结果或者抛出一个 Exception ,如果任务没有完成,那么get会阻塞直到任务完成。如果任务抛出了异常,那么get会将异常封装为 ExecutionException 并重新抛出。如果任务被取消,那么get将抛出 CancellationException。
二: Executors
从 JDK 1.5 开始,类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用 Executors 中的静态工厂方法来创建线程池:
-
newFixedThreadPool:
newFixedThreadPool将创建一个固定长度的线程池,每提交一个任务时就创建一个线程池,直到达到线程池的最大数量,这时线程池中线程的数量将不再变化(如果某个线程由于发生了未预期的 Exception 而结束,那么线程池将补充一个新的线程)。
image
-
newCachedThreadPool:
newCachedThreadPool 将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,当需求增加时,可添加新的线程,线程池的规模不存在任何的限制。注意!!!!这里使用的阻塞队列为 SynchronousQueue,关于这点在 3.2 任务队列模块中详细提到。
image
-
newSingleThreadExecutor:
newSingleThreadExecutor 是一个单线程的 Executor,它创建单个工作线程来执行任务,如果已有的这个线程因异常结束,将创建一个新的线程来代替。newSingleThreadExecutor 能保证依照任务在队列中的顺序来串行执行。
image
-
newScheduledThreadPool:
newScheduledThreadPool创建一个固定长度的线程池,而且以延迟或者定时的方式来执行任务。
image
newFixedThreadPool 和 newCachedThreadPool 这两个工厂方法返回的都是 ThreadPoolExecutor 实例, newSingleThreadExecutor 和 newScheduledThreadPool 返回的则是 ScheduleExecutorService。
三:ThreadPoolExecutor
3.1 线程的创建于销毁
线程池的基本大小(CorePoolSize,也作核心线程数)、最大大小(MaximumPoolSize)以及存活时间等因素共同负责线程的创建和销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,只有在工作队列满了才会创建超出这个数量的线程。线程池的最大大小表示可同时进行活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占用的资源,从而使得这些资源可以用于执行其他的工作。(显然,这是一种折衷:回收空闲线程池会产生额外的延迟,因为当需求增加时,必须创建新的线程来满足需求。)
newFixedThreadPool 工厂方法将线程池的基本大小和最大大小设置为参数中指定的值由用户指定,而且创建的线程池不会超时。newCachedThreadPool 工厂方法将线程池的最大大小设置为 Integer.MAX_VALUE,而将基本大小设置为0,并将超时时间设置为 60s,这种方法创建的线程池可以被无线扩展,需求降低时也能够自动收缩。
3.2 任务队列

ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。任务的排队方式有三种:无界队列、有界队列和同步移交。队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool 和 newSingleThreadExecutor 在默认情况下将使用一个无界的 LinkedBlockingQueue 。如果所有现存的工作线程都处于忙碌状态,那么任务将在等待队列中等候。如果任务持续不断的到达,并且超过了线程池的处理速度,那么任务队列将无限制的增加。
一种更稳妥的资源管理策略是使用有界队列,例如 ArrayBlockingQueu、有界的LinkedBlockingQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但他又带来了新的问题:当队列填满,新的任务怎么办?在使用有界队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存的使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价就是可能会限制吞吐量。
对于非常大或者无界的线程池,可以使用SynchronousQueue 来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue 不是一个真正的队列,而是一种在线程间的移交机制。要将一个任务放入 SynchronousQueue 中,必须有另外一个空闲线程正在等待接受这个任务。如果没有此时空闲线程且线程数量小于线程池的最大线程数量,那么将创建一个新的线程来执行这个任务,否则将根据饱和策略拒绝执行这个任务。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是放入队列中等待。只有当线程池是无界的或者可以拒绝执行任务时 SynchronousQueue 才有实际价值。在 newCachedThreadPool 中使用的就是 SynchronousQueue。
LinkedBlockingQueue 和 ArrayBlockingQueue 都具备队列的基本属性 -- 先进先出(FIFO),任务的执行顺序与到达顺序一致。如果想进一步控制任务的执行顺序,可以使用 PriorityBlockingQueue,这个队列将根据优先级来安排任务的执行。
只有在任务间相互独立时,为线程池或者阻塞队列设置界限才是合理的,如果任务之间存在依赖性,那么有界的线程池或者阻塞队列将导致线程“饥饿”死锁问题,此时应该使用无界的线程池,比如 newCachedThreadPool。
3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。 ThreadPoolExecutor 的饱和策略可以通过调用 setRejectedExecutionHandler 来修改(如果某个任务被提交到已经关闭的 Executor 时,也会用到饱和策略)。JDK 提供了几种不同的 RejectedExecutionHandler 实现,每种实现都包含有不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy 和 DiscardOldestPolicy。
“终止(Abort)”
策略是默认的饱和策略,该策略抛出未检查的 RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务无法保存到等待队列中时,“抛弃(Discard)”
策略会抛弃该任务。“抛弃最旧的(DiscardOldest)”策略则会抛弃下一个将要被执行的任务,然后尝试重新提交新的任务(如果等待队列采用的是优先队列,那么抛弃最旧的策略将会导致抛弃优先级最高的任务,因此最高不要将“抛弃最旧的”饱和策略和优先级队列一起使用)。
“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将这些任务退回给调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了 executor 的线程中执行该任务。当线程池中所有线程被占用,并且等待队列被填满,下一个任务会在调用了 executor 的线程中执行。由于执行任务需要一定的时间,因此主线程至少在一个时间段内不能提交任务,从而使得工作线程能够有时间处理任务。
创建一个固定大小、采用有界队列以及“调用者执行”策略的线程池代码:
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_THREADS, MAX_THREADS,
0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
3.4 线程工厂
每当线程池创建一个线程时,都是通过线程工厂方法来完成,默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。 ThreadFactory 中只定义了一个 newThread 方法,每当创建一个线程时都会调用到这个方法。
在许多情况下都需要使用定制的线程工厂方法。例如,你希望为线程池的线程指定一个 UncaughtExceptionHandler ,或者实例化一个定制的 Thread 类用于执行调试信息的记录。
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
以下代码就是一个很好的创建线程池,并设置新建线程名称的例子:
//配置线程池信息
ExecutorService exportBuyerPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("export_buyer_list-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
ThreadFactoryBuilder 位于com.google.common.util.concurrent,利用建造者模式去设置线程池的相关信息。具体代码如下,当然,也可以根据自己的需要通过实现 ThreadFactory 接口来设置线程池新建线程时的配置。
public final class ThreadFactoryBuilder {
private String nameFormat = null;
private Boolean daemon = null;
private Integer priority = null;
private UncaughtExceptionHandler uncaughtExceptionHandler = null;
private ThreadFactory backingThreadFactory = null;
public ThreadFactoryBuilder() {
}
public ThreadFactoryBuilder setNameFormat(String nameFormat) {
String.format(nameFormat, 0);
this.nameFormat = nameFormat;
return this;
}
public ThreadFactoryBuilder setDaemon(boolean daemon) {
this.daemon = daemon;
return this;
}
public ThreadFactoryBuilder setPriority(int priority) {
Preconditions.checkArgument(priority >= 1, "Thread priority (%s) must be >= %s", new Object[]{priority, 1});
Preconditions.checkArgument(priority <= 10, "Thread priority (%s) must be <= %s", new Object[]{priority, 10});
this.priority = priority;
return this;
}
public ThreadFactoryBuilder setUncaughtExceptionHandler(UncaughtExceptionHandler uncaughtExceptionHandler) {
this.uncaughtExceptionHandler = (UncaughtExceptionHandler)Preconditions.checkNotNull(uncaughtExceptionHandler);
return this;
}
public ThreadFactoryBuilder setThreadFactory(ThreadFactory backingThreadFactory) {
this.backingThreadFactory = (ThreadFactory)Preconditions.checkNotNull(backingThreadFactory);
return this;
}
public ThreadFactory build() {
return build(this);
}
private static ThreadFactory build(ThreadFactoryBuilder builder) {
final String nameFormat = builder.nameFormat;
final Boolean daemon = builder.daemon;
final Integer priority = builder.priority;
final UncaughtExceptionHandler uncaughtExceptionHandler = builder.uncaughtExceptionHandler;
final ThreadFactory backingThreadFactory = builder.backingThreadFactory != null ? builder.backingThreadFactory : Executors.defaultThreadFactory();
final AtomicLong count = nameFormat != null ? new AtomicLong(0L) : null;
return new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = backingThreadFactory.newThread(runnable);
if (nameFormat != null) {
//设置新建线程名称
thread.setName(String.format(nameFormat, count.getAndIncrement()));
}
if (daemon != null) {
//设置新建线程是否为守护线程
thread.setDaemon(daemon);
}
if (priority != null) {
//设置新建线程优先级
thread.setPriority(priority);
}
if (uncaughtExceptionHandler != null) {
//设置异常信息
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
return thread;
}
};
}
}
网友评论