美文网首页
12.线程池-5

12.线程池-5

作者: nieniemin | 来源:发表于2021-08-08 18:05 被阅读0次

CompletionService介绍

CompletionService

通过上面的截图,我们大概可以猜到CompletionService跟线程池的队列有关。java.util.concurrent.CompletionService 是对 ExecutorService 的一个功能增强封装,优化了获取异步操作结果的接口。CompletionService接口功能是以异步的方式将执行任务和处理任务分别执行,避免阻塞。
在上一节介绍execute和submit的区别的时候有提到submit有返回值,可以返回Future。当我们执行Future的get方法获取结果时,可能拿到的Future并不是第一个执行完成的Callable的Future,就会进行阻塞,可能会导致严重的性能损耗问题。而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。下面通过代码来分别对别下:
直接get获取:

public class CompletionServiceTest {

    private static final  int PROCESS_SIZE = Runtime.getRuntime().availableProcessors();

    private static final  int CORE_POOL_SIZE = PROCESS_SIZE;

    private static final  int KEEP_ALIVE_TIME = 5;

    private static final  int WORK_QUEUE_SIZE = PROCESS_SIZE;
    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                PROCESS_SIZE * 2,
                KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(WORK_QUEUE_SIZE));

        List<Future<String>> futureList = new ArrayList<>();
        
        // 两个任务,分别休眠4s,1s
        futureList.add(threadPoolExecutor.submit(new MyCallable(4_000)));
        futureList.add(threadPoolExecutor.submit(new MyCallable(1_000)));

        for (Future future : futureList) {
            try {
                // get方法是阻塞的,如果线程池中某一个任务执行较长,会阻塞其他已经完成的任务获取结果.
                System.out.println("get方法阻塞:" + future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadPoolExecutor.shutdown();
    }

    /**
     * call():休眠sleepTime,任务开始以及结束打印下内容.
     * 
     */
     static class MyCallable implements Callable<String> {

        private int sleepTime;
        public MyCallable(int sleepTime) {
            this.sleepTime = sleepTime;
        }

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + "  start,sleepTime:" + sleepTime);
            Thread.sleep(sleepTime);
            return "end.当前线程名字:" + Thread.currentThread().getName() + ",休眠了:" + sleepTime;
        }
    }
}
**************************************************************
pool-1-thread-2  start,sleepTime:1000
pool-1-thread-1  start,sleepTime:4000
get方法阻塞:end.当前线程名字:pool-1-thread-1,休眠了:4000
get方法阻塞:end.当前线程名字:pool-1-thread-2,休眠了:1000

使用CompletionService:

public class CompletionServiceTest {

    private static final int PROCESS_SIZE = Runtime.getRuntime().availableProcessors();

    private static final int CORE_POOL_SIZE = PROCESS_SIZE;

    private static final int KEEP_ALIVE_TIME = 5;

    private static final int WORK_QUEUE_SIZE = PROCESS_SIZE;

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ExecutorService threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
                PROCESS_SIZE * 2,
                KEEP_ALIVE_TIME, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(WORK_QUEUE_SIZE));

        CompletionService completionService = new ExecutorCompletionService(threadPoolExecutor);
        //提交Callable任务
        completionService.submit(new MyCallable(4_000));
        completionService.submit(new MyCallable(1_000));
        //获取future结果,不会阻塞
        Future<String> pollFuture = completionService.poll();
        //call方法中会休眠1s,这里因为没有执行完成的Callable,所以返回null
        System.out.println("没有执行完毕:" + pollFuture);


        //获取future结果,最多等待1秒,不会阻塞
     /*   Future<Integer> pollTimeOutFuture = completionService.poll(1, TimeUnit.SECONDS);*/

        //通过take获取Future结果,此方法会阻塞
        for (int i = 0; i < 2; i++) {
            System.out.println("--------" + completionService.take().get());
        }

        threadPoolExecutor.shutdown();
    }

    /**
     * call():休眠sleepTime,任务结束打印结果.
     */
    static class MyCallable implements Callable<String> {

        private int sleepTime;

        public MyCallable(int sleepTime) {
            this.sleepTime = sleepTime;
        }

        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + "  start,sleepTime:" + sleepTime);
            Thread.sleep(sleepTime);
            return "end.当前线程名字:" + Thread.currentThread().getName() + ",休眠了:" + sleepTime;
        }
    }
}
*********************************************************
没有执行完毕:null
pool-1-thread-1  start,sleepTime:4000
pool-1-thread-2  start,sleepTime:1000
--------end.当前线程名字:pool-1-thread-2,休眠了:1000
--------end.当前线程名字:pool-1-thread-1,休眠了:4000

上面代码可以清晰看出使用CompletionService优点,CompletionService没有采取依次遍历 Future 的方式,而是在中间加上了一个结果队列,任务完成后马上将结果放入队列,那么从队列中取到的就是最早完成的结果。如果队列为空,那么 take() 方法会阻塞直到队列中出现结果为止。此外 CompletionService 还提供一个 poll()方法,返回值与 take() 方法一样,不同之处在于它不会阻塞,如果队列为空则立刻返回 null。

相关文章

  • 12.线程池-5

    CompletionService介绍 通过上面的截图,我们大概可以猜到CompletionService跟线程池...

  • 线程池

    1、为什么要使用线程池2、线程池的工作原理3、线程池参数4、阻塞队列5、饱和策略6、向线程池提交任务7、线程池的状...

  • ExecutorService

    ExecutorService扩展和实现Executor。 java 线程池的5种状态 RUNNING 线程池...

  • JAVA并发问题-线程池ThreadPool

    JAVA中提供的线程池 Executors工厂类 Executors工具类提供了5种线程池的创建方法 每种线程池都...

  • Java多线程之线程池深入讲解

    1 线程池介绍 1.1 线程池概念 Sun在Java5中,对Java线程的类库做了大量的扩展,其中线程池就是Jav...

  • JDK多任务执行框架

    1、为什么要使用线程池?2、线程池有什么作用?3、说说几种常见的线程池及使用场景。4、线程池都有哪几种工作队列?5...

  • 线程池面试题

    1、为什么要使用线程池?2、线程池有什么作用?3、说说几种常见的线程池及使用场景。4、线程池都有哪几种工作队列?5...

  • JUC线程池(4):线程池状态

    我们都知道,线程有5种状态:新建状态,就绪状态,运行状态,阻塞状态,死亡状态。线程池也有5种状态;然而,线程池不同...

  • using

    JAVA md5 将json转化为java对象 转化xmlToJSONjson转xml 遍历map: 线程池线程池...

  • 线程池-5

    tryTerminate()这个方法又有点晦涩了,也分解成3个部分: 1、这个还挺好理解的,即如果是以下3个条件则...

网友评论

      本文标题:12.线程池-5

      本文链接:https://www.haomeiwen.com/subject/eteovltx.html