任务
任务通常是一些抽象且离散的逻辑工作单元。当围绕任务执行来构建并发程序时,需要找到任务的边界,使得每个任务尽可能与其他任务独立开来,这样能够独立地在单独的线程中执行,提高并发性。
线程池
线程池是指管理一组同构工作线程的资源池,线程池与工作队列是密切相关的,在工作队列中保存了所有等待执行的任务。工作者线程的任务就是从工作队列中取出一个任务执行,执行结束返回线程池等待下一个任务。
在线程池中的线程不是根据任务临时创建的,而是事先准备好一组线程,等待任务的出现,执行完任务并不随着任务结束而销毁,而是返回线程池等待下次任务的复用。这减少了线程频繁创建和销毁的开销,同时因为线程池的大小限制,也限制住系统中最大的并发量。
为什么要使用线程池
- 构建和销毁一个线程是要与操作系统交互的,这个成本是很高的;
- 活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程对于处理器的数量,那么就会闲置一些线程。大量的线程会占用大量的内存空间,也会给垃圾回收带来压力,因为线程往往生命周期很短。大量的线程竞争CPU也会带来性能的问题;
- 可创建的线程数量在各个平台上有不同的限制,如果破坏了限制,可能会抛出OOM异常而终止程序。在一定范围内,增加线程可以提高系统的吞吐量,但是物极必反,再创建更多的线程只会降低程序的执行速度,过多的创建线程可能会使系统崩溃。
任务执行策略
任务是一组逻辑工作单元,需要依附于运行线程被执行。任务的执行策略有简单粗暴的把所有任务放入单线程中顺序执行、为每个任务创建一个线程执行、使用异步任务执行框架来执行。
单线程执行简单、但非常不高效且可能会因为一个任务的错误导致整个任务阻塞。每个任务创建线程的缺点上节已经阐述。最佳选择是选择异步执行框架也就是基于线程池的任务执行框架。
Executor框架
Executor是一个简单的函数式接口:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
它提供一种标准的方法将任务的提交和执行过程解耦,使用Runnalbe作为任务的抽象。
Executor还提供了对周期性任务的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。
Executor基于生产者-消费者模式,提交任务相当于生产者,执行任务相当于消费者。
Executor有许多静态工厂方法用来构建线程池(返回的是ExecutorService):
方法 | 描述 |
---|---|
newCachedThreadPool | 必要时创建新线程,空闲线程会被保留60秒 |
newFixedThreadPool | 创建指定数量线程的线程池,且线程一直保留 |
newSingleTheadPool | 只有一个线程的线程池,改线程顺序执行提交的每一个任务 |
newScheduledThreadPool | 用于预定执行而构建的线程池,用于替代java.util.Timer |
newSingleThreadScheduledExecutor | 用于执行预定任务的单线程池。 |
线程池是执行框架的实现的一部分,作为消费者角色,执行器将任务分发给线程池执行。
当用完一个线程池时,需要将线程池关闭,否则JVM将无法退出。关闭线程池有两种方法:
- shutdown
此方法是平缓的关闭方式,不再接受新的任务,等待以已经启动的任务结束,当所有的任务完成,线程池中的线程死亡。
- shutdownNow
暴力关闭方式,取消尚未开始的任务并试图中断正在运行的线程。
ExecutorService的生命周期有三种:运行、关闭、终止。Executor初始创建时处于运行状态,执行shutdown之后进入关闭状态,等所有任务都完成后进入终止状态。可以调用awaitTermination等待ExecutorService到达终止状态,或者使用isTerminated轮询状态。
使用线程池的一般逻辑:
- 调用Executors类的静态方法newCachedThreadPool或者newFixedThreadPoo
- 调用submit提交任务(Runnable或Callable对象)
- 如果想要取消一个任务,或者提交Callable对象,要保存好返回的Future对象
- 当不再提交新任务时,调用shutdown。
ExecutorService
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
/*
* 提交所有对象到一个Callable对象的集合中,并返回一个Future对象列表,代表所有任务的Future对象。
* 可以使用ExecutorCompletionService对结果按可获得的顺序排序。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
/*
*提交所有对象到一个Callable对象的集合中,并返回某个已经完成的任务的结果;
* 这个结果无法知道是哪个任务返回的,返回值之后,这个任务组就结束了。
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Callable和Future
Executor以Runnable为任务抽象,但是Runnable方法不能抛出异常且没有返回值。Callable是一种更好的任务抽象,它认为主入口点将返回一个值并可能抛出一个异常。
Future表示一个任务的生命周期,并提供了方法来判断是否已经完成或取消,以及获取任务的结果或者取消任务等。Future需要和Callable合作使用才能获取返回结果。
CompletionService控制任务组
如果向Executor提交了一组任务,并且希望在任务执行完成后获得结果,那么可以保留每个任务的Future对象,然后通过调用get方法获取执行结果。CompletionService提供了更好的方法来完成此需求。
CompletionService将Executor和BlockingQueue的功能融合在一起。可以将Callable任务提交给它执行,然后使用类似队列操作的take和poll方法获取已完成的结果,而且这些结果在完成时会被封装成Future对象。
ExecutorCompletionService实现了CompletionService,并将计算部分委托给Executor。
CompletionService<ImageData> completionService = new ExecutorCompletionService<>(executor);
completionServie.submit(()->{//...});
Future<ImageInfo> f = completionService.take();
为任务设置时限
利用Future的限时get方法。
异构任务的并行问题
各个任务执行时间可能差距较大,导致并行任务的时间依赖于最久执行时间的任务。
Fork-join框架
并行任务执行框架。
要采用框架可用的一种方式完成递归计算,需要通过一个扩展RecursiveTasK<T>的类或者提供一个扩展RecursiveAction的类,再覆盖compute方法来生成和调用子任务,然后合并结果。
可完成Future
处理非阻塞调用的传统方法是使用事件监听器,为任务完成之后要出现的动作注册一个处理器,如果下一个动作也是异步的,在它之后的下一个动作会在一个不同的事件处理器中。这样虽然在功能上不会有什么问题,但是这个流程的代码可能分散到各处。
Java8的CompletableFuture类提供了一种候选方法,可完成Future可以组合。
@Test
public void testCompletableFuture() {
String res = CompletableFuture.supplyAsync(() -> "hello").thenApplyAsync((a) -> a + "world").join();
System.out.println(res);
}
网友评论