美文网首页
并发编程-CompleService 类

并发编程-CompleService 类

作者: 程序员fly | 来源:发表于2021-10-28 19:10 被阅读0次

    问题

    • 使用 Future.get()异步获取结果的时候,当异步任务没有计算成功,主线程调用 get()方法获取结果的时候会一直阻塞到线程完成为止,影响运行效率。
    • 我们习惯使用线程池达到线程的复用,线程池中使用 submit 异步计算任务获取返回结果的时候,我们可能 future.get()方式获取任务的返回结果,但是 N 个线程去执行,当前任务没有执行完成,而其他任务执行完事了,我们通过 future.get()方式获取结果的时候即使其他完事了,我们还需要等待这个任务完成,影响效率,我们希望可以这样,谁先完成,谁就可以获取结果,CompleService 就干这个事的。

    实现原理

    我们一般使用 CompletionService 的子类 ExecutorCompletionService,其内部有一个阻塞队列,传入 N 个任务,任意任务完成之后,会将执行结果放入阻塞队列中,其他线程可以调用 take,poll 方法从阻塞队列中获取任务,任务进出队列遵循先进先出的原则。

    例子

     //例子打印结果会是先完成的先打印
        public static void main(String[] args) throws Exception{
                //初始化线程池
                ExecutorService threadPool = Executors.newFixedThreadPool(5);
                // 同时运行多个任务
                CompletionService<String> completionService = new ExecutorCompletionService(threadPool);
                for (int i=0;i<5;i++){
                    completionService.submit(new Callable<String>() {
                        @Override
                        public String call() throws Exception {
                            //线程随机休眠
                            int time=new Random().nextInt(5);
                            TimeUnit.SECONDS.sleep(time);
                            return "当前线程名为 " + Thread.currentThread() + "执行时间" + time + "秒";
                        }
                    });
                }
                for (int i = 1; i <= 5; i++) {
                        Future<String> future = completionService.take();
                        System.out.println(future.get());
                }
            }
    

    源码

    类的继承结构图
    <img src="https://4k-images.oss-cn-beijing.aliyuncs.com/m2img/2019/05/202110191449727.png" alt="image-20211019144919848" style="zoom:50%;"/>

     public interface CompletionService<V> {
            //提交一个Callabl类型的任务,并返回关联任务的Future
            Future<V> submit(Callable<V> task);
            //提交的任务类型为Runable类型,并返回关联任务的Future
            Future<V> submit(Runnable task, V result);
            //从内部阻塞队列中获取并移除第一个执行的任务,阻塞直到任务完成
            Future<V> take() throws InterruptedException;
            //从内部阻塞队列中获取并移除第一个执行完成的任务,这个获取不到会返回,不阻塞
            Future<V> poll();
                //从内部阻塞队列中获取并移除第一个执行完成的任务,阻塞时间unit内获取不到就返回
            Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
        }
    

    ExecutorCompletionService

    
        public class ExecutorCompletionService<V> implements CompletionService<V> {
           //执行具体的任务
            private final Executor executor;
            //封装Callable或Runnable对象
            private final AbstractExecutorService aes;
            //队列,用来存放已经完成的任务
            private final BlockingQueue<Future<V>> completionQueue;
           //继承FutureTask,线程池的就讲过FutureTask,
           private class QueueingFuture extends FutureTask<Void> {
                QueueingFuture(RunnableFuture<V> task) {
                    super(task, null);
                    this.task = task;
                }
                //将已经完成的任务添加到队列中
                protected void done() { completionQueue.add(task); }
                private final Future<V> task;
            }
    
          //构造函数初始化
            public ExecutorCompletionService(Executor executor) {
                if (executor == null)
                    throw new NullPointerException();
                this.executor = executor;
                //进行类型转化
                this.aes = (executor instanceof AbstractExecutorService) ?
                    (AbstractExecutorService) executor : null;
                //初始化化一个LinkedBlockingQueue先进先出阻塞队列
                this.completionQueue = new LinkedBlockingQueue<Future<V>>();
            }
    
           public Future<V> submit(Callable<V> task) {
                if (task == null) throw new NullPointerException();
                RunnableFuture<V> f = newTaskFor(task);
                //走ThreadPool线程池那套逻辑 ,提交一个任务,再任务完成之后调用上面的Done将自己加入队列中
                executor.execute(new QueueingFuture(f));
                return f;
            }
            。。。Runnable类型的submit类似
         }
    
    

    运行结果

    image-20211019151653450

    总结

    上图可以看到先执行的可以提交获取到,原理其实就是使用了一个阻塞队列存放已经完成任务的结果,应用程序可以从阻塞队列中获取先执行的任务

    闲谈

    本人学历是双非本科,自己本人能力有限,文章列的把自己所知道的已经网上看的总结下,自己还在持续学习中,开始写文章的原因是发现自己学完一个东西,有的时候老是发现自己学的模模糊糊的,没有自己的总结,所有选择写下来,能帮助各位更好,公众号有自己总结的一系列文章,需要的小伙伴还请关注下个人公众号点个赞,这将对我是很大的鼓励~

    巨人肩膀

    https://blog.csdn.net/iteye_5555/article/details/82063942
    https://cloud.tencent.com/developer/article/1444259
    https://blog.csdn.net/jijianshuai/article/details/75480406

    相关文章

      网友评论

          本文标题:并发编程-CompleService 类

          本文链接:https://www.haomeiwen.com/subject/iowealtx.html