美文网首页并发编程
CompletionService 源码解析

CompletionService 源码解析

作者: xiaolyuh | 来源:发表于2019-08-10 16:17 被阅读14次

    CompletionService的主要作用是:按照异步任务的完成顺序,逐个获取到已经完成的异步任务。主要实现是在ExecutorCompletionService中。

    类图

    ExecutorCompletionService.png

    核心内部类

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
    

    CompletionService的实现中,将任务FutureTask做了扩展,实现了FutureTaskdone方法。当任务完成后会回调这个方法,这时我们在这个方法中将完成的任务放到队列中,就实现了按照异步任务完成的顺序,逐个处理任务的结果了。

    核心属性

    // 执行任务的线程池
    private final Executor executor;
    // 存放已完成的异步任务的阻塞队列,默认使用 LinkedBlockingQueue
    private final BlockingQueue<Future<V>> completionQueue;
    

    构造函数

    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
    

    在构造函数中我们至少需要传入一个Executor线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的LinkedBlockingQueue是一个无界队列,有内存溢出的风险。

    submit 提交任务

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    

    我们可以看到,在提交任务给线程池之前,我们会将任务封装成QueueingFuture任务。当该任务执行完成后会回调执行done方法,将任务放到队列。

    获取已完成的任务

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    
    public Future<V> poll() {
        return completionQueue.poll();
    }
    
    • take:如果没有任务,一直阻塞,直到有新任务进来
    • poll:如果没有任务返回NULL

    示例

    public class CompletionServiceTest {
    
        @Test
        public void test() throws ExecutionException, InterruptedException {
            Random random = new Random();
            ExecutorService executor = Executors.newFixedThreadPool(10);
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
            for (int i = 0; i < 8; i++) {
                completionService.submit(() -> {
                    int time = random.nextInt(1000);
                    sleep(time);
                    System.out.println(Thread.currentThread().getName() + " 执行异步任务执行耗时: " + time);
                    return time;
                });
            }
    
            while (true) {
                System.out.println(Thread.currentThread().getName() + " 主线程获取到任务结果 " + completionService.take().get());
            }
        }
    
    
        public static void sleep(int probe) {
            try {
                Thread.sleep(probe);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    pool-1-thread-7 执行异步任务执行耗时: 153
    main 主线程获取到任务结果 153
    pool-1-thread-5 执行异步任务执行耗时: 208
    main 主线程获取到任务结果 208
    pool-1-thread-4 执行异步任务执行耗时: 242
    main 主线程获取到任务结果 242
    pool-1-thread-8 执行异步任务执行耗时: 456
    main 主线程获取到任务结果 456
    pool-1-thread-1 执行异步任务执行耗时: 567
    main 主线程获取到任务结果 567
    pool-1-thread-2 执行异步任务执行耗时: 782
    main 主线程获取到任务结果 782
    pool-1-thread-6 执行异步任务执行耗时: 796
    main 主线程获取到任务结果 796
    pool-1-thread-3 执行异步任务执行耗时: 976
    main 主线程获取到任务结果 976
    

    我的是8核机器,所以任务的结束时间一定会按照任务的结束时间排序。

    源码

    https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

    spring-boot-student-concurrent 工程

    layering-cache

    为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下

    相关文章

      网友评论

        本文标题:CompletionService 源码解析

        本文链接:https://www.haomeiwen.com/subject/sqifjctx.html