美文网首页
并发包之CompletionService

并发包之CompletionService

作者: 破晓追风 | 来源:发表于2016-06-16 20:55 被阅读135次

    CompletionService简介

    将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。例如,CompletionService 可以用来管理异步 IO ,执行读操作的任务作为程序或系统的一部分提交,然后,当完成读操作时,会在程序的不同部分执行其他操作,执行操作的顺序可能与所请求的顺序不同。

    通常,CompletionService 依赖于一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。ExecutorCompletionService 类提供了此方法的一个实现。

    内存一致性效果:线程中向 CompletionService 提交任务之前的操作 happen-before 该任务执行的操作,后者依次 happen-before 紧跟在从对应 take() 成功返回的操作。

    CompletionService接口

    public interface CompletionService<V> {  
      Future<V> submit(Callable<V> task);  
      Future<V> submit(Runnable task, V result); 
      Future<V> take() throws InterruptedException; 
      Future<V> poll(); 
      Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
    }
    

    ExecutorCompletionService

    ExecutorCompletionService 是CompletionService的实现类,使用提供的 Executor 来执行任务的 CompletionService。此类将安排那些完成时提交的任务,把它们放置在可使用 take 访问的队列上。该类非常轻便,适合于在执行几组任务时临时使用。

    构造器

    public class ExecutorCompletionService<V> implements CompletionService<V> {    
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
        
        public ExecutorCompletionService(Executor executor) {
            if (executor == null)
              throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
            this.completionQueue = new LinkedBlockingQueue<Future<V>>();
        }
        public ExecutorCompletionService(Executor executor,                                 BlockingQueue<Future<V>> completionQueue) {  
            if (executor == null || completionQueue == null)  
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
            this.completionQueue = completionQueue;
        }
    }
    

    ExecutorCompletionService是Executor和BlockingQueue的结合体,任务的提交和执行都是委托给Executor来完成。

    提交任务

    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }
    

    当提交某个任务时,该任务首先将被包装为一个QueueingFuture,该类是FutureTask的一个子类,通过改写该子类的done方法,可以实现当任务完成时,将结果放入到BlockingQueue中。

    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
    

    获取结果

    通过使用BlockingQueue的take或poll方法,则可以得到结果。在BlockingQueue不存在元素时,这两个操作会阻塞,一旦有结果加入,则立即返回。

    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    public Future<V> poll() {
        return completionQueue.poll();
    }
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
    

    参考

    http://xw-z1985.iteye.com/blog/1997077
    http://www.tuicool.com/articles/umyy6b

    相关文章

      网友评论

          本文标题: 并发包之CompletionService

          本文链接:https://www.haomeiwen.com/subject/llimdttx.html