美文网首页
第六章——任务执行

第六章——任务执行

作者: 你可记得叫安可 | 来源:发表于2020-10-29 21:17 被阅读0次

6.2 Executor 框架

Java 类库中,任务执行的主要抽象不是 Thread,而是 Executor

public interface Executor {
    void execute(Runnable command);
}

虽然 Executor 是个简单的接口,但它却为灵活且强大的异步任务执行框架提供了基础。
Executor 基于生产者 - 消费者模式,提交任务的操作相当于生产者(生成待完成的工作单元),执行任务的线程则相当于消费者(执行完这些工作单元)。如果要在程序中实现一个生产者 - 消费者的设计,那么最简单的方式就是使用 Executor

6.2.1 示例:基于 Executor 的 Web 服务器
public class TaskExecutionWebServer {
    private static final int NTHREADS = 100;
    private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);  // 申明一个固定大小的线程池

    public static void main(String[] args) throws IOException {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket connection = socket.accept();
            Runnable task = () -> handleRequest(connection);
            exec.execute(task);  // 每来一个请求就将其交给线程池处理
        }
    }
}

在上面 TaskExecutionWebServer 中,我们通过使用 Executor,将请求处理任务的提交与任务的实际执行解耦开来,并且只需采用另一种不同的 Executor 实现,就可以改变服务器的行为。改变 Executor 实现或配置所带来的影响要远远小于改变任务提交方式带来的影响。

我们可以很容易地将 TaskExecutionWebServer 修改为类似 ThreadPerTaskWebServer 的行为:只需使用一个为每个请求都创建新线程的 Executor

public class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        new Thread(command).start();
    }
}

同样,还可以编写一个 Executor 使 TaskExecutionWebServer 的行为类似于单线程的行为,即以同步的方式执行每个任务,然后再返回:

public class WithinThreadExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        command.run();
    }
}
6.2.3 线程池

类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用 Executors 中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool,将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程)。除非线程显示地调用 shutdown,否则线程将一直存在于线程池中。(执行完任务的线程一直阻塞的等待新任务到来。)

  • newCachedThreadPool,创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。

  • newSingleThreadExecutor,是一个单线程的 Executor,它创建单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。newSingleThreadExecutor 能确保依照任务在队列中的顺序来串行执行。

  • newScheduledThreadPool,创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务。

6.2.4 Executor 的生命周期

为了解决生命周期问题,Executor 扩展了 ExecutorService 接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法)。

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    ...
}

ExecutorService 的生命周期有 3 种状态:运行、关闭和已终止。ExecutorService 在初始创建时处于运行状态。shutdown 方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow 方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。这部分内容可以看:ExecutorService 的 shutdown 和 shutdownNow 区别与联系

ExecutorService 关闭后提交的任务将由 “拒绝执行处理器(Rejected Execution Handler)” 来处理,它会抛弃任务,或者使得 execute 方法抛出一个未检查的 RejectedExecutionException

6.3 找出可利用的并行性

我们假设实现一个浏览器程序中的页面渲染(Page-Rendering)功能,它的作用是将 HTML 页面绘制到图像缓存中。

6.3.1 示例:串行的页面渲染器
public class SingleThreadRender {
    void renderPage(CharSequence source) {
        renderText(source);
        List<ImageData> imageData = new ArrayList<ImageData>();
        for (ImageInfo imageInfo : scanForImageInfo(source)) {
            imageData.add(imageInfo.downloadImage());
        }
        for (ImageData data : imageData) {
            renderImage(data);
        }
    }
}

从上面代码我们可以看到,程序先通过 renderText 渲染了页面中的文本,然后扫描页面中图像的信息,再串行地调用 downloadImage 将所有图片依次下载下来,最后将下载下来的图像通过 renderImage 进行渲染。

图像下载过程的大部分时间都是在等待 I/O 操作执行完成,在这期间 CPU 几乎不做任何工作。因此,这种串行执行方法没有充分利用 CPU,使得用户在看到最终页面之前要等待过长的时间。

6.3.2 携带结果的任务 Callable 与 Future

Executor 框架使用 Runnable 作为基本的任务表示形式。Runnable 是一种有很大局限的抽象,虽然 run 能写入到日志文件或者将结果放入某个共享的数据结构,但它不能返回一个值或抛出一个受检查的异常。

Callable 是一种更好的抽象:它认为主入口点(即 call)将返回一个值,并可能跑出一个异常。要使用 Callable 来表示无返回值的任务,可使用 Callable<Void>

6.3.3 示例:使用 Future 实现页面渲染器
public class FutureRenderer {
    private final ExecutorService executor = ...;
    
    void renderPage(CharSequence source) {
        final List<ImageInfo> imageInfos = scanForImageInfo(source);
        Callable<List<ImageData>> task = new Callable<>() {
            @Override
            public List<ImageData> call() throws Exception {
                List<ImageData> result = new ArrayList<ImageData>();
                for (ImageInfo imageInfo : imageInfos) {
                    result.add(imageInfo.downloadImage());
                }
                return reuslt;
            }
        };
        
        Future<List<ImageData>> future = executor.submit(task);
        renderText(source);
        
        try {
            List<ImageData> imageData = future.get();
            for (ImageData data : imageData) {
                renderImage(data);
            }
        } catch (InterruptedException e) {
            // 重新设置线程的中断状态
            Thread.currentThread().interrupt();
            // 由于不需要结果,因此取消
            future.cancel(true);
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

在上面的代码中,我们将渲染文字和下载图像解耦开来了。但是下载图像仍然是串行的,而且在我们大部分的场景中,下载图像的时间其实远远大于渲染文字。因此,如果我们能将串行的文件下载做成并行的,将大大地节省时间。

6.3.5 CompletionService: Executor 与 BlockingQueue

如果向 Executor 提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的 Future,然后反复使用 get 方法,同时将参数 timeout 指定为 0,从而通过轮询来判断任务是否完成。这种方法虽然可行,但是却使用了轮询,无端地消耗了 CPU 资源。我们可以使用 CompletionService

CompletionServiceExecutorBlockingQueue 的功能融合在一起。你可以将一组 Callable 任务提交给它来执行,然后使用类似于队列操作的 takepoll 等方法来获得已完成的结果。ExecutorCompletionService 实现了 CompletionService,并将计算部分委托给一个 Executor

ExecutorCompletionService 的实现其实非常简单,可以自己阅读源码。

6.3.6 示例:使用 CompletionService 实现页面渲染器
public class Renderer {
    private final ExecutorService executor;

    Renderer(ExecutorService executor) {
        this.executor = executor;
    }

    void renderPage(CharSequence source) {
        List<ImageInfo> info = scanForImageInfo(source);
        CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
        for (final ImageInfo imageInfo : info) {
            // 提交一组 Callable 任务
            completionService.submit(() -> imageInfo.downloadImage());
        }
        renderText(source);
        
        try {
            for (int t = 0, n = info.size(); t < n; t++) {
                Future<ImageData> f = completionService.take();  // take 是阻塞的方法,但是提交的任务组中,一旦有任务完成,take 就会马上返回
                ImageData imageData = f.get();
                renderImage(imageData);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

上面的 Renderer 为每一幅图像的下载都创建一个独立任务,并在线程池中执行它们,从而将串行的下载过程转换为并行的过程。此外,通过从 CompletionService 中获取结果以及使每张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。

6.3.7 为任务设置时限

有时候,如果某个任务无法在指定时间内完成,那么将不再需要它的结果,此时可以放弃这个任务。例如,某个 Web 应用程序从外部的广告服务器上获取广告信息,但如果该应用程序在两秒内得不到响应,那么将显示一个默认的广告。

在有限时间内执行任务的主要困难在于,要确保得到答案的时间不会超过限定的时间,或者在限定的时间内无法获得答案。在支持时间限制的 Future.get 中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出 TimeException

在使用限时任务时需要注意,当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。如果一个限时的 get 方法抛出了 TimeoutException,那么可以通过 Future 来取消任务。如果编写的任务是可取消的,那么可以提前中止它,以免消耗过多的资源。

Page renderPageWithAd() throws InterruptedException {
    long endNanos = System.nanoTime() + TIME_BUDGET;
    Future<Ad> f = exec.submit(new FetchAdTask());
    // 在等待广告的同时限时页面
    Page page = renderPageBody();
    Ad ad;
    try {
        // 只等待指定的时间长度
        long timeLeft = endNanos - System.nanoTime();
        ad = f.get(timeLeft, TimeUnit.NANOSECONDS);
    } catch (ExecutionException e) {
        ad = DEFAULT_AD;
    } catch (TimeoutException e) {
        ad = DEFAULT_AD;
        f.cancel(true); // 主动取消任务
    }
    page.setAd(ad);
    return page;
}

上面代码中,当获取广告的任务 Future 超时并抛出 TimeException 后,任务 Future 需要主动调用 cancel 方法以取消任务的执行。

上面是单个 Future 超时的时候抛出 TimeoutException,并且需要开发者自己主动 cancelExecutorService 还有传入一组任务的情况。

6.3.8 示例:旅行预订门户网站

ExecutorService 支持提交多个任务并获得结果。InvokeAll 方法的参数为一组任务,并返回一组 FutureinvokeAll 按照任务集合中迭代器的顺序将所有的 Future 添加到返回的集合中,从而使调用者能将各个 Future 与其表示的 Callable 关联起来。当所有任务都执行完毕时,或者调用线程被中断时,又或者超过指定时限时,invokeAll 将返回。当超过指定时限后,任何还未完成的任务都会取消(这与单个 Future 超时不同)。当 invokeAll 返回后,每个任务要么正常的完成,要么被取消,而客户端代码可以调用 getisCancelled 来判断究竟是何种情况。

public class QuoteTask implements Callable<TravelQuote> {
    private final TravelCompany company;
    private final TravelInfo travelInfo;
    ...
    
    public TravelQuote call() throws Exception {
        return company.soliciteQuote(travelInfo);
    }
}

public class RankedTravelQuotes {
    public List<TravelQuote> getRankedTravelQuotes(
            TravelInfo travelInfo, Set<TravelCompany> companies,
            Comparator<TravelQuote> ranking, long time, TimeUnit unit)
            throws InterruptedException {
        List<QuoteTask> tasks = new ArrayList<>();
        for (TravelCompany company : companies) {
            tasks.add(new QuoteTask(company, travelInfo));
        }
        List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);  // 调用 invokeAll 来执行一组任务
        List<TravelQuote> quotes = new ArrayList<>(tasks.size());
        Iterator<QuoteTask> taskIter = tasks.iterator();
        for (Future<TravelQuote> f : futures) {
            QuoteTask task = taskIter.next();
            try {
                quotes.add(f.get());  // 阻塞地遍历 future.get()
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
            } catch (CancellationException e) {  // 不是抛出 TimeoutException,而是被直接取消后抛出 CancellationException
                quotes.add(task.getTimeoutQuote(e));
            }
        }
    }
}

相关文章

网友评论

      本文标题:第六章——任务执行

      本文链接:https://www.haomeiwen.com/subject/bvutvktx.html