美文网首页
并发处理利器-CompletionService

并发处理利器-CompletionService

作者: 小吴酱呵呵 | 来源:发表于2017-07-22 23:13 被阅读0次
    版权声明:本文为博主原创文章,未经博主允许不得转载。
    

    摘要

    考虑这样一个需求,并发处理一批任务,每个任务都完成之后,对结果做一些后续处理,最后汇总结果。第一个方案:启动多个线程并发处理任务,并循环监控每一个线程的处理结果Futrue,直到所有Future返回为止。这个方案可行,但还需要自己监控所有的结果完成情况,是不是很乏味。来试试CompletionService吧。

    CompletionService

    先看看这个接口定义了哪些方法:

    • Future<V> submit(Callable<V> task); 提交Callable任务,并返回Future结果。
    • Future<V> submit(Runnable task, V result); 与上一个方法类似,当任务完成时返回指定的result对象。
    • Future<V> take() throws InterruptedException; 获取并移除最新完成的任务结果,该过程是阻塞的。
    • Future<V> poll(); 获取并移除最新完成的任务结果,如果没有结果则返回null。
    • Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; 指定超时时间等待获取并移除最新的任务结果。

    从获取结果的几个接口可以看到,返回的都是最新完成的结果。这就是重点所在,我们可以不需要去监控等待每一个结果(如果等待的第一个Future是最慢的,岂不是会妨碍其他先完成的任务吗),而是按结果完成顺序得到了每一个返回结果,先完成的结果可以先继续执行后续处理,这不是挺好嘛。

    ExecutorCompletionService是该接口的实现类,内部有一个线程池和BlockingQueue队列。它的实现原理其实挺简单:每个提交给ExecutorCompletionService的任务,都会被封装成一个QueueingFuture(FutureTask的子类),它重写了done()方法(该方法会在任务执行完成之后回调),将执行完成的FutureTask加入到内部队列,take()等方法其实是到内部队列中获取得到最新完成的结果FutrueTask。

    对比

    从代码层面来看看两种方案的差异:

    public void test1() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        List<Future<String>> results = new ArrayList<Future<String>>(10);
        for(int i=0; i<10; i++) {
            Future<String> result = executorService.submit(new MyRunnable());
            results.add(result);
        }
        for(Future result : results) {
            String str = result.get();//遍历等待每一个Future, 如果第一个任务是最慢的,那么整个进度就会被拖慢
            //do something
        }
        //汇总操作
        executorService.shutdown();
    }
    
    public void test2() {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        CompletionService<String> completionService = new executorCompletionService<String>(executorService);
        for(int i=0; i<10; i++) {
            completionService.submit(new MyRunnable());
        }
        for(int i=0; i<10; i++) {
            String str = completionService.take().get();//先完成的结果先执行后续处理
            //do something
        }
        //汇总操作
        executorService.shutdown();
    }
    

    从代码量上看差异比较少,但是方案2不用单独维护一个List来保存所有的处理结果Future。重要的是,completionService因为任务结果按完成顺序陆续到来,每个任务的进度不会相互干扰,那么后续操作也不会相互影响,而第一种方案中如果第一个任务很慢,那么其他任务都要空闲等待第一个任务完成,才能继续后面的操作,这一点就明显影响到了性能。

    小心踩坑

    • 关闭线程池

    在测试方法中,为了演示而创建了线程池,方法结束时也关闭了线程池。如果你的代码与示例代码类似,那么请记住关闭线程池,否则即使方法退出之后,创建的线程也得不到回收和关闭,迟早将耗尽资源或撑爆内存。如果你的线程池是全局共享的,那么不存在这个问题,JVM关闭时会关闭线程池。

    • 错误的使用方式
    public void test3() {
        Future<String> future = completionService.submit(new MyRunnable());//这里的线程池是共享的
        String str = future.get();
        //do something
    }
    

    该场景也许不太合适使用completeService,但是这里要说明的是另一个问题,直接使用completionService.submit的返回结果Future会造成内存泄漏,因为该方式只关心获取当前返回的结果,而忽略了BlockingQueue中保存的Future对象,BlockingQueue队列会不断变大(默认实现是LinkedBlockingQueue,无界队列),迟早将内存撑爆。正确的使用方式还是通过completionService.take()来获取Future对象。

    如有什么地方描述不对,欢迎指出。

    相关文章

      网友评论

          本文标题:并发处理利器-CompletionService

          本文链接:https://www.haomeiwen.com/subject/lylpkxtx.html