美文网首页
22.线程池的使用

22.线程池的使用

作者: 0x70e8 | 来源:发表于2018-08-08 10:16 被阅读0次

    [TOC]

    执行器(Executor)层次

    图片来自参考资料1

    Executor

    Executor即为执行器,是执行器框架的顶层接口,定义最为基础的框架功能:执行任务。

    • 接口定义如下:
    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }
    
    • Runnable:任务抽象

    执行器接口定义了执行器的任务执行模型,指定了任务的抽象为Runnable接口。Runnable接口:

    @FunctionalInterface
    public interface Runnable {
        public abstract void run();
    }
    

    Runnable是一个函数式接口,内部唯一抽象方法run方法无异常抛出,无返回值。Thread类是Runnable接口的实现类,所以可以把一个线程抽象成一个任务提交给执行器来执行,在这种应用场景下,Thread实例不是作为一个线程,而是作为一个任务的封装,因为只是调用其run方法,不会触及线程的start()。

    无返回值且无异常抛出的run方法给任务模型带来局限性,即无法抛出异常也没有返回值。但是有一个wrap类FutureTask,能够把Callable接口包装成Runnable和Future,使得执行器接口的任务可以拥有Callable和Future的特性。(Callable和Future另行笔记:21.Future和Callable.md)

    ExecutorService

    ExecutorService接口继承Executor接口,是执行器框架的估价接口,它定义了任务的提交、取消、执行器的关闭等方法。是为执行器的最终执行“提供服务”的接口。

    • 接口定义:
    public interface ExecutorService extends Executor {
    
        /**
         * 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。 
         * 如果已经关闭,调用没有其他影响。
         * 此方法不会等待先前提交的任务完成执行。 
         * 使用awaitTermination来做到这一点。{@link #awaitTermination awaitTermination}
         */
        void shutdown();
    
        /**
         * 阻止等待任务启动并试图停止当前正在执行的任务,返回等待执行的任务list。
         * 此方法不会等待先前提交的任务完成执行。 
         * 使用awaitTermination来做到这一点。{@link #awaitTermination awaitTermination}
         */
        List<Runnable> shutdownNow();
    
        boolean isShutdown();
    
        /**
         * 以shutdown为前提
         * Returns {@code true} if all tasks have completed following shut down.
         * Note that {@code isTerminated} is never {@code true} unless
         * either {@code shutdown} or {@code shutdownNow} was called first.
         *
         * @return {@code true} if all tasks have completed following shut down
         */
        boolean isTerminated();
    
        /**
         * Blocks until all tasks have completed execution after a shutdown
         * request, or the timeout occurs, or the current thread is
         * interrupted, whichever happens first.
         */
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Submits a value-returning task for execution and returns a
         * Future representing the pending results of the task. The
         * Future's {@code get} method will return the task's result upon
         * successful completion.
         */
        <T> Future<T> submit(Callable<T> task);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's {@code get} method will
         * return the given result upon successful completion.
         */
        <T> Future<T> submit(Runnable task, T result);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task. The Future's {@code get} method will
         * return {@code null} upon <em>successful</em> completion.
         */
        Future<?> submit(Runnable task);
    
        /**
         * Executes the given tasks, returning a list of Futures holding
         * their status and results when all complete.
         * {@link Future#isDone} is {@code true} for each
         * element of the returned list.
         * Note that a <em>completed</em> task could have
         * terminated either normally or by throwing an exception.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        /**
         * Executes the given tasks, returning a list of Futures holding
         * their status and results
         * when all complete or the timeout expires, whichever happens first.
         * {@link Future#isDone} is {@code true} for each
         * element of the returned list.
         */
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        /**
         * Executes the given tasks, returning the result
         * of one that has completed successfully (i.e., without throwing
         * an exception), if any do. Upon normal or exceptional return,
         * tasks that have not completed are cancelled.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        /**
         * Executes the given tasks, returning the result
         * of one that has completed successfully (i.e., without throwing
         * an exception), if any do before the given timeout elapses.
         * Upon normal or exceptional return, tasks that have not
         * completed are cancelled.
         * The results of this method are undefined if the given
         * collection is modified while this operation is in progress.
         */
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    AbstractExecutorService

    抽象类AbstractExecutorService为ExecutorService提供默认公共实现。

    ScheduledExecutorService

    继承ExcutorService,是一个支持“定时”调度任务的ExecutorService。

        // 延时执行
        public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
    
        // 延时执行,Callable版本
        public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
        
        // 定期执行,在初始时延结束后,以period为周期执行
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
        
        // 定期执行,在初始的延迟结束后周期运行任务,在一次调用完成和下一次调用开始之间有delay的延迟,这个版本会等待一个线程执行完成。
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
    

    ThreadPoolExecutor

    是执行器框架中最常用的执行器:线程池。在其之下还扩展了其他的执行器。如:ScheduledThreadPoolExecutor。线程池是执行器框架的功能实现者,池化的线程作为任务的执行者存在于执行器内部来完成任务的最终执行,执行器控制内部的线程(执行者)的生命周期以及任务的调度策略。

    线程池的简单理解就是将线程创建好,缓存在内存中被多个任务复用,减少线程创建和销毁的开销。和自行动手创建线程执行任务相比,线程池负责线程的创建和任务的分发,以及控制线程的生命周期。

    Executors

    Executors是执行器框架的一个工具类,提供了使用执行器的一些静态方法,如返回某个类型的线程池实例等,在《阿里巴巴java开发手册》中,并不推荐使用Executors来创建线程池,因为掩盖了创建线程池的细节,失去对线程池细节的配置。如newCachedThreadPool()内部设置的最大线程数为Integer.MAX_VALUE,这可能会使得OOM发生。

    线程池

    线程池基本属性

    • private volatile long keepAliveTime

    corePoolSize之外的线程的空转存活时间(默认纳秒)

    • private volatile boolean allowCoreThreadTimeOut;

    false时,core线程始终保持存活,否则使用keepAliveTime来控制core线程的空转时间,默认false;

    • private volatile int corePoolSize;

    最小线程数,如果开启了allowCoreThreadTimeOut,最小线程可能会是0

    • private volatile int maximumPoolSize;

    线程池中允许的最大线程数,实际数量还需依赖底层限制。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。 如果使用的是无界队列,该参数也就没有什么效果了。

    • private final BlockingQueue<Runnable> workQueue;

    任务队列,当任务提交后无法为其分配线程执行时,会暂存到阻塞的任务队列中,当任务队列满了(如果有界),就会创建新的线程来hold任务(如果没有达到maximumPoolSize)。需要注意使用的是有界队列还是无界队列。ArrayBlockingQueue(需要指定容量创建)和LinkedBlockingQueue(默认Integer.MAX_VALUE)是有界队列,PriorityBlockingQueue(默认容量11,会自动扩容的二叉堆)和SynchronousQueue(不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作)是无界队列。

    • 状态:

    RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

    状态含义和状态如何迁移:

        /* The runState provides the main lifecycle control, taking on values:
         *
         *   RUNNING:  Accept new tasks and process queued tasks
         *   SHUTDOWN: Don't accept new tasks, but process queued tasks
         *   STOP:     Don't accept new tasks, don't process queued tasks,
         *             and interrupt in-progress tasks
         *   TIDYING:  All tasks have terminated, workerCount is zero,
         *             the thread transitioning to state TIDYING
         *             will run the terminated() hook method
         *   TERMINATED: terminated() has completed
         *
         * The numerical order among these values matters, to allow
         * ordered comparisons. The runState monotonically increases over
         * time, but need not hit each state. The transitions are:
         *
         * RUNNING -> SHUTDOWN
         *    On invocation of shutdown(), perhaps implicitly in finalize()
         * (RUNNING or SHUTDOWN) -> STOP
         *    On invocation of shutdownNow()
         * SHUTDOWN -> TIDYING
         *    When both queue and pool are empty
         * STOP -> TIDYING
         *    When pool is empty
         * TIDYING -> TERMINATED
         *    When the terminated() hook method has completed
         *
         * Threads waiting in awaitTermination() will return when the
         * state reaches TERMINATED.
         */
    
    • 工作者数量:workerCount

    workerCount和state共用一个int型变量,前者使用低29位,后者使用高3位。

    线程池工作逻辑:

    • 任务提交到线程池,如果线程池中的线程数小于corePoolSize,就创建新线程来执行任务
    • 当任务到达时线程数达到corePoolSize,任务被放入woreQueue中等待线程空闲,当线程执行完一个任务,会取队列中的任务;
    • 当任务到达时队列已满,如果当前线程数小于Maxsize,就创建一个新线程处理新来的任务,如果当前线程数已经达到上限,就使用指定的拒绝任务处理器处理任务。

    线程池创建

    线程池主要有常规的任务调度线程池和定时(延迟)任务的线程池两种,前者对应ThreadPoolExecutor类和ForkJoinPool(java8),后者对应的是ScheduledThreadPoolExecutor。

    使用构造器(推荐)

    ThreadPoolExecutor
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    前几个参数的含义前文已经说明,threadFactory是指定线程创建工厂,handler是当任务工作队列已经满了且线程数已经达到上限时的任务拒绝处理器。这个构造器有几个重载的构造器:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue);
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory);
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler);
    
    • Test
        @Test
        public void testThreadPool() {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
            pool.prestartAllCoreThreads();// 预先启动所有核心线程,让其空转等待
    //      pool.prestartCoreThread(); // 预先启动一个核心线程
            System.out.println(pool.getCorePoolSize());
            System.out.println(pool.getKeepAliveTime(TimeUnit.SECONDS));
            System.out.println(pool.getActiveCount());
            System.out.println(pool.getLargestPoolSize());
            System.out.println(pool.getCompletedTaskCount());
            System.out.println(pool.getPoolSize());
            System.out.println(pool.getTaskCount());
            System.out.println(pool.getQueue().size());
            pool.shutdown(); // 关闭线程池
        }
    // 5
    // 60
    // 0
    // 5
    // 0
    // 5
    // 0
    // 0
    
    ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor继承了ThreadPoolExecutor并实现ScheduledExecutorService接口,为一个提供周期任务执行的线程池。
    构造器:

        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory,
                                           RejectedExecutionHandler handler) {
            // 基于父类ThreadPoolExecutor的构造器,最大poolsize是Integer.MAX_VALUE,
            // 使用的任务队列是DelayedWorkQueue
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory, handler);
        }
        /*几个重载的构造器,重载的风格类似于父类ThreadPoolExecutor*/
        public ScheduledThreadPoolExecutor(int corePoolSize);
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory);
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           RejectedExecutionHandler handler);
    
    
    • Test
        @Test
        public void testSchedulePool() {
            ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
            pool.shutdown();
        }
    
    ForkJoinPool

    暂略.

    使用Executors工具类

    Executors类提供了众多public static工厂方法来创建线程池:

    固定线程数的线程池

    @since 1.5

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    // 重载版本
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory);
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    // 重载版本
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory);
    
    支持任务密取的线程池

    (ForkJoinPool) @since 1.8

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    // 并行数为处理器数
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    } 
    
    CachedThreadPool

    初始化是线程池线程数为0,使用时按需创建线程

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    // 重载
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory);
    
    ScheduledThreadPoolExecutor

    执行周期性任务的线程池

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    // 重载
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) ;
    
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    // 重载
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) ;
    

    注: ThreadFactory threadFactory是创建线程的工厂接口,通过此参数可以自定义线程创建的过程。

    线程池任务执行

    应用思路

    • 创建线程池 (使用构造器创建线程池
    • submit()提交任务 或 execute()执行任务
    • 关闭线程池

    submit是AbstractExecutorService提供的模板方法,其内部执行了Executor接口的execute方法,实际上execute是交给子类类来实现的。execute方法在ThreadPoolExecutor中有实现。

    Test

    固定线程数线程池
        @Test
        public void testFixCountPool() {
            // 固定线程数为10,任务队列100
            ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
            for (int i = 0; i < 100; i++) {
                Runnable r = () -> {
                    System.out.println(Thread.currentThread().getName() + ",task start");
                    System.out.println(Thread.currentThread().getName() + ",task  end");
                    // latch.countDown();
                };
                pool.submit(r);
            }
            // 等待任务完成后再shutdown,还可以使用CountDownLatch来完成此需求
            while (pool.getCompletedTaskCount() < 100) {
            }
            pool.shutdown();
        }
        @Test
        public void testPool2() throws InterruptedException, ExecutionException {
            ExecutorService pool = new ThreadPoolExecutor(1,1,0,TimeUnit.SECONDS,new LinkedBlockingQueue<>(20));
            // 使用有返回结果的Callable任务
            FutureTask<Integer> task = new FutureTask<Integer>(() -> {
                System.out.println("This is a callable");
                return 10;
            });
            pool.submit(task);
            // 阻塞等待结果
            System.out.println(task.get());
            // pool支持Callable任务
            Future<Integer> future = pool.submit(() -> {
                System.out.println("this is a pure callable");
                return 100;
            });
            System.out.println(future.get());
            pool.shutdown();
        }
    
    cachedPool

    CachedThreadPool初始化时不会创建线程,在有任务提交的时候回按需创建线程,线程有idle有效期,过期会自动销毁。

    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
    
    schedulePool
        @Test
        public void testSchedulePool() throws InterruptedException, ExecutionException {
            ScheduledExecutorService pool = new ScheduledThreadPoolExecutor(1);
            // 延迟三秒执行
            ScheduledFuture<String> future = pool.schedule(() -> {
                System.out.println("hhh");
                return "end";
            }, 3, TimeUnit.SECONDS);
            System.out.println(System.currentTimeMillis());
            System.out.println(future.get());
            System.out.println(System.currentTimeMillis());
            pool.shutdowm();
        }
        // 指定周期执行
        @Test
        public void testSchedulePool2() throws InterruptedException, ExecutionException, TimeoutException {
            ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
            pool.scheduleAtFixedRate(() -> {
                System.out.println("hhh");
            }, 2, 1, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(10);
            pool.shutdown();
            pool.awaitTermination(3,TimeUnit.SECONDS);
            
        }
        // 指定延迟的周期执行
        @Test
        public void testScheduledPool() throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(10);
            ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
            // 1秒后执行任务,任务完成后按间隔两秒再次执行任务
            pool.scheduleWithFixedDelay(() -> {
                try {
                    System.out.println("hi");
                    // 任务休眠三秒,差不多任务是5秒执行一次
                    TimeUnit.SECONDS.sleep(3);
                    latch.countDown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, 1, 2, TimeUnit.SECONDS);
            latch.await();
            pool.shutdown();
        }
    
    invokeAll等
        @Test
        public void testInvokeAny() {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            List<Callable<String>> tasks = new ArrayList<>(100);
            Callable<String> task = () -> {
                Thread.sleep(new Random().nextInt(3000));
                return Thread.currentThread().getName();
            };
            for (int i = 0; i < 100; i++) {
                tasks.add(task);
            }
            try {
                // 返回一个完成的任务结果,没完成的其他的都被取消
                String res = executor.invokeAny(tasks);
                System.out.println(res);
                executor.shutdown();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        /**
         * invokeAll
         */
        @Test
        public void testInvokeAll() {
            ExecutorService executor = Executors.newFixedThreadPool(10);
            List<Callable<Integer>> tasks = new ArrayList<>(100);
            for (int i = 0; i < 100; i++) {
                final int v = i;
                Callable<Integer> task = () -> {
                    Thread.sleep(100);
                    return v;
                };
                tasks.add(task);
            }
            try {
                // 所有任务完成后会返回,结果是按提交顺序存到list中的
                List<Future<Integer>> res = executor.invokeAll(tasks);
                // 遍历
                res.stream().forEach((Future<Integer> f) -> {
                    try {
                        System.out.println(f.get());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                executor.shutdown();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 按照获得结果的顺序存储结果
         */
        @Test
        public void testInvokeAll2() {
            ExecutorService executor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
            List<Callable<Integer>> tasks = new ArrayList<>(100);
            for (int i = 0; i < 100; i++) {
                final int v = i;
                Callable<Integer> task = () -> {
                    Thread.sleep(100);
                    return v;
                };
                tasks.add(task);
            }
            try {
                ExecutorCompletionService<Integer> service = new ExecutorCompletionService<>(executor);
                for (Callable<Integer> task : tasks)
                    service.submit(task);
                for (int i = 0; i < tasks.size(); i++) {
                    System.out.println(service.take().get());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    

    小结

    向实例化之后的pool提交任务,需要返回值的task使用Callable接口定义,否则使用Runnable。

    线程池关闭

    API提供了两种关闭池的方式,shutdown和shutdownNow,两者执行后,线程池都会拒绝新任务的提交,前者会等待已启动的任务执行结束,队列中的任务执行会启动,但是线程池不会阻塞等待任务完成,后者是会尝试中断启动的任务、不会启动队列中的任务,暴力关闭pool。

    shutdown

        @Test
        public void testShutdown() throws InterruptedException {
            ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
            // 让主线程等待任务完成(防止jvm退出,另一种方案是轮询pool.getActiveCount()
            CountDownLatch latch = new CountDownLatch(6);
            Callable<String> task = () -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("hi");
                    return "hi";
                } finally {
                    latch.countDown();
                }
            };
            p.submit(task);
            // 提交5个任务,会直接进入到队列中
            for (int i = 0; i < 5; i++)
                p.submit(task);
            System.out.println("shutdown");
            p.shutdown();
            System.out.println(p.isShutdown());// true
            System.out.println(p.getActiveCount());
            // p.submit(task);// RejectedExecutionException
            latch.await();
            System.out.println(p.getActiveCount());
        }
    // 输出:
    // shutdown
    // true
    // 1
    // hi
    // hi
    // hi
    // hi
    // hi
    // hi
    // 0
    

    输出结果证明提交的任务以及队列中的任务能够正常执行完成,如果主进程没有退出的话。

    • 使用awaitTermination阻塞等待
        @Test
        public void testShutdownAwait() throws InterruptedException {
            ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
            Callable<String> task = () -> {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("hi");
                return "hi";
            };
            p.submit(task);
            // 提交5个任务,会直接进入到队列中
            for (int i = 0; i < 5; i++)
                p.submit(task);
            System.out.println("shutdown");
            p.shutdown();
            // 阻塞直达池中任务全部完成或者超时
            p.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println(p.isShutdown());// true
            System.out.println(p.getActiveCount());
    
        }
    

    shutdownNow

        @Test
        public void testShutdownNow() throws InterruptedException {
            ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
            Callable<String> task = () -> {
                System.out.println("start");
                TimeUnit.SECONDS.sleep(3);
                System.out.println("hi");
                return "hi";
            };
            p.submit(task);
            // 提交5个任务,会直接进入到队列中
            for (int i = 0; i < 5; i++)
                p.submit(task);
            System.out.println("shutdown");
            p.shutdownNow();
            System.out.println(p.isShutdown());// true
            System.out.println(p.getActiveCount());
            while (p.getActiveCount() > 0) {
            }
            System.out.println("end");
        }
    // 输出:
    // shutdown
    // start
    // true
    // 0
    // end
    

    第一个任务启动后没有等到完成就被取消(shutdownNow执行后getActiveCount就变成了0),队列中任务不会启动。
    shutdownNow使用Thread#interrupt来中断任务,如果任务不响应中断,就不会被中断。awaitTermination()调用后会阻塞等待没有被结束成功的任务执行完成。如果任务在shutdownNow时就全部被中断,任务就都已经结束了,awaitTermination()也不会阻塞。
    这里引出停止线程的一个方法:让线程响应中断。

        @Test
        public void testShutdownNowAwait() throws InterruptedException {
            ThreadPoolExecutor p = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
            Callable<String> task = () -> {
                System.out.println("start");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    System.out.println(e);
                }
                System.out.println("hi");
                return "hi";
            };
            p.submit(task);
            // 提交5个任务,会直接进入到队列中
            for (int i = 0; i < 5; i++)
                p.submit(task);
            System.out.println("shutdown");
            p.shutdownNow(); 
            System.out.println(p.isShutdown());// true
            System.out.println(p.getActiveCount());
            // 阻塞等待active的任务执行完成
            p.awaitTermination(1000, TimeUnit.SECONDS);
            System.out.println("end");
        }
    // 输出
    // start
    // true
    // 1
    // java.lang.InterruptedException: sleep interrupted
    // hi
    // end
    
    

    小结

    shutdown和shutdownNow的区别是,前者只是拒绝新任务,如果主线程不消亡,池中的活跃任务和队列中的任务都会执行,可以使用awaitTermination()指定pool阻塞等待它们完成;shutdownNow也会拒绝新任务,它会通过中断试着终止活跃任务,但是不一定能终止成功,队列中的任务被取消,如果活跃任务没有被杀死,只要宿主进程没死也是会继续执行的,但是队列中的任务不会启动,它也可以使用awaitTermination()指定pool阻塞等待没被杀死的活跃任务完成。

    参考资料

    [1] 【死磕Java并发】-----J.U.C之线程池:线程池的基础架构
    [2] 【死磕Java并发】-----J.U.C之线程池:ThreadPoolExecutor

    相关文章

      网友评论

          本文标题:22.线程池的使用

          本文链接:https://www.haomeiwen.com/subject/jvhnsftx.html