第八章 线程池的使用
本章将介绍对线程池进行配置与调优的高级选项,并分析在使用任务框架时需要注意的各种危险,以及一些使用Executor的高级实例。
8.1 在任务与执行策略之间的隐性耦合(Implicit Couplings Between Tasks and Execution Policies)
Executor框架可以将任务的提交与任务的执行策略解耦开来。虽然Executor框架为制定和修改执行策略都提供了相当大的灵活性,但并非所有的任务都能使用所有的执行策略。有些类型的任务需要明确地指定执行策略,包括:
- 依赖性任务(Dependent tasks.)
大多数行为正确的任务都是独立的,它们不依赖其他任务的执行时序,执行结果或其他结果。当在线程池执行独立的任务时,可以随意地改变线程池的大小与配置,这些修改只会对执行性能产生影响。然而,如果提交给线程池的任务需要依赖其他的任务,那么就隐含地给执行策略带来了约束,此时必须小心地维持这些执行策略以避免产生活跃性问题(8.1.1) - 使用线程封闭机制的任务(Tasks that exploit thread confinement)。
与线程池相比,单线程的Executor能能够对并发性做出更强的承诺。它们能确保任务不会并发地执行,使你能够放宽代码对线程安全的要求。对象可以封闭在任务线程中,使得在该线程中执行的任务在访问该对象时不需要同步,即使这些资源不是线程安全的。这种情形将在任务与执行策略之间形成隐式的耦合——任务要求其执行所在的Executor是单线程的(只要确保任务不会并发执行,并提供足够的并发机制,使得一个任务对内存的作用对于下一个任务一定是可见的——这正式newSingleThreadExecutor提供的保证)。如果将Executor从单线程环境改为线程池环境,那么将失去线程安全性。 - 对响应时间敏感的任务(Response-time-sensitive tasks)
如果将一个运行时间较长的任务提交到单线程的Executor中,或者将多个运行时间较长的任务提交到一个只包含少量线程的线程池中,那么将降低由该Executor管理的服务的响应性。 - 使用ThreadLocal的任务
ThreadLocal使每个线程都可以拥有某个变量的一个私有“版本”。然而只要条件允许,Executor可以自由地重用这些线程。
在标准的Executor实现中,当执行需求较低时将回收空闲线程,而当需求增加时将添加新的线程,并且如果从任务中抛出了一个未检查异常,那么将用一个新的工作者线程来代替抛出异常的线程。只有当线程本地值的生命周期受限于任务的生命周期时,在线程池的线程中使用ThreadLocal才有意义,而在线程池的线程中不应该使用ThreadLocal在任务之间传递值。
只有当任务都是同类型的并且互相独立时,线程池的性能才能达到最佳。
如果将运行时间较长和运行时间较短的任务混合在一起,除非线程池很大,否则将可能造成“拥塞”
如果提交的任务依赖于其他任务,除非线程池无限大,否则可能造成死锁。
在一些任务中,需要拥有或排除某种特定的执行策略。如果某些任务依赖与其他的任务,那么会要求线程池足够大,从而确保它们的依赖任务不会内放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。通过将这些需求写入文档,将来的代码维护人员就不会由于使用了某种不合适的执行侧路而破坏了安全性或活跃性。
8.1.1 线程饥饿死锁
在线程中,如果任务依赖与其他任务,那么可能产生死锁。
在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。第二个任务停留在工作队列中,并等待第一个任务完成,而第一个任务又无法完成,因为它在等待第二个任务的完成。
在更大的线程池中,如果所有正在执行的任务的线程都由于等待其他仍处于工作队列的任务而阻塞,那么会发生同样的问题,这个现象被称为线程饥饿死锁(Thread Starvation Deadlock)。
只要线程池中的任务需要无限期等待一些必须由线程池中其他任务才能提供的资源或条件,例如某个任务等待另一个任务的返回值或执行结果,那么除非线程池足够大,否则将发生线程饥饿死锁。
在8-1中给出了线程饥饿死锁的示例。RenderPageTask向Executor提交了两个任务来获取网页的页眉和页脚。绘制页面,等待获取页眉和页脚任务的结果,然后将页脚,页面主题和页眉组合起来并形成最终的页面。当使用单线程的Executor,那么ThreadDeadlock会经常发生死锁。同样,如果线程池不够大,那么当多个任务通过栅栏机制来彼此协调时,将导致线程饥饿死锁。
程序清单8-1
public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();
public class LoadFileTask implements Callable<String> {
private final String fileName;
public LoadFileTask(String fileName) {
this.fileName = fileName;
}
public String call() throws Exception {
// Here's where we would actually read the file
return "";
}
}
public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header, footer;
header = exec.submit(new LoadFileTask("header.html"));
footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
// Will deadlock -- task waiting for result of subtask
return header.get() + page + footer.get();
}
private String renderBody() {
// Here's where we would actually render the page
return "";
}
}
}
每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程饥饿死锁,因此需要在代码或者配置Executor的配置文件中记录线程池的大小限制或配置限制。
除了在线程池大小的显式限制外,还可能由于其他资源上的约束而存在一些隐式限制。
如果应用程序使用一个包含10个连接的JDBC连接池,并且每个任务需要一个数据库连接,那么线程池就好像只有10个线程,因为任务超过10个时,新的任务需要等待其他任务释放连接。
8.1.2 运行时间较长的任务
执行时间较长的任务不仅会造成线程池堵塞,还会增加执行时间较短任务的服务时间。如果线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能所有的线程都会运行这些执行时间较长的任务,从而影响整体的响应性。
限定任务等待资源的时间而不是无限制地等待,可以缓解执行时间较长任务造成的影响。
在类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本(例如Thread.join,,BlockingQueue.put,CountDownLatch.awati以及Selector.select等)。
如果执行超时,那么可把任务标识为失败,然后中止任务或将任务重新返回队列中以便随后执行。
8.2 设置线程池的大小
线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。
如果线程池过大,那么大量的线程将在相对较少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,还可能耗尽资源。如果线程池过小,那么将导致许多空闲的处理器无法执行工作,从而降低吞吐率。
要想正确地设置线程池的大小,必须分析计算环境,资源预算和任务的特性。
如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么就应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
在计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1,通常能实现最优的利用率。
对于包含I/O操作或其他阻塞操作的任务,由于线程不会一直执行,因此线程池的规模应该更大、要正确地设置线程池的大小,你必须估算出任务的等待时间与计算时间的比值,这可以通过一些分析或监控工具来获得。
还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置大小不同的线程池来运行应用程序,并观察CPU利用率的水平。
给出下列定义:
image要使处理器到达期望的使用率,线程池的最优大小等于:
image可以通过Runtime来获得CPU的数目:
int N_CPUS = Runtime.getRuntime().availableProcessors();
CPU周期并不是唯一影响线程池大小的资源,还包括内存,文件句柄,套接字句柄和数据库连接等。
8.3 配置ThreadPoolExecutor
ThreadPoolExecutor为一些Executor提供了基本的实现,这些Executor是由Executors中的newCachedThreadPool,newFixedThreadPool和newScheduledThreadPool等工厂方法返回的。ThreadPoolExecutor是一个灵活的,稳定的线程池,允许进行各种定制。
如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors的源代码爱了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。
程序清单8-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExceutionHandler handler){...}
8.3.1 线程的创建与销毁
线程池的基本大小(Core Pool Size,core核心的),最大大小以及存活时间等因素共同负责线程的创建与销毁。基本大小就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
线程池的处理流程:
image一个线程从被提交(submit)到执行共经历以下流程:
- 线程池判断核心线程池里是的线程是否都在执行任务,如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下一个流程
- 线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的任务储存在这个工作队列里。如果工作队列满了,则进入下一个流程。
- 线程池判断其内部线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已满了,则交给饱和策略来处理这个任务。
线程池在执行excute方法时,主要有以下四种情况
- 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)
- 如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue
- 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)
- 如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。
newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
newCachedThreadPool工厂方法将线程池的最大大小设置为Integ.MAX_VALUE,而且将基本大小设置为0,并将超时设置为1分钟,这种方法创建的线程池可以被无限扩展,并且当需求降低时会自动收缩。
其他形式的线程池可以通过显式的ThreadPoolExecutor构造函数来构造。
image8.3.2 管理队列任务
在有限的线程池中会限制可并发的任务数量(单线程的Executor是特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性)
6.1.2介绍,如果无限制地创建线程,将导致不稳定性,并通过采用固定大小的线程池(而不是每收到一个请求就创建一个新线程)来解决。
然而,在高负载情况下,应用程序仍可能耗尽资源,只是出现问题的概率较小。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累积起来。
在线程池中,这些请求会在一个由Executor管理的Runnable队列中等待。
通过一个Runnable和一个链表节点来表现一个等待中的任务,比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了服务器的处理速率,仍可能会耗尽资源。
ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。
基本的任务排队方法有3种:无界队列(unbounded queue,),有界队列(bounded queue,)和同步移交(synchronous handoff)。
队列的选择与其他的配置参数有关,例如线程池的大小等。
newFixedThreadPool和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续地达到,并且超过了线程池处理它们的速度,那么队列将无限制地增加。
一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue,有界的LinkedBlockingQueue,PriorityBlockingQueue。
有界队列有助于避免资源耗尽的情况发生,但队列填满后,新的任务怎么办(有许多饱和策略【saturation policies】可以解决,参见8.3.3).
在使用有界队列的时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,有助于减少内存使用量,减低CPU的使用率,同时还可以减少上下文切换,但代价是可能限制吞吐量。
在newCachedThreadPool工厂方法中使用了SynchronousQueue。
对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。
SynchronousQueue不是一个真正的队列,而是一种在线程之间移交的机制。
要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么TrheadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。
使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是首先放在队列中,然后有工作者线程从队列中提取该任务。
只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。
当使用像LinkedBlockingQueue或ArrayBlockingQueue这样的FIFO队列时,任务的执行顺序与到达顺序相同。
如果像进一步控制任务执行顺序,可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级时通过自然顺序或Comparator(如果任务实现了Comparable)来定义的。
对于Executor,newCachedThreadPool工厂方法是一种很好的默认选择,他能提供比固定大小的线程更好的排队性能(由于使用了SynchronousQueue而不是LinkedBlockingQueue)。
当需要限制当前任务的数量以满足资源管理需求时,可以选择固定大小的线程池,就像在接受网络用户请求的服务器应用程序中,如果不进行限制,容易发生过载问题。
只有任务相互独立时,为线程池或工作队列设置界限才是合理的。
如果任务之间存在依赖关系,那么有界的线程池或队列可能导致线程饥饿死锁问题。此时应该使用无界的线程池如newCachedThreadPool。
8.3.3 饱和策略
当有界队列被填满后,饱和策略开始发挥作用。
ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略)
JDK提供了几种不同的setRejectedExecutionHandler实现,每种实现都包含有不同的饱和策略:
AbortPolicy, CallerRunsPolicy, DiscardPolicy和DiscardOldestPolicy.
AbortPolicy,“中止(Abort)策略”是默认的饱和策略,该策略将抛出未检查的Rejected-ExecutionException。调用这可以捕获这个异常,然后根据需求编写自己的处理代码。
当新提交的任务无法保存到队列中等待执行时,DiscardPolicy,“抛弃(Discard)策略”会悄悄抛弃该任务。
DiscardOldestPolicy,“抛弃最旧策略“则会抛弃下个将被执行任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么抛弃最旧策略将导致抛弃优先级最高的任务,因此最好不要将抛弃最旧饱和策略和优先级队列一起使用)
CallerRunsPolicy,“调用者运行策略“实现了一种调节机制。该策略不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而减低新任务的流量。
它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。
我们以第6章的WebServer作为实例(如6.2.1),修改为使用有界队列和CallerRunsPolicy饱和策略。
当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中执行。由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理正在执行的任务。在这期间,主线程不会调用accpet,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,TCP层将发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终到达客户端,导致服务器在高负载下实现一种平缓的性能降低。
当创建Executor时,可以选择饱和策略或者对执行策略进行修改。
程序清单8-3
// 8-3 创建一个固定大小的线程池,并采用有界队列以及“调用者运行”饱和策略
ThreadPoolExecutor executor
= new ThreadPoolExecutor(N_THREADS, N_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());
当工作队列被填满后,没有预定义的饱和策略来阻塞execute。然而,通过使用Semaphore(信号量)来限制任务的到达率,就可以实现这个功能。
BoundedExecutor使用了一个无界队列(因为不能限制队列的大小和任务额达到率),并设置信号量的上界设置为线程池的大小加上可排队任务的数量,这是因为信号量需要控制正在执行的和等待执行的任务数量。
程序清单8-4
@ThreadSafe
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
}
}
}
8.3.4 线程工厂
每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。
程序清单8-5
// 8-5 ThreadFactory接口
public interface ThreadFactory {
Thread newThread(Runnable r);
}
默认的线程工厂方法将创建一个新的,非守护的线程,并且不包含特殊的配置信息。
通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory中只定义了一个方法newThread,每当线程池需要创建一个新线程都会调用这个方法。
在许多情况下需要使用定制的线程工厂方法。在MyThreadFactory中给出了一个自定义的线程工厂。它创建了一个新的MyAppThread实例,并将一个特定于线程池中的名字传递给MyAppThread的构造函数,从而可以在线程转储和错误日志信息中区分来自不同线程池的功能。
在应用程序的其他地方也可以使用MyAppThread,以便所有线程都能使用它的调试功能。
程序清单8-6
public class MyThreadFactory implements ThreadFactory {
private final String poolName;
public MyThreadFactory(String poolName) {
this.poolName = poolName;
}
public Thread newThread(Runnable runnable) {
return new MyAppThread(runnable, poolName);
}
}
在MyAppThread中还可以定制其他行为,包括:为线程指定名字,设置自定义UncaughtException-Handler向Logger中写入信息,维护一些统计信息(包括有多少个线程被创建和销毁),以及在线程被创建或终止时把调试信息写入日志。
程序清单8-7
public class MyAppThread extends Thread {
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r) {
this(r, DEFAULT_NAME);
}
public MyAppThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE,
"UNCAUGHT in thread " + t.getName(), e);
}
});
}
public void run() {
// Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle;
if (debug) log.log(Level.FINE, "Created " + getName());
try {
alive.incrementAndGet();
super.run();
} finally {
alive.decrementAndGet();
if (debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated() {
return created.get();
}
public static int getThreadsAlive() {
return alive.get();
}
public static boolean getDebug() {
return debugLifecycle;
}
public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
如果在应用程序中需要利用安全策略来控制对某些特殊代码库的访问权限,那么可以通过Executor中的privilegedThreadFactory工厂方法(privileged,享有特权的)来定制自己的线程工厂。通过这种方式创建出来的线程,将与创建privilegedThreadFactory的线程拥有相同的访问权限(permissions),AccessControlContext和contextClassLoader。
如果不使用privilegedThreadFactory,线程池创建的线程将从在需要新线程调用execute或submit的客户程序中继承访问权限,从而导致令人困惑的安全性异常。
8.3.5 在调用构造函数后再定制ThreadPoolExecutor
在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小,最大大小,存活时间,线程工厂以及拒绝执行处理器(rejected execution handler))。如果Executor是通过Executors中的某个(newSingleThreadExecutor除外)工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor以访问设置器,如 8-8
程序清单8-8
ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor)
((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
throw new AssertionError("Oops, bad assumption");
在Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService,使其只暴露出ExecutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方法包装的ExecutorService,而不是最初的ThreadPoolExecutor。虽然单线程的Executor实际上被实现为一个只包含唯一线程线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor的线程池大小,将破坏它的执行语义。
可以在自己的Executor中使用这项技术以防止执行策略被修改。如果将ExecutorService暴露给不信任的代码,又不希望对其进行修改,就可以通过unconfigurableExecutorService来包装它。
8.4 扩展ThreadPoolExecutor
ThreadPoolExecutor是可扩展的,它提供了几个可以在子类化中改写的方法:beforeExecute,afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志,计时,监视或统计信息收集的功能。
无论是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute)如果beforeExecute抛出一个RuntimeException,那么任务将不被执行,并且afterExecute也不会被调用。
在线程池完成关闭时调用terminated,也就是在所有任务都已经完成并且所有工作者线程也已经关闭后。
terminated可以用来释放Executor在其生命周期里分配的各种资源,此外还可以执行发送通知,记录日志或收集finalize统计信息等操作。
8.4.1 示例:给线程池添加统计信息
在TimingThreadPool中给出了一个自定义的线程池,它通过beforeExecute,afterExecute和terminated等方法来添加日志记录和统计信息收集。
为了测量任务的运行时间,beforeExecute必须记录开始时间并把它保存到一个afterExecute可以访问的地方。因为这些方法将在执行任务的线程中调用,因此beforeExecute可以把值保存到一个ThreadLocal变量中,然后由afterExecute来读取。在TimingThreadPool中使用了两个AtomicLong变量,分别用于记录已处理的任务数和总的处理时间,并通过terminated来输出包含平均任务的日志消息。
程序清单8-9
public class TimingThreadPool extends ThreadPoolExecutor {
public TimingThreadPool() {
super(1, 1, 0L, TimeUnit.SECONDS, null);
}
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns",
t, r, taskTime));
} finally {
super.afterExecute(r, t);
}
}
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns",
totalTime.get() / numTasks.get()));
} finally {
super.terminated();
}
}
}
8.5 递归算法的并行化
在6.3节中,最后一次实现将每个图像的下载都视作一个独立任务,从而实现了更高的并行性。
如果在循环中包含了一些密集计算,或者需要执行可能阻塞的I/O操作,那么只要每次迭代是独立的,都可以对其进行并行化。
如果循环中的迭代操作都是独立的,并且不需要等待所有的迭代操作都完成在继续执行,那么就可以使用Executor将串行循环转化为并行循环,在processSequentially和processInParallel中给出了这种方法。(parallel 并行的 ,sequentially 继而地,顺序地)
程序清单8-10
public abstract class TransformingSequential {
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
public abstract void process(Element e);
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
interface Element {
}
interface Node <T> {
T compute();
List<Node<T>> getChildren();
}
}
调用processInParallel比processSequentially能更快地返回,因为processInParallel**会在所有下载任务都进入了Executor的队列后就立即返回,而不会等待这些任务全部完成。如果需要提交一个任务集并等待它们完成,那么可以使用ExecutorService.invokeAll,并且在所有任务都执行完成后调用CompletionService来获取结果,如第6章的Render所示。
当串行循环中的各个迭代操作之间彼此独立,并且每个迭代操作执行的工作量比管理一个新任务时带来的开销更多,那么这个串行循环就适合并行化。
在一些递归设计中同样可以采用循环并行化的方法。在递归算法中通常都会存在串行循环,而且这些循环可以按照程序8-10的方式并行化。
一种简单的情况是:在每个迭代操作中都不需要来自于后续递归迭代的结果。例如,8-11中的sequentialRecursive用深度优先算法遍历一棵树,在每个节点上执行计算并将结果放入一个集合。修改后的parallel同样执行深度优先遍历,但它并不是在访问节点时进行计算,而是为每个节点提交一个任务来完成计算。(recursive 递归的)
程序清单8-11
public abstract class TransformingSequential {
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
public abstract void process(Element e);
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
interface Element {
}
interface Node <T> {
T compute();
List<Node<T>> getChildren();
}
}
当parallelRecursive返回时,树中的各个节点都已经访问过了(但是遍历过程仍然是串行,只有compute是并行执行的),并且每个节点的计算也已经放入Executor的工作队列。
parallelRecursive的调用者可以通过以下方式等待所有的结果:创建一个特定于遍历过程的Executor,并使用shutdown和awaitTermination等方法,如下:
程序清单8-12
public abstract class TransformingSequential {
void processSequentially(List<Element> elements) {
for (Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements) {
for (final Element e : elements)
exec.execute(new Runnable() {
public void run() {
process(e);
}
});
}
public abstract void process(Element e);
public <T> void sequentialRecursive(List<Node<T>> nodes,
Collection<T> results) {
for (Node<T> n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
}
public <T> void parallelRecursive(final Executor exec,
List<Node<T>> nodes,
final Collection<T> results) {
for (final Node<T> n : nodes) {
exec.execute(new Runnable() {
public void run() {
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
public <T> Collection<T> getParallelResults(List<Node<T>> nodes)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
interface Element {
}
interface Node <T> {
T compute();
List<Node<T>> getChildren();
}
}
8.5.1 示例:谜题框架
递归算法的并行化的一种引用就是解决一些谜题,这些谜题都需要找出一系列的操作从初始状态转换到目标状态,例如类似与“搬箱子”,“四色方柱”和其他棋牌谜题。
我们将谜题定义为:包含了一个初始位置,一个目标位置,以及用于判断是否是有效移动的规则集。规则集包含两个部分:计算从指定位置开始的所有合法移动,以及每次移动的结果位置。
8-13给出了表示谜题的抽象类,其中的类型参数P和M表示位置类和移动类。根据这个接口,我们可以写一个简单的串行求解程序,该程序将在谜题空间(Puzzle Space)中查找,直到找到一个解答或者找遍了整个空间都没有发现答案。
程序清单8-13
public interface Puzzle <P, M> {
P initialPosition();
boolean isGoal(P position);
Set<M> legalMoves(P position);
P move(P position, M move);
}
在8-14中的PuzzleNode代表通过一系列的移动到达的一个位置,其中保存了到达该位置的移动以及前一个PuzzleNode,只要沿着PuzzleNode链接逐步回溯,就可以重新构建出到达当前位置的移动序列。
程序清单8-14
@Immutable
public class PuzzleNode <P, M> {
final P pos;
final M move;
final PuzzleNode<P, M> prev;
public PuzzleNode(P pos, M move, PuzzleNode<P, M> prev) {
this.pos = pos;
this.move = move;
this.prev = prev;
}
List<M> asMoveList() {
List<M> solution = new LinkedList<M>();
for (PuzzleNode<P, M> n = this; n.move != null; n = n.prev)
solution.add(0, n.move);
return solution;
}
}
在8-15中给出了谜题框架的串行解决方案,它在谜题空间中执行一个深度优先搜索,当找到解答方案(不一定是最短的解决方案)后结束搜索。
为了避免无限循环,在串行版本中引入了一个Set对象,其中保存了之前已经搜索过的所有位置。
程序清单8-15
public class SequentialPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle;
private final Set<P> seen = new HashSet<P>();
public SequentialPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
}
public List<M> solve() {
P pos = puzzle.initialPosition();
return search(new PuzzleNode<P, M>(pos, null, null));
}
private List<M> search(PuzzleNode<P, M> node) {
if (!seen.contains(node.pos)) {
seen.add(node.pos);
if (puzzle.isGoal(node.pos))
return node.asMoveList();
for (M move : puzzle.legalMoves(node.pos)) {
P pos = puzzle.move(node.pos, move);
PuzzleNode<P, M> child = new PuzzleNode<P, M>(pos, move, node);
List<M> result = search(child);
if (result != null)
return result;
}
}
return null;
}
}
通过修改解决方案以利用并发性,可以以并发方式来计算下一步移动以及目标条件,因为计算某次移动的过程很大程度上与计算其他移动的过程是相互独立的。(之所以是很大程度上,是因为在各个任务之间会共享一些可变状态,例如已遍历位置的集合)如果有多个处理器可用,这将减少寻找解决方案所花费的时间。
在8-16中使用了一个内部类SolverTask,这个类继承了PuzzleNode并实现了Runnable,大多数工作都是在run方法中完成的:首先计算下一步可能到达的所有位置,并去掉已经到达的所有位置,然后判断(这个任务或者其他某个任务)是否已经成功地完成,最后将尚未搜索过的位置提交给Executor。
程序8-16
public class ConcurrentPuzzleSolver <P, M> {
private final Puzzle<P, M> puzzle;
private final ExecutorService exec;
private final ConcurrentMap<P, Boolean> seen;
protected final ValueLatch<PuzzleNode<P, M>> solution = new ValueLatch<PuzzleNode<P, M>>();
public ConcurrentPuzzleSolver(Puzzle<P, M> puzzle) {
this.puzzle = puzzle;
this.exec = initThreadPool();
this.seen = new ConcurrentHashMap<P, Boolean>();
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
}
}
private ExecutorService initThreadPool() {
return Executors.newCachedThreadPool();
}
public List<M> solve() throws InterruptedException {
try {
P p = puzzle.initialPosition();
exec.execute(newTask(p, null, null));
// block until solution found
PuzzleNode<P, M> solnPuzzleNode = solution.getValue();
return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList();
} finally {
exec.shutdown();
}
}
protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
return new SolverTask(p, m, n);
}
protected class SolverTask extends PuzzleNode<P, M> implements Runnable {
SolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
}
public void run() {
if (solution.isSet()
|| seen.putIfAbsent(pos, true) != null)
return; // already solved or seen this position
if (puzzle.isGoal(pos))
solution.setValue(this);
else
for (M m : puzzle.legalMoves(pos))
exec.execute(newTask(puzzle.move(pos, m), m, this));
}
}
}
为了避免无限循环,在串行版本中引入了一个Set对象,其中保存了之前已经搜索过的所有位置。在
ConcurrentPuzzleSolver使用了ConcurrentHashMap来实现相同的功能。这种做法不仅提供了线程安全性,还避免了在更新共享集合时存在的竞态条件,因为putIfAbsent只有在之前没有遍历过的某个位置才会通过原子方式添加到集合中。ConcurrentPuzzleSolver使用线程池内工作队列而不是调用栈来保存搜索的状态。
串行版本的程序执行深度优先搜索,因此搜索过程将受限与栈的大小。并发版本的程序执行广度优先搜索,因此不会受到栈大小的限制(但如果待搜索的或者已搜索的位置集合的大小超过了可能的内存总量,那么人可能耗尽内存)、
为了在找到某个解答后停止搜索,需要通过某种方式来检查是否有线程已经找到了一个解答。如果需要第一个找到的解答,那么还需要在其他任务都没有找打解答时更新解答。这些需求描述的一种闭锁(Latch)机制(5.5.1中),具体的说,是一种包含结果的闭锁。在8-17的ValueLatch中使用CountDownLatch来实现所需的闭锁行为,并且使用锁定机制来确保解答只会被设置一次。
程序清单8-17
@ThreadSafe
public class ValueLatch <T> {
@GuardedBy("this") private T value = null;
private final CountDownLatch done = new CountDownLatch(1);
public boolean isSet() {
return (done.getCount() == 0);
}
public synchronized void setValue(T newValue) {
if (!isSet()) {
value = newValue;
done.countDown();
}
}
public T getValue() throws InterruptedException {
done.await();
synchronized (this) {
return value;
}
}
}
每个任务首先查询solution闭锁,找到一个解答就停止。而在此之前,主线程需要等待,ValueLatch提供了一种方式来保存这个值,只有第一次调用才会设置它。调用者能够判断这个值是否已经被设置,以及阻塞等待它被设置。在第一次调用setValue时,将更新解答方案,并且CountDownLatch会递减,从getValue中释放主线程。
第一个找到解答的线程还会关闭Executor,从而阻止接受新的任务。要避免处理RejectedExecutionException,需要将拒绝处理器设置为“抛弃已提交的任务”。然后,所有未完成的任务最终将执行完成,并且在执行任何新任务时都会失败,从而使Executor结束。(如果任务运行的时间过长,那么可以中断它们而不是等它们完成)。
如果不存在解答,那么ConcurrentPuzzleSolver就不能很好地处理这种情况:如果已经遍历了所有的移动和位置都没有找到答案,那么在getSolution调用中将永远等待下去。当遍历了整个所有空间时,串行版本将结束,但要结束并发程序会更困难。其中一种方法是:记录活动任务的数量,当该值为0时将解答设置为null,如8-18
程序清单8-18
public class PuzzleSolver <P,M> extends ConcurrentPuzzleSolver<P, M> {
PuzzleSolver(Puzzle<P, M> puzzle) {
super(puzzle);
}
private final AtomicInteger taskCount = new AtomicInteger(0);
protected Runnable newTask(P p, M m, PuzzleNode<P, M> n) {
return new CountingSolverTask(p, m, n);
}
class CountingSolverTask extends SolverTask {
CountingSolverTask(P pos, M move, PuzzleNode<P, M> prev) {
super(pos, move, prev);
taskCount.incrementAndGet();
}
public void run() {
try {
super.run();
} finally {
if (taskCount.decrementAndGet() == 0)
solution.setValue(null);
}
}
}
}
找到解答的时间可能比等待的时间 要长,因此在解决器中需要包含几个结束条件。其中一个结束条件时候时间限制的,这容易实现:在ValueLatch中实现一个限时的getValue(其中使用限时版本的await),如果getValue超时,那么关闭Executor并声明出现了一个失败。另一个结束条件是某种特定与谜题的标准,例如只搜索特定数量的位置。此外,还可以提供一种取消机制,由用户z自己决定何时停止搜索。
小结
对于并发执行的任务,Executor框架是一种强大且灵活的框架。它提供了大量可调节的选项,例如创建线程和关闭线程的策略,处理任务队列的策略,处理过多任务的策略,并且提供了几个钩子方法来扩展它 行为。
网友评论