美文网首页
多线程之——ExecutorCompletionService

多线程之——ExecutorCompletionService

作者: 阿福德 | 来源:发表于2019-01-28 18:18 被阅读0次

    在我们开发中,经常会遇到这种情况,我们起多个线程来执行,等所有的线程都执行完成后,我们需要得到个线程的执行结果来进行聚合处理。我在内部代码评审时,发现了不少这种情况。看很多同学都使用正确,但比较啰嗦,效率也不高。本文介绍一个简单处理这种情况的方法:

    直接上代码:

    public class ExecutorCompletionServiceTest {
        @Test
        public void testExecutorCompletionService() throws InterruptedException {
            ExecutorCompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
            for(int i=0;i<10;i++) {
                completionService.submit(new Calc());
            }
            int sum = 0;
            for(int i=0;i<10;i++) {
                try {
                    //这个地方,take方法一定会得到已经完成了的线程,如果还没有已完成的线程,则阻塞;
                    sum += completionService.take().get();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(sum);
        }
        class Calc implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                Random r = new Random();
                int ret = r.nextInt();
                if(ret % 19 == 0) {
                    throw new InterruptedException("随意制造一个执行异常");
                }
                TimeUnit.SECONDS.sleep(Math.abs(ret % 5));
                System.out.println(Thread.currentThread().getName());
                return ret;
            }
        }
    }
    

    上面的take方法一定会得到已经完成了的线程,如果还没有已完成的线程,则阻塞;
    有些同学的做法是,通过线程池的submit方法,返回一个Future,并加future加入一个list中,然后遍历这个list中的Future,并调用future的take方法得到callable的返回值。
    这样做不好的地方是:遍历list中第一个future可能不是第一个完成的,而且还可能后面的future先执行完成,但是主线程还在等第一个future的结果。明显效率有缺陷。
    那么ExecutorCompletionService又是如何实现的呢?
    我们看看submit方法的实现:

        public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    

    上面的代码中,我们发现调用了线程池executor的execute方法,但是将callbale任务包装成了一个QueueingFuture,我们看到QueueingFuture是一个ExecutorCompletionService的内部类:

        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    

    其中有个方法done, 这个方面中加task添加了外部类中的一个queue中。
    done方法应该就是指这个任务done了(执行完成了)的回调。
    这里重点看一下这个queue(completionQueue)

    public class ExecutorCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
        ......
    }
    

    我们可以看到这个queue是一个blocking queue, 阻塞队列的特性是:
    当我们要从queue中获取一个元素时,如果队列为空,则阻塞;
    当我们要向queue中添加元素时,如果队列满了,则阻塞。

    看到这里我们就明白了ExecutorCompletionService的原理了,当任务执行完成后,就将任务的结果封装成一个Future加到一个阻塞队列中。
    当主线程调用其take方法是,如果队列为空,则阻塞,否则获取已完成的future并通过get方法获取返回值。

    相关文章

      网友评论

          本文标题:多线程之——ExecutorCompletionService

          本文链接:https://www.haomeiwen.com/subject/zwzajqtx.html