Executor
public interface Executor {
void execute(Runnable command);
}
Executor抽象提供了一种将任务提交与每个任务的运行机制(包括线程使用、调度)分离的方法,即Runnable
代表任务,execute
处理调度的逻辑;
static class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
System.out.println("ThreadPerTaskExecutor - execute");
new Thread(r).start();
}
}
static class DirectExecutor implements Executor {
public void execute(Runnable r) {
System.out.println("DirectExecutor - execute");
r.run();
}
}
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
System.out.println("task offer ... ");
tasks.offer(new Runnable() {
public void run() {
System.out.println("SerialExecutor - execute ... ");
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
System.out.println("schedule next task ");
executor.execute(active);
}
}
}
public static void main(String[] args) {
// SerialExecutor executor = new SerialExecutor(new ThreadPerTaskExecutor());
SerialExecutor executor = new SerialExecutor(new DirectExecutor());
executor.execute(() -> { System.out.println(Thread.currentThread()); });
executor.execute(() -> { System.out.println(Thread.currentThread()); });
}
#输出
#task offer ...
#schedule next task
#DirectExecutor - execute
#SerialExecutor - execute ...
#Thread[main,5,main]
#task offer ...
#schedule next task
#DirectExecutor - execute
#SerialExecutor - execute ...
#Thread[main,5,main]
官方文档上提供的示例,演示了Executor
抽象的简单使用方法,示例想体现的除了异步同步任务外,我觉得更加重要的是任务执行和任务调度分离的思想;
ExecutorService
public interface ExecutorService extends Executor {
// 启动有序关闭,在此过程中执行以前提交的任务,但不接受任何新任务。
void shutdown();
// 尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
List<Runnable> shutdownNow();
// executor是否被shutdown
boolean isShutdown();
// 如果所有任务都完成关闭则返回true
boolean isTerminated();
// 阻塞直到所有任务在关闭请求后完成执行,或超时发生后,或当前线程中断后;
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交任务,返回
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// 执行一个集合的任务,返回一个列表其中包含所有任务完成时的状态和结果(可以是异常结果)。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// 执行给定的任务,返回已成功完成的任务的结果(即不抛出异常),如果有的话。在正常或异常返回时,未完成的任务将被取消。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService提供了对每个Executor
跟踪、管理一个或者多个异步任务的进度,也可以关闭服务来终止service接收新任务和回收资源;
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // 禁止提交新任务
try {
//等待现有任务终止
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // 取消当前正在执行的任务
// 等待一段时间,等待任务响应被取消
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// 如果当前线程处于中断状态,重试shutdown
pool.shutdownNow();
// 保存中断状态
Thread.currentThread().interrupt();
}
}
文档中演示了一个终止服务的示例,分两个阶段关闭一个 ExecutorService
,首先需要调用shutdown
来拒绝传入的任务,然后调用shutdownNow
(如果需要的话)来取消任何延迟的任务;
CompletionService
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
CompletionService旨在提供一个异步任务产生执行和已完成任务结果的解耦服务,即会有一个队列储存已完成的任务结果的合集,任务的提交和执行不会阻塞获取结果操作;
ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
...
}
ExecutorCompletionService是CompletionService的实现类,内部有个阻塞队列储存的是完成任务的结果;
// 注意这个queue的默认长度为Integer.MAX_VALUE
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
构造器默认使用LinkedBlockingQueue
,那么很容易就联想到该阻塞队列的特性:
- 是有界的双端队列
- 提供阻塞和非阻塞方法获取已完成的任务
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
- 当完成任务的队列满了,新完成的任务结果会被抛弃
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
// FutureTask提供的钩子
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
注意这里的done
在FutureTask
完成或者异常或者cancel
都会被调用;
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
若构造器中传进来的Executor
是AbstractExecutorService
的子类,那么newTaskFor
就会交由子类来决定FutureTask
的类型,达到定制扩展的效果;
void solve(Executor e, Collection<Callable<Result>> solvers)
throws InterruptedException, ExecutionException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
for (Callable<Result> s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i < n; ++i) {
Result r = ecs.take().get(); // 这里会抛出中断异常
if (r != null)
use(r);
}
}
官方提供的用法示例,第一个比较简单,遍历任务集合中每个任务执行后获取结果,是一种顺序执行的过程,执行 -> 获取结果 -> 执行 -> 获取结果,当然执行任务过程可以是异步的,如果遇到中断异常会停止任务;
网友评论