美文网首页并发编程
CompletionService 源码解析

CompletionService 源码解析

作者: xiaolyuh | 来源:发表于2019-08-10 16:17 被阅读14次

CompletionService的主要作用是:按照异步任务的完成顺序,逐个获取到已经完成的异步任务。主要实现是在ExecutorCompletionService中。

类图

ExecutorCompletionService.png

核心内部类

private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
        super(task, null);
        this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
}

CompletionService的实现中,将任务FutureTask做了扩展,实现了FutureTaskdone方法。当任务完成后会回调这个方法,这时我们在这个方法中将完成的任务放到队列中,就实现了按照异步任务完成的顺序,逐个处理任务的结果了。

核心属性

// 执行任务的线程池
private final Executor executor;
// 存放已完成的异步任务的阻塞队列,默认使用 LinkedBlockingQueue
private final BlockingQueue<Future<V>> completionQueue;

构造函数

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue<Future<V>> completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}

在构造函数中我们至少需要传入一个Executor线程池的实现来执行异步任务,但是建议再传入一个阻塞队列,默认的LinkedBlockingQueue是一个无界队列,有内存溢出的风险。

submit 提交任务

public Future<V> submit(Callable<V> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<V> f = newTaskFor(task);
    executor.execute(new QueueingFuture(f));
    return f;
}

我们可以看到,在提交任务给线程池之前,我们会将任务封装成QueueingFuture任务。当该任务执行完成后会回调执行done方法,将任务放到队列。

获取已完成的任务

public Future<V> take() throws InterruptedException {
    return completionQueue.take();
}

public Future<V> poll() {
    return completionQueue.poll();
}
  • take:如果没有任务,一直阻塞,直到有新任务进来
  • poll:如果没有任务返回NULL

示例

public class CompletionServiceTest {

    @Test
    public void test() throws ExecutionException, InterruptedException {
        Random random = new Random();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor, new LinkedBlockingQueue<>(10));
        for (int i = 0; i < 8; i++) {
            completionService.submit(() -> {
                int time = random.nextInt(1000);
                sleep(time);
                System.out.println(Thread.currentThread().getName() + " 执行异步任务执行耗时: " + time);
                return time;
            });
        }

        while (true) {
            System.out.println(Thread.currentThread().getName() + " 主线程获取到任务结果 " + completionService.take().get());
        }
    }


    public static void sleep(int probe) {
        try {
            Thread.sleep(probe);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
pool-1-thread-7 执行异步任务执行耗时: 153
main 主线程获取到任务结果 153
pool-1-thread-5 执行异步任务执行耗时: 208
main 主线程获取到任务结果 208
pool-1-thread-4 执行异步任务执行耗时: 242
main 主线程获取到任务结果 242
pool-1-thread-8 执行异步任务执行耗时: 456
main 主线程获取到任务结果 456
pool-1-thread-1 执行异步任务执行耗时: 567
main 主线程获取到任务结果 567
pool-1-thread-2 执行异步任务执行耗时: 782
main 主线程获取到任务结果 782
pool-1-thread-6 执行异步任务执行耗时: 796
main 主线程获取到任务结果 796
pool-1-thread-3 执行异步任务执行耗时: 976
main 主线程获取到任务结果 976

我的是8核机器,所以任务的结束时间一定会按照任务的结束时间排序。

源码

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-concurrent 工程

layering-cache

为监控而生的多级缓存框架 layering-cache这是我开源的一个多级缓存框架的实现,如果有兴趣可以看一下

相关文章

网友评论

    本文标题:CompletionService 源码解析

    本文链接:https://www.haomeiwen.com/subject/sqifjctx.html