第六章 任务执行
6.1 在线程中执行任务
当围绕“任务执行”来设计应用程序结构时,第一步就是要找出清晰的任务边界。应该让各个任务保持独立:任务并不依赖与其他任务的状态、结果或边界效应。
6.1.1 串行地执行任务
在应用程序中可以提供多种策略来调度任务,最简单的策略就是在单个线程中串行地执行各项任务。
程序清单 6-1 串行的 Web服务器
public class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
由于SingleThreadWebServer每次只能处理一个请求,所以实际应用中肯定不会采取这种做法。
在服务器应用程序中,串行处理机制通常都无法提供高吞吐量或快速响应性。
6.1.2 显示地为任务创建线程
通过为每个请求创建一个新的线程来提供服务,从而实现更高的响应性,如程序清单6-2所示。
程序清单6-2 在Web服务器中为每个请求启动一个新的线程
public class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
对于每个链接,主循环都将创建一个新线程来处理请求,而不是在主循环中进行处理。由此可以得出3个主要结论:
- 任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接。
- 任务可以并行处理,从而能同时服务多个请求。
- 任务处理代码必须是线程安全的。
6.1.3 无限制创建线程的不足
在生成环境中,“为每个任务分配一个线程”这种方法存在一些缺陷,尤其是当需要创建大量的线程时:
- 线程生命周期的开销非常高。创建和销毁线程的代价很高。
- 资源消耗。活跃的线程会消耗系统资源,尤其是内存。
- 稳定性。根据平台的不同,在可创建的线程数量上存在一个限制,超过限制之后,可能会抛出OutOfMemoryError异常。
所以该方法的问题在于,它没有限制可创建线程的数量。
6.2 Executor框架
任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。
线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread,而是Executor。
程序清单6-3
public interface Executor {
void execute(Runnable command);
}
Executor提供了一种标准的方法将任务的提交过程与执行过程解耦开来,并用Runnable来表示任务。
Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。
6.2.1 示例:基于Executor的Web服务器
程序清单6-4
public class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec
= Executors.newFixedThreadPool(NTHREADS);
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
private static void handleRequest(Socket connection) {
// request-handling logic here
}
}
程序清单6-4中用Executor代替了硬编码的线程创建过程,实现一个可以容纳100个线程的线程池。
该程序将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的Executor实现,就可以改变服务器的行为。
我们可以很容易地将TaskExecutionWebService修改为类似ThreadPerTaskWebServer的行为,只需使用一个为每个请求都创建新线程的Executor,如程序清单6-5所示
程序清单6-5
public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
};
}
同样,还可以编写一个Executor使TaskExecutionWebServer的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回。
程序清单6-6
public class WithinThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
};
}
6.2.2 执行策略
通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。在执行策略中定义了任务执行的“What、Where、When、How”等方面,包括:
- 在什么(What)线程中执行任务?
- 任务按照什么(What)顺序执行(FIFO、LIFO、优先级)?
- 有多少个(How Many)任务能并发执行
- 在队列中有多少个(How Many)任务在等待执行?
- 如果系统由于过载而需要拒绝一个任务,那么应该选择哪一个(Which)任务?另外,如何(How)通知应用程序有任务被拒绝?
- 在执行一个任务之前或之后,应该进行哪些(What)动作?
每当看到下面这种形式的代码时:
new Thread(runnable).start()
并且你希望获得一种更灵活的执行策略时,请考虑使用Executor来代替Thread。
6.2.3 线程池
线程池:是指管理一组同构工作线程的资源池。
“在线程池中执行任务”比“为每个任务分配一个线程”有以下好处:
- 通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。
- 当请求到达时,工作线程已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提供了响应性。
类库中提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:
- newFixedThreadPool:固定长度的线程池
- newCachedThreadPool:可缓存的线程池,线程池的规模不受限制
- newSingleThreadExecutor:单线程的Executor,确保按照任务在队列中的顺序来串行执行
- newScheduledThreadPool:固定长度的线程池,以延迟或定时的方式来执行任务,类似于Timer
6.2.4 Executor的生命周期
由于Executor以异步方式来执行任务,因此在任何时刻,之前提交任务的状态不是立即执行的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。关闭应用程序时,可能会采取平缓的关闭形式(完成所有已启动的任务,并且不再接受任何新任务),也可能直接粗暴的关闭。
为了解决执行服务的生命周期问题,Executor扩展了Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法。
程序清单6-7
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// ...... 其他用于任务提交的便利方法
}
ExecutorService的生命周期有3中状态:运行、关闭和已终止。
shutdown方法将执行平缓的关闭过程;shutdownNow方法将执行粗暴的关闭过程。
在ExecutorService关闭后提交的任务将由“拒绝执行处理器”来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的Rejected-ExecutionException。
程序清单6-8
public class LifecycleWebServer {
private final ExecutorService exec = Executors.newCachedThreadPool();
public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() {
handleRequest(conn);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown())
log("task submission rejected", e);
}
}
}
public void stop() {
exec.shutdown();
}
private void log(String msg, Exception e) {
Logger.getAnonymousLogger().log(Level.WARNING, msg, e);
}
void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req))
stop();
else
dispatchRequest(req);
}
interface Request {
}
private Request readRequest(Socket s) {
return null;
}
private void dispatchRequest(Request r) {
}
private boolean isShutdownRequest(Request r) {
return false;
}
}
上面程序的LifecycleWebServer通过增加生命周期支持来扩展Web服务器的功能。可以通过两种方法来关闭Web服务器:在程序中调用stop,或者以客户端请求形式向Web服务器发送一个特定格式的HTTP请求。
6.2.5 延迟任务与周期任务
Timer类负责管理延迟任务("在100ms后执行该任务")以及周期任务("每10ms执行一次该任务")。
Timer的缺陷:
- 在执行所有定时任务时只会创建一个线程。如果某个定时任务执行过长,会影响其他任务的定时准确性。
- Timer并不捕获异常,因此当任务抛出未检查的异常时,会终止线程。
程序清单6-9
public class OutOfTime {
public static void main(String[] args) throws Exception {
Timer timer = new Timer();
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(1);
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(5);
}
static class ThrowTask extends TimerTask {
public void run() {
throw new RuntimeException();
}
}
}
该程序运行1秒就终止了。
如果要构建自己的调度任务,可以使用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度任务。
6.3 找出可利用的并行性
在大多数服务器应用程序中都存在一个明显的任务边界:单个客户请求。
6.3.1 示例:串行的页面渲染器
最简单的方法就是对HTML文档进行串行处理。当遇到文本标签时,将其绘制到图像缓存中。当遇到图像引用时,先通过网络获取它,然后再将其绘制到图像缓存中,这种方法可能会等待很长时间,才能显示所有的文本。
另一种方法时,先绘制文本元素,同时为图像预留出矩形的占位空间,在处理完了第一遍文本后,程序再开始下载图像,并将它们绘制到相应的占位空间中。
程序清单6-10
public abstract class SingleThreadRenderer {
void renderPage(CharSequence source) {
renderText(source);
List<ImageData> imageData = new ArrayList<ImageData>();
for (ImageInfo imageInfo : scanForImageInfo(source))
imageData.add(imageInfo.downloadImage());
for (ImageData data : imageData)
renderImage(data);
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
6.3.2 携带结果的任务Callable与Future
Executor框架使用Runnable作为其基本的任务表示形式。Runnable不能返回一个值或抛出一个受检查的异常。
许多任务实际上都是存在延迟的计算 -执行数据库查询,从网络上获取资源,或者计算一个复杂的功能。对于这些任务,Callable是一种更好的抽象。
Executor执行的任务有4个生命周期:创建、提交、开始和完成。在Executor框架中,已提交但尚未开始的任务可以取消,已经开始执行的任务,只有当他们能响应中断时,才能取消。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。在Future规范中包含的隐含意义是,任务的生命周期只能前进,不能后退,某个任务完成后,它将永远停留在“完成”状态上。
get方法的行为取决于任务的状态。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。
6.3.3 示例:使用Future实现页面渲染器
为了使页面的渲染器实现更高的并发性,首先将渲染过程分解为两个任务,一个是渲染所有的文本,另一个是下载所有的图像。
程序清单6-13 使用Future等待图像下载
public abstract class FutureRenderer {
private final ExecutorService executor = Executors.newCachedThreadPool();
void renderPage(CharSequence source) {
final List<ImageInfo> imageInfos = scanForImageInfo(source);
Callable<List<ImageData>> task =
new Callable<List<ImageData>>() {
public List<ImageData> call() {
List<ImageData> result = new ArrayList<ImageData>();
for (ImageInfo imageInfo : imageInfos)
result.add(imageInfo.downloadImage());
return result;
}
};
Future<List<ImageData>> future = executor.submit(task);
renderText(source);
try {
List<ImageData> imageData = future.get();
for (ImageData data : imageData)
renderImage(data);
} catch (InterruptedException e) {
// Re-assert the thread's interrupted status
Thread.currentThread().interrupt();
// We don't need the result, so cancel the task too
future.cancel(true);
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
在FutureRenderer中创建了一个Callable来下载所有的图像,并将其提交到一个ExecutorService。这将返回一个描述任务执行情况的Future。当主任务需要图像时,它会等待Future.get的调用结果。
FurureRenderer使得渲染文本任务与下载图像数据的任务并发地执行。当所有图像下载完后,会显示到页面上。这将提示用户体验,不仅使用户更快地看到结果,还有效利用了并行性,但我们还可以做得更好。用户不必等到所有的图像都下载完成,而希望看到每当下载完一幅图像时就立即显示出来。
6.3.4 在异构任务并行化中存在的局限
在FutureRenderer中,我们尝试并行地执行两个不同类型的任务-下载图像与渲染页面。然而,通过对异构任务进行并行化来获得重大的性能提升是困难的。
两个人可以很好地分担洗碗的工作:其中一个人负责清洗,另一个负责烘干。然而,要将不同类型的任务平均分给每个工人并不容易。当人数增加时,如果确保他们能帮忙而不是妨碍其他人工作,或者在重新分配工作时,并不是容易的事。如果没有在相似的任务之间找出细粒度的并行性,那么这种方法的好处将减少。
当在多个工人之间分配异构的任务时,还有一个问题就是各个任务的大小可能完全不同。
当在多个工人之间分解任务时,还需要一定的任务协调开销:为了使任务分解能提高性能,这种开销不能高于并行性实现的提升。
FutureRenderer使用了两个任务,其中一个复杂渲染文本,另一个负责下载图像。如果渲染文本的速度远远高于下载图像速度(可能性很大),那么程序的最终性能与串行执行相比性能差别不大,而代码却复杂了。
虽然做了很多工作来并发执行异构任务以提高并发度,但从中得到的并发性却是十分有限的。
只有当大量相互独立且同构的任务可以并发进行处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。
6.3.5 CompletionService: Executor 与 BlockingQueue
如果想Executor提交了一组计算任务,并且希望在计算完成后得到结果,那么可以保留与每个任务关联的Future,然后反复使用get,同时将参数timeout指定为0,从而通过轮询来判断任务是否可行。
这种方法虽然可行,但很繁琐,还有一种更好的方法:完成服务(CompletionService)
CompletionService将Executor与BlockingQueue的功能融合在一起。你可以将Callable任务提交给它执行,然后使用类似与队列操作的take和poll等方法来获得已完成的结果。而这些结果会在完成时被封装为Future。ExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor。
ExecutorCompletionService的实现很简单。在构造函数中创建一个BlockingQueue来保存计算完成的结果。当计算完成时,调用Future-Task中的done方法。当提交给某个任务时,该任务将首先包装为一个QueueingFuture,这是FutureTask的一个子类,然后再改写子类的done方法,并将结果放入BlockingQueue中。
程序清单6-14
private class QueueingFuture<V> extends FutureTask<V> {
QueueingFuture(Callable<V> c) { super(c); }
QueueingFuture(Runnable t, V r) { super(t, r); }
protected void done() {
completionQueue.add(this);
}
}
6.3.6 示例:使用CompletionService实现页面渲染器
可以通过CompletionServ从两个方法来提高页面渲染器的性能:缩短总时间以及提高响应性。
为每一幅图像的下载都创建一个独立任务,并从线程池中执行它们,从而将串行的下载过程转化为并行的过程:浙江减少下载的总时间。
此外,通过CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来,使用户获得一个更加动态和更高响应性的用户界面。
程序清单 6-15
public abstract class Renderer {
private final ExecutorService executor;
Renderer(ExecutorService executor) {
this.executor = executor;
}
void renderPage(CharSequence source) {
final List<ImageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionService =
new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo : info)
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
interface ImageData {
}
interface ImageInfo {
ImageData downloadImage();
}
abstract void renderText(CharSequence s);
abstract List<ImageInfo> scanForImageInfo(CharSequence s);
abstract void renderImage(ImageData i);
}
6.3.7 为任务设置时限
有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。
例如,某个Web应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告,这样即使不能获得广告信息,也不会降低站点的响应性能。
在支持时间限制的Future.get中支持这种需求:当结果可用时,它立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。
在使用限时任务时应注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由人任务本身来管理它的限定事件,并且在超时后中职执行或取消任务。此时可再使用Future,如果一个限时的get方法跑出了TimeoutException,那么可以通过Future取消任务。future.cancel(true)。
RenderWithTimeBudget 给出了限时Future.get的一种典型应用。在它生成的页面中包括响应用户请求的内容以及从广告服务器上获得广告。它将获取广告的任务提交给一个Executor,然后计算剩余的广告文本内容,最后等待广告信息,知道超出限定的时间(传递给get的timeout参数的计算方法是,将指定时限减除当前时间,这可能会得到负数,但在这里与时限有关的负数都视为0)。如果get超时,那么将取消广告获取任务,并转而使用默认的广告信息。
程序清单6-16
public class RenderWithTimeBudget {
private static final Ad DEFAULT_AD = new Ad();
private static final long TIME_BUDGET = 1000;
private static final ExecutorService exec = Executors.newCachedThreadPool();
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
// Render the page while waiting for the ad
Page page = renderPageBody();
Ad ad;
try {
// Only wait for the remaining time budget
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad);
return page;
}
Page renderPageBody() { return new Page(); }
static class Ad {
}
static class Page {
public void setAd(Ad ad) { }
}
static class FetchAdTask implements Callable<Ad> {
public Ad call() {
return new Ad();
}
}
}
6.3.8 示例:旅行预订门户网站
考虑这样一个旅行预定门户网站:用户输入旅行的日期和其他要求,门户网站获取并显示来自多条航线,旅店或汽车租赁公司的报价。可能会调用Web服务,访问数据库,执行一个EDI事物或其他机制。
在这种情况下,不宜让页面的响应时间受限于最慢的响应时间,而应该只显示在指定时间内收到的信息。对于没有及时响应的服务提供者,页面可以忽略或者显示提示信息。
从一个公司获取报价的过程与其他公司获得报价的过程无关,因此可以将获取报价的过程当成一个任务,从而使获得报价的过程能并发执行。创建n个任务,将其提交到线程池,保留n个Futrue,并使用限时的get方法通过Future串行地获取每一个结果,这一切都很简单,但我们还可以使用一个更简单的方法——invokeAll(invoke 援引)
下面的代码使用了支持显示的invokeAll,将多个任务提交到一个ExecutorService并获得结果。
InvokeAll方法的参数为一组任务,并返回一组Future。这两个集合有着相同的结构。
InvokeAll按照任务集合中迭代器额顺序肩所有的Future添加到返回的集合中,从而使调用者能将各个Future与其表示Callable关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll将返回。将超过指定时限后,任何还未完成的任务都会被取消。当invokeAll返回后,每个任务要么正常地完成,要么被取消,没有正在执行的任务,而客户端可以调用get或isCancelled来判断究竟是何种情况。
程序清单6-17
public class TimeBudget {
private static ExecutorService exec = Executors.newCachedThreadPool();
public List<TravelQuote> getRankedTravelQuotes(TravelInfo travelInfo, Set<TravelCompany> companies,
Comparator<TravelQuote> ranking, long time, TimeUnit unit)
throws InterruptedException {
List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company : companies)
tasks.add(new QuoteTask(company, travelInfo));
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);
List<TravelQuote> quotes =
new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}
}
class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;
public QuoteTask(TravelCompany company, TravelInfo travelInfo) {
this.company = company;
this.travelInfo = travelInfo;
}
TravelQuote getFailureQuote(Throwable t) {
return null;
}
TravelQuote getTimeoutQuote(CancellationException e) {
return null;
}
public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
interface TravelCompany {
TravelQuote solicitQuote(TravelInfo travelInfo) throws Exception;
}
interface TravelQuote {
}
interface TravelInfo {
}
网友评论