美文网首页
Java的五种线程池

Java的五种线程池

作者: herohua | 来源:发表于2020-02-12 12:36 被阅读0次

    1.固定数量线程池newFixedThreadPool

    创建一个可重用的固定数量的无界队列的线程池。在任何时候,最多有nThreads个活跃线程处理任务。如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到某个线程可用为止。如果在线程池关闭之前,执行任务的过程中由于执行失败导致任何线程终止,则在执行后续任务时将使用新线程代替。线程池中的线程将一直存活直到调用shutdown方法。

    源码:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    使用:
    private static void useFixedThreadPool() throws InterruptedException {
        final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    
        // Returns the approximate number of threads that are actively
        System.out.println(executor.getActiveCount());
    
        IntStream.range(0, 100).boxed().forEach(i -> {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                System.out.println(Thread.currentThread().getName() + " 1============");
            });
        });
        
        System.out.println(executor.getActiveCount());
    
        TimeUnit.SECONDS.sleep(3);
    
        System.out.println(executor.getActiveCount());
    }
    
    运行结果:
    运行结果.png

    2.缓存线程池useCachedThreadPool

    创建一个线程池,该线程池根据需要创建新线程,如果有可获得的缓存线程,将重用已经构造的线程。该线程池通常能提高执行许多短期异步任务的程序的性能。对execute的调用将重用先前已经构造的线程(如果存在)。如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。六十秒内未使用的线程将终止并从缓存中删除。因此,保持空闲时间足够长的池不会消耗任何资源。

    源码:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    使用:
    private static void useCachedThreadPool() throws InterruptedException {
        final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    
        System.out.println(executor.getActiveCount());
    
        executor.execute(() -> {
            executor.execute(() -> {
                System.out.println("============");
            });
        });
    
        System.out.println(executor.getActiveCount());
    
        IntStream.range(0, 100).boxed().forEach(i -> {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                System.out.println("============");
            });
        });
    
        System.out.println(executor.getActiveCount());
    
        TimeUnit.SECONDS.sleep(3);
    
        System.out.println(executor.getActiveCount());
    }
    
    运行结果:
    运行结果.png

    3.只有一个线程的线程池useSingleThreadPool

    创建一个只有单个线程的线程池。等价于new FinalizableDelegatedExecutorService(Executors.newFixedThreadPool(1));返回的不是ThreadPoolExecutor的实例,仅仅暴露ExecutorService的方法。

    源码:
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    使用:
    private static void useSingleThreadPool() throws InterruptedException {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
    
        IntStream.range(0, 100).boxed().forEach(i -> {
            executor.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                System.out.println(Thread.currentThread().getName() + "============");
            });
        });
    
        TimeUnit.SECONDS.sleep(3);
    }
    
    运行结果:
    运行结果.png

    4.延迟线程池ScheduledThreadPoolExecutor

    源码:
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    
    使用1:
    private static void testSchedule() throws InterruptedException, ExecutionException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
    
        ScheduledFuture<?> future = executor.schedule(() -> {
            System.out.println("=======");
        }, 2, TimeUnit.SECONDS);
    
    
        //System.out.println(future.cancel(true));
    
        System.out.println(future.get());
    }
    
    使用2:
    private static void TestScheduledAtFixedRate() {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        AtomicLong interval = new AtomicLong(0L);
    
        // 如果period小于任务执行的时长,则period失效,当上一个任务执行完毕之后,再执行下一个任务
        ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(() -> {
            long currentTime = System.currentTimeMillis();
            if (interval.get() == 0) {
                System.out.printf("The first time trigger task at %d\n", currentTime);
            } else {
                System.out.printf("The actually spend [%d]\n", currentTime - interval.get());
            }
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            interval.set(currentTime);
    
        }, 3, 2, TimeUnit.SECONDS);
    }
    
    运行结果.png
    使用3:
    private static void testScheduleWithFixedDelay() throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
        System.out.println(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    
        AtomicLong interval = new AtomicLong(0L);
    
        // 上一次任务结束之后的时间+delay=下一次任务开始的时间
        executor.scheduleWithFixedDelay(() -> {
    
            long currentTime = System.currentTimeMillis();
            if (interval.get() == 0) {
                System.out.printf("The first time trigger task at %d\n", currentTime);
            } else {
                System.out.printf("The actually spend [%d]\n", currentTime - interval.get());
            }
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            interval.set(currentTime);
    
        }, 3, 2, TimeUnit.SECONDS);
    
        TimeUnit.SECONDS.sleep(15);
    
        executor.shutdown();
    }
    
    运行结果.png

    5.并行线程池newWorkStealingPool

    源码:
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    
    使用:
    public static void main(String[] args) throws InterruptedException {
    
        Optional.of(Runtime.getRuntime().availableProcessors()).ifPresent(System.out::println);
    
        final ExecutorService executorService = Executors.newWorkStealingPool();
    
        final List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(i -> (Callable<String>) () -> {
            System.out.println(Thread.currentThread().getName());
    
            TimeUnit.SECONDS.sleep(3);
    
            return "Task" + i;
        }).collect(Collectors.toList());
    
        executorService.invokeAll(callableList).stream().map(future -> {
            try {
                return future.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).forEach(System.out::println);
    }
    
    运行结果:
    运行结果.png

    相关文章

      网友评论

          本文标题:Java的五种线程池

          本文链接:https://www.haomeiwen.com/subject/ejbqfhtx.html