问题
- 使用 Future.get()异步获取结果的时候,当异步任务没有计算成功,主线程调用 get()方法获取结果的时候会一直阻塞到线程完成为止,影响运行效率。
- 我们习惯使用线程池达到线程的复用,线程池中使用 submit 异步计算任务获取返回结果的时候,我们可能 future.get()方式获取任务的返回结果,但是 N 个线程去执行,当前任务没有执行完成,而其他任务执行完事了,我们通过 future.get()方式获取结果的时候即使其他完事了,我们还需要等待这个任务完成,影响效率,我们希望可以这样,谁先完成,谁就可以获取结果,CompleService 就干这个事的。
实现原理
我们一般使用 CompletionService 的子类 ExecutorCompletionService,其内部有一个阻塞队列,传入 N 个任务,任意任务完成之后,会将执行结果放入阻塞队列中,其他线程可以调用 take,poll 方法从阻塞队列中获取任务,任务进出队列遵循先进先出的原则。
例子
//例子打印结果会是先完成的先打印
public static void main(String[] args) throws Exception{
//初始化线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// 同时运行多个任务
CompletionService<String> completionService = new ExecutorCompletionService(threadPool);
for (int i=0;i<5;i++){
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
//线程随机休眠
int time=new Random().nextInt(5);
TimeUnit.SECONDS.sleep(time);
return "当前线程名为 " + Thread.currentThread() + "执行时间" + time + "秒";
}
});
}
for (int i = 1; i <= 5; i++) {
Future<String> future = completionService.take();
System.out.println(future.get());
}
}
源码
类的继承结构图
<img src="https://4k-images.oss-cn-beijing.aliyuncs.com/m2img/2019/05/202110191449727.png" alt="image-20211019144919848" style="zoom:50%;"/>
public interface CompletionService<V> {
//提交一个Callabl类型的任务,并返回关联任务的Future
Future<V> submit(Callable<V> task);
//提交的任务类型为Runable类型,并返回关联任务的Future
Future<V> submit(Runnable task, V result);
//从内部阻塞队列中获取并移除第一个执行的任务,阻塞直到任务完成
Future<V> take() throws InterruptedException;
//从内部阻塞队列中获取并移除第一个执行完成的任务,这个获取不到会返回,不阻塞
Future<V> poll();
//从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间unit内获取不到就返回
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
ExecutorCompletionService
public class ExecutorCompletionService<V> implements CompletionService<V> {
//执行具体的任务
private final Executor executor;
//封装Callable或Runnable对象
private final AbstractExecutorService aes;
//队列,用来存放已经完成的任务
private final BlockingQueue<Future<V>> completionQueue;
//继承FutureTask,线程池的就讲过FutureTask,
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
//将已经完成的任务添加到队列中
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
//构造函数初始化
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
//进行类型转化
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
//初始化化一个LinkedBlockingQueue先进先出阻塞队列
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
//走ThreadPool线程池那套逻辑 ,提交一个任务,再任务完成之后调用上面的Done将自己加入队列中
executor.execute(new QueueingFuture(f));
return f;
}
。。。Runnable类型的submit类似
}
运行结果
image-20211019151653450总结
上图可以看到先执行的可以提交获取到,原理其实就是使用了一个阻塞队列存放已经完成任务的结果,应用程序可以从阻塞队列中获取先执行的任务
闲谈
本人学历是双非本科,自己本人能力有限,文章列的把自己所知道的已经网上看的总结下,自己还在持续学习中,开始写文章的原因是发现自己学完一个东西,有的时候老是发现自己学的模模糊糊的,没有自己的总结,所有选择写下来,能帮助各位更好,公众号有自己总结的一系列文章,需要的小伙伴还请关注下个人公众号点个赞,这将对我是很大的鼓励~
巨人肩膀
https://blog.csdn.net/iteye_5555/article/details/82063942
https://cloud.tencent.com/developer/article/1444259
https://blog.csdn.net/jijianshuai/article/details/75480406
网友评论