6.2 Executor 框架
在 Java 类库中,任务执行的主要抽象不是 Thread
,而是 Executor
。
public interface Executor {
void execute(Runnable command);
}
虽然 Executor
是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础。
Executor
基于生产者 - 消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者 - 消费者的设计,那么最简单的方式就是使用 Executor
。
6.2.1 示例:基于 Executor 的 Web 服务器
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); // 申明一个固定大小的线程池
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = () -> handleRequest(connection);
exec.execute(task); // 每来一个请求就将其交给线程池处理
}
}
}
在上面 TaskExecutionWebServer
中,我们通过使用 Executor
,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的 Executor
实现,就可以改变服务器的行为。改变 Executor
实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。
我们可以很容易地将 TaskExecutionWebServer
修改为类似 ThreadPerTaskWebServer
的行为:只需使用一个为每个请求都创建新线程的 Executor
。
public class ThreadPerTaskExecutor implements Executor {
@Override
public void execute(Runnable command) {
new Thread(command).start();
}
}
同样,还可以编写一个 Executor
使 TaskExecutionWebServer
的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回:
public class WithinThreadExecutor implements Executor {
@Override
public void execute(Runnable command) {
command.run();
}
}
6.2.3 线程池
类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用 Executors
中的静态工厂方法之一来创建一个线程池:
-
newFixedThreadPool
,将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于发生了未预期的Exception
而结束,那么线程池会补充一个新的线程)。除非线程显示地调用shutdown
,否则线程将一直存在于线程池中。(执行完任务的线程一直阻塞的等待新任务到来。) -
newCachedThreadPool
,创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。 -
newSingleThreadExecutor
,是一个单线程的Executor
,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor
能确保依照任务在队列中的顺序来串行执行。 -
newScheduledThreadPool
,创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务。
6.2.4 Executor 的生命周期
为了解决生命周期问题,Executor
扩展了 ExecutorService
接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
...
}
ExecutorService
的生命周期有 3 种状态:运行、关闭和已终止。ExecutorService
在初始创建时处于运行状态。shutdown
方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow
方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。这部分内容可以看:ExecutorService 的 shutdown 和 shutdownNow 区别与联系。
在 ExecutorService
关闭后提交的任务将由 “拒绝执行处理器(Rejected Execution Handler)” 来处理,它会抛弃任务,或者使得 execute
方法抛出一个未检查的 RejectedExecutionException
。
6.3 找出可利用的并行性
我们假设实现一个浏览器程序中的页面渲染(Page-Rendering)功能,它的作用是将 HTML 页面绘制到图像缓存中。
6.3.1 示例:串行的页面渲染器
public class SingleThreadRender {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo : scanForImageInfo(source)) {
imageData.add(imageInfo.downloadImage());
}
for (ImageData data : imageData) {
renderImage(data);
}
}
}
从上面代码我们可以看到,程序先通过 renderText
渲染了页面中的文本,然后扫描页面中图像的信息,再串行地调用 downloadImage
将所有图片依次下载下来,最后将下载下来的图像通过 renderImage
进行渲染。
图像下载过程的大部分时间都是在等待 I/O 操作执行完成,在这期间 CPU 几乎不做任何工作。因此,这种串行执行方法没有充分利用 CPU,使得用户在看到最终页面之前要等待过长的时间。
6.3.2 携带结果的任务 Callable 与 Future
Executor
框架使用 Runnable
作为基本的任务表示形式。Runnable
是一种有很大局限的抽象,虽然 run
能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。
Callable
是一种更好的抽象:它认为主入口点(即 call
)将返回一个值,并可能跑出一个异常。要使用 Callable
来表示无返回值的任务,可使用 Callable<Void>
。
6.3.3 示例:使用 Future 实现页面渲染器
public class FutureRenderer {
private final ExecutorService executor = ...;
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task = new Callable<>() {
@Override
public List<ImageData> call() throws Exception {
List<ImageData> result = new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos) {
result.add(imageInfo.downloadImage());
}
return reuslt;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData) {
renderImage(data);
}
} catch (InterruptedException e) {
// 重新设置线程的中断状态
Thread.currentThread().interrupt();
// 由于不需要结果,因此取消
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
在上面的代码中,我们将渲染文字和下载图像解耦开来了。但是下载图像仍然是串行的,而且在我们大部分的场景中,下载图像的时间其实远远大于渲染文字。因此,如果我们能将串行的文件下载做成并行的,将大大地节省时间。
6.3.5 CompletionService: Executor 与 BlockingQueue
如果向 Executor
提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的 Future
,然后反复使用 get
方法,同时将参数 timeout
指定为 0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但是却使用了轮询,无端地消耗了 CPU 资源。我们可以使用 CompletionService
。
CompletionService
将 Executor
和 BlockingQueue
的功能融合在一起。你可以将一组 Callable
任务提交给它来执行,然后使用类似于队列操作的 take
和 poll
等方法来获得已完成的结果。ExecutorCompletionService
实现了 CompletionService
,并将计算部分委托给一个 Executor
。
ExecutorCompletionService
的实现其实非常简单,可以自己阅读源码。
6.3.6 示例:使用 CompletionService 实现页面渲染器
public class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info) {
// 提交一组 Callable 任务
completionService.submit(() -> imageInfo.downloadImage());
}
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take(); // take 是阻塞的方法,但是提交的任务组中,一旦有任务完成,take 就会马上返回
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}
上面的 Renderer
为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程。此外,通过从 CompletionService
中获取结果以及使每张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。
6.3.7 为任务设置时限
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。例如,某个 Web 应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告。
在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的 Future.get
中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出 TimeException
。
在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。如果一个限时的 get
方法抛出了 TimeoutException
,那么可以通过 Future
来取消任务。如果编写的任务是可取消的,那么可以提前中止它,以免消耗过多的资源。
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
// 在等待广告的同时限时页面
Page page = renderPageBody();
Ad ad;
try {
// 只等待指定的时间长度
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, TimeUnit.NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true); // 主动取消任务
}
page.setAd(ad);
return page;
}
上面代码中,当获取广告的任务 Future
超时并抛出 TimeException
后,任务 Future
需要主动调用 cancel
方法以取消任务的执行。
上面是单个 Future
超时的时候抛出 TimeoutException
,并且需要开发者自己主动 cancel
。ExecutorService
还有传入一组任务的情况。
6.3.8 示例:旅行预订门户网站
ExecutorService
支持提交多个任务并获得结果。InvokeAll
方法的参数为一组任务,并返回一组 Future
。invokeAll
按照任务集合中迭代器的顺序将所有的 Future
添加到返回的集合中,从而使调用者能将各个 Future
与其表示的 Callable
关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll
将返回。当超过指定时限后,任何还未完成的任务都会取消(这与单个 Future
超时不同)。当 invokeAll
返回后,每个任务要么正常的完成,要么被取消,而客户端代码可以调用 get
或 isCancelled
来判断究竟是何种情况。
public class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;
...
public TravelQuote call() throws Exception {
return company.soliciteQuote(travelInfo);
}
}
public class RankedTravelQuotes {
public List<TravelQuote> getRankedTravelQuotes(
TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking, long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<>();
for (TravelCompany company : companies) {
tasks.add(new QuoteTask(company, travelInfo));
}
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit); // 调用 invokeAll 来执行一组任务
List<TravelQuote> quotes = new ArrayList<>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get()); // 阻塞地遍历 future.get()
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) { // 不是抛出 TimeoutException,而是被直接取消后抛出 CancellationException
quotes.add(task.getTimeoutQuote(e));
}
}
}
}
网友评论