- 多线程之——ExecutorCompletionService
- ExecutorCompletionService
- 高并发编程-ExecutorCompletionService深
- Java基础-线程-ExecutorCompletionServ
- CompletionService+ExecutorComple
- 29-CompletionService和ExecutorCom
- ExecutorCompletionService 源码分析
- Java-ExecutorCompletionService
- 9-ExecutorCompletionService
- Java并发编程——ExecutorCompletionServ
在我们开发中,经常会遇到这种情况,我们起多个线程来执行,等所有的线程都执行完成后,我们需要得到个线程的执行结果来进行聚合处理。我在内部代码评审时,发现了不少这种情况。看很多同学都使用正确,但比较啰嗦,效率也不高。本文介绍一个简单处理这种情况的方法:
直接上代码:
public class ExecutorCompletionServiceTest {
@Test
public void testExecutorCompletionService() throws InterruptedException {
ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
for(int i=0;i<10;i++) {
completionService.submit(new Calc());
}
int sum = 0;
for(int i=0;i<10;i++) {
try {
//这个地方,take方法一定会得到已经完成了的线程,如果还没有已完成的线程,则阻塞;
sum += completionService.take().get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println(sum);
}
class Calc implements Callable<Integer> {
@Override
public Integer call() throws Exception {
Random r = new Random();
int ret = r.nextInt();
if(ret % 19 == 0) {
throw new InterruptedException("随意制造一个执行异常");
}
TimeUnit.SECONDS.sleep(Math.abs(ret % 5));
System.out.println(Thread.currentThread().getName());
return ret;
}
}
}
上面的take方法一定会得到已经完成了的线程,如果还没有已完成的线程,则阻塞;
有些同学的做法是,通过线程池的submit方法,返回一个Future,并加future加入一个list中,然后遍历这个list中的Future,并调用future的take方法得到callable的返回值。
这样做不好的地方是:遍历list中第一个future可能不是第一个完成的,而且还可能后面的future先执行完成,但是主线程还在等第一个future的结果。明显效率有缺陷。
那么ExecutorCompletionService又是如何实现的呢?
我们看看submit方法的实现:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
上面的代码中,我们发现调用了线程池executor的execute方法,但是将callbale任务包装成了一个QueueingFuture,我们看到QueueingFuture是一个ExecutorCompletionService的内部类:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
其中有个方法done, 这个方面中加task添加了外部类中的一个queue中。
done方法应该就是指这个任务done了(执行完成了)的回调。
这里重点看一下这个queue(completionQueue)
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
......
}
我们可以看到这个queue是一个blocking queue, 阻塞队列的特性是:
当我们要从queue中获取一个元素时,如果队列为空,则阻塞;
当我们要向queue中添加元素时,如果队列满了,则阻塞。
看到这里我们就明白了ExecutorCompletionService的原理了,当任务执行完成后,就将任务的结果封装成一个Future加到一个阻塞队列中。
当主线程调用其take方法是,如果队列为空,则阻塞,否则获取已完成的future并通过get方法获取返回值。
网友评论