美文网首页Java多线程专题Java
[Java多线程编程之七] 让程序飞 —— 线程池的使用及实现原

[Java多线程编程之七] 让程序飞 —— 线程池的使用及实现原

作者: 小胡_鸭 | 来源:发表于2019-10-29 22:07 被阅读0次

    一、为什么要使用线程池?

      为了充分利用多核计算机的性能,程序需要被设计成多线程程序,保证不会出现某个CPU很忙,某个CPU很闲的情况,把线程的创建、通信和管理交给线程池管理,可以让开发人员专注于程序的业务逻辑;另一方面,线程的创建需要消耗操作系统的资源,如果频繁地创建和销毁线程,代价太大,而线程池对此做了优化,线程池中预先创建好了一组空闲的线程,当程序运行需要时,从线程池中取用空闲线程,当程序运行结束时,工作线程又重置为空闲线程放到线程池中,提高系统资源的利用效率。

    线程是否越多越好?

    (1)线程在java中是一个对象,更是操作系统的资源,线程的创建、销毁需要时间,如果创建时间 + 销毁时间 > 任务执行时间,就不划算。
    (2)java对象占用堆内存,操作系统线程占用系统内存,根据jvm规范,一个线程默认最大栈大小为1M,这个栈空间是需要从系统内存中分配的,如果线程过多,也会消耗过多内存,当超过负荷时程序可能会发生异常或错误。
    (3)线程数过多,会导致操作系统需要频繁切换上下文,这需要消耗性能,每个线程的执行效率反而会下降。

    二、线程池原理

    1、线程池管理器

      用于创建并管理线程池,包括创建线程池、销毁线程池,添加新的执行任务等,一般会限定线程池大小,避免像没有线程池管理时无限制地去创建线程导致线程数过多导致性能下降的情况。



    2、工作线程

      线程池中的线程,当应用程序需要一个新的线程时,线程池为其创建一个工作线程;当该线程需要执行的程序结束时,线程不会被销毁,而是回到线程池,继续等待执行任务,这就意味着工作线程可以循环地执行不同任务。



    3、任务接口

      每个提交到线程池的任务都必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口、任务执行完后的收尾工作、任务的执行状态等,可以把任务接口看成是工作线程执行目标的一个格式。

    4、任务队列

      线程池所能创建的工作线程数是有限的,当提交的任务数很多导致线程池暂时没有线程可去执行时,线程池会将其放入到任务队列中,可以把任务队列看成是一种缓冲机制,等线程池里又有空闲的工作线程里,线程池就可以从任务队列中取出任务给工作线程去执行。



    三、线程池API —— 接口定义和实现类

    类型 名称 描述
    接口 Executor 最上层的接口,定义了执行任务的方法execute
    接口 ExecutorService 继承了Executor接口,拓展了CallableFuture、关闭方法
    抽闲实现类 AbstractExecutorService 继承了ExecutorService接口,实现了submitinvokeAnyinvokeAll方法,将提交的任务包装成RunnableFuture对象,交给子类去执行
    接口 ScheduledExecutorService 继承了ExecutorService,增加了定时任务相关的方法
    实现类 ThreadPoolExecutor 继承了AbstractExecutorService,基础、标准的线程池实现
    实现类 ScheduledThreadPoolExecutor 继承了ThreadPoolExecutor,实现ScheduledExecutorService中相关定时任务的方法
    工具类 Executors 创建线程池的工具类
    1、Executor

      最上层的执行器接口,定义了执行方法execute,参数类型为Runnable表示可以接收提交到线程池Runnable任务,该接口实现了任务的提交与任务的执行分离。



    2、ExecutorService

      继承了Executor接口,ExecutorService扩展了功能,定义了关闭线程池提交任务执行任一个或全部任务的方法,这些方法能产生追踪一个或多个异步任务进度的Future对象或列表,接口中定义的方法列表如下:

    package java.util.concurrent;
    import java.util.List;
    import java.util.Collection;
    
    public interface ExecutorService extends Executor {
        // 优雅关闭线程池,关闭前提交的任务将被执行,但不再接受新的任务
        void shutdown();
        // 马上关闭线程池,所有正在执行中的任务会被终止,所有正在等待执行的任务不会被执行,并随方法返回
        List<Runnable> shutdownNow();
        // 判断线程池是否已关闭
        boolean isShutdown();
        // 如果关闭线程池后,线程池中所有的任务都已执行完成,则返回true;本方法用于判断线程池是否任务都执行完毕,必须在调用shutdown()或shutdownNow()后才能使用
        boolean isTerminated();
        // 检测线程池是否已关闭直到所有的任务都被执行结束,如果还有任务正在执行,则会阻塞等待,直到发生超时,或者当前线程被中断
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
        // 提交一个用于执行的Callable返回任务,并返回一个Future对象,可以通过其get方法获取Callable执行结果
        <T> Future<T> submit(Callable<T> task);
        // 提交一个用于执行的Runnable任务,并返回一个Future对象,执行结果会放入传入的result中
        <T> Future<T> submit(Runnable task, T result);
        // 提交一个Runnable任务,并返回一个Future对象,执行结果为null,这里返回Future的意义在于可通过get方法的阻塞控制某些代码在任务执行结束之后才执行
        Future<?> submit(Runnable task);
        // 执行给定的任务集合,执行完毕后,返回结果集
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        // 执行给定的任务集合,执行完毕或者超时后,其他任务终止
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
        // 执行给定的任务,任意一个执行成功则返回结果,其他任务终止
        <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
        // 执行给定的任务,任意一个执行成功或者超时后,则返回结果,其他任务终止
        <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }
    
    3、AbstractExecutorService

      AbstractExecutorServiceExecutorService的实现类,实现了ExecutorService接口中定义的submitinvokeAnyinvokeAll方法,执行一个任务前会把提交的RunnableCallable任务通过newTaskFor方法转化为RunnableFuture对象,再提交给executor方法执行,executor方法的具体实现由AbstractExecutorService的子类各自去实现。

    public abstract class AbstractExecutorService implements ExecutorService {
    
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos)
            throws InterruptedException, ExecutionException, TimeoutException {
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
            // implements code
        };
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
            // implements code
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            // implements code
        };
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            // implements code
        };
    
    4、ScheduledExecutorService

      一个能够根据传入的时延延迟执行和周期参数定期执行任务的ExecutorService接口,方法定义如下:

    public interface ScheduledExecutorService extends ExecutorService {
        // 创建并执行一个一次性任务,过了延迟时间就会被执行
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
        // 创建一个周期性任务,过了给定的初始延迟时间,会被第一次执行,执行过程中发生了异常,那么任务就停止
        // 一个任务执行时长超过了周期时间,下一次任务会等到该次任务执行结束之后立即执行
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);    
        // 创建一个周期性任务,过了给定的初始延迟时间,会被第一次执行,执行过程中发生了异常,那么任务就停止
        // 一个任务执行时长超过了周期时间,下一次任务会等到该次任务执行结束的时间基础上,计算执行延迟
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    }
    

    scheduleAtFixedRatescheduleWithFixedDelay的不同如下】
    假设执行周期(delay)为3秒,代表每隔3秒执行一次任务,任务是一个一个执行的,不会出现有两个不同周期的任务同时执行的情况,假设在第10秒开始执行任务,任务执行了4秒,超过了周期时间,那么下个周期的任务会在第几秒开始执行?
    scheduleAtFixedRate:在第14秒就马上开始执行。
    scheduleWithFixedDelay:在第14秒之后,等3秒到第17秒时才开始执行。

    5、ThreadPoolExecutor

      ThreadPoolExecutor是一个基础、标准的线程池,继承了AbstractExecutorService,也是最常用、直接使用的线程池,ThreadPoolExecutor构造器中主要的参数有核心线程大小最大线程数线程存活时间存活时间单位线程阻塞队列线程工厂拒绝策略

    • 核心线程数

      如果提交的任务是通过核心线程来执行的,则不会受线程存活时间的限制。

    • 最大线程数

      创建线程有代价,系统资源宝贵,不可能无限地创建线程,所以要设置最大线程数;最大线程数是指包含核心线程数在内的所能创建的最大线程数。

    • 线程存活时间

      针对非核心线程设置的线程存活时间,防止大量非核心线程因阻塞或其他原因导致执行时间过长,进而导致等待执行的工作队列一直等不到线程来执行。

    • 存活时间单位

      设置线程存活时间时的时间单位,有纳秒、微秒、毫秒、秒、分、时、天。

    • 线程工厂

      允许通过传入的自定义的实现了ThreadFactory接口的类来创建线程。

    • 拒绝策略

      当提交任务过多,线程池和工作队列的容量都无法执行和缓存所有工作任务时,就需要采取一定的策略拒绝提交的任务。

    定义的具体构造器如下:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        // 使用默认线程工厂和拒绝策略的构造器,其余参数指定
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }    
        // 使用默认拒绝策略的构造器,其余参数指定
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
        // 使用默认线程工厂的构造器,其余参数指定
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
        // 指定所有参数的构造器
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    }
    

      通常调用submit方法去提交任务,接着调用execute方法执行任务,execute的工作流程如下:
    (1)判断线程池中的核心线程数是否已满?若未满,则直接创建一个核心工作线程来执行任务;否则进入(2)。
    (2)判断工作队列是否已满?若未满,则将新提交的任务存储在工作队列中;否则进入(3)。
    (3)判断工作线程是否已达最大数量限制?没达到,则创建一个新的工作线程来执行任务;否则进入(4)。
    (4)执行拒绝策略来处理任务。

    【流程图如下】


    【源码】


    【举例】
      核心线程数为5,最大线程数为10(5秒超时),工作队列为3,现在要提交15个任务(假设几乎同一时间提交,忽略提交时延),每个任务的执行时间为6秒,那么线程池执行的情况如下:
    (1)为提交的前5个任务创建核心工作线程,没有超时时间,任务可以执行完毕。
    (2)第6到第8个任务会被工作队列缓存。
    (3)为第9到第13个任务创建非核心工作线程,由于超时时间为5秒,所以这5个工作线程会在第5秒时中断。
    (4)拒绝第14到第15个提交的任务。

    【代码示例】

    public class Demo9 {
        public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
            // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
            for (int i = 0; i < 15; i++) {
                int n = i;
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("开始执行:" + n);
                            Thread.sleep(3000L);
                            System.err.println("执行结束:" + n);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
                System.out.println("任务提交成功 :" + i);
            }
            // 查看线程数量,查看队列等待数量
            Thread.sleep(500L);
            System.out.println("当前线程池线程数量为:" + threadPoolExecutor.getPoolSize());
            System.out.println("当前线程池等待的数量为:" + threadPoolExecutor.getQueue().size());
            // 等待15秒,查看线程数量和队列数量(理论上,会被超出核心线程数量的线程自动销毁)
            Thread.sleep(15000L);
            System.out.println("当前线程池线程数量为:" + threadPoolExecutor.getPoolSize());
            System.out.println("当前线程池等待的数量为:" + threadPoolExecutor.getQueue().size());      
        }
    
        /**
         * 线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒,指定拒绝策略
         * @throws Exception
         */
        private void threadPoolExecutorTest2() throws Exception {
            // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
            // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy     
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(3), 
                    new RejectedExecutionHandler() {                    
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            System.err.println("有任务被拒绝执行了");
                        }
                    });
            testCommon(threadPoolExecutor);
            // 预计结果:
            // 1、5个任务直接分配线程开始执行
            // 2、3个任务进入等待队列
            // 3、队列不够用,临时加开5个线程来执行任务(5秒没活干就销毁)
            // 4、队列和线程池都满了,剩下2个任务,没资源了,被拒绝执行
            // 5、任务执行,5秒后,如果五任务可执行,销毁临时创建的5个线程
        }
        
        public static void main(String[] args) throws Exception {
            Demo9 demo = new Demo9();
            demo.threadPoolExecutorTest2();
        }
    }
    

    【执行结果】


    (1) 如果没有指定任务等待队列的长度,则默认为无界队列,不管多少任务提交都会被缓存,这时其实拒绝策略已经不起作用了,可以直接不指定。

    【策略分析】
      在某些场景下,不急着提交的任务马上被执行,只要提交的任务被放入等待执行的队列中,由线程池慢慢处理即可,这种情况适合将阻塞队列设置为无界队列(或者容量比较大的队列),这样当线程池已满时,提交的任务会被缓存到队列中而不会被拒绝执行,因此设置为无界队列时,不管是否指定拒绝策略,都不会拒绝。

    【代码示例】

    public class Demo9 {
    
        /**
         * 测试: 提交15个执行时间需要3秒的任务,看线程池的状况
         * 
         * @param threadPoolExecutor 传入不同的线程池,看不同的结果
         * @throws Exception
         */
        public void testCommon(ThreadPoolExecutor threadPoolExecutor) throws Exception {
                // code
        }
    
        /***
         * 1、线程池信息: 核心线程数量5,最大数量10,无界队列,超出核心线程数量的线程存活时间: 5秒,缓存提交任务的工作队列(这里没有指定队列大小,就是无界队列,可以无限缓存任务)
         */
        private void threadPoolExecutorTest1() throws Exception {
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
            testCommon(threadPoolExecutor);
        }
        public static void main(String[] args) throws Exception {
            Demo9 demo = new Demo9();
            demo.threadPoolExecutorTest1();
        }
    }
    

    【执行结果】


    (2) 如果任务提交的频率不可控不可预估,也不想堆积缓存而是尽快任务,则
    a. 核心线程数设为0
    b. 线程数设一个比较大的值
    c. 用同步队列作为阻塞队列

    【策略解析】
      一般我们不会把核心线程数设置得过大,因为核心线程执行完任务之后是不会被销毁而是一直存活的,如果核心线程数设置过大,那么当某个时间点提交了大量的任务时,就会相应创建大量的核心线程,当这些任务被执行完后,核心线程还存活,而后面可能任务提交的频率没这么高了,用不了这么多核心线程,这时太多核心线程会占用太多系统资源,系统性能会受到影响。
      这时适合把线程数量设大一点,而核心线程数不能设置得太大,某个时间点任务提交频率又很高,这时核心线程对于整个执行任务的贡献比例很小,干脆直接设置为0即可,这就意味着实际上执行任务的线程都不会核心线程,在任务执行完之后线程会被销毁,系统资源会被回收,虽然创建销毁的代价也很大,但是跟维护大量存活的核心线程相比,还是好得多;而且使用非核心线程使得线程池具有了更好的弹性,任务多我就多创建线性,任务少我就少创建线程,可以满足任务提交频率不可控不定的场景。
      我们不想堆积任务,就意味着不应该缓存任务,而是直接为任务创建工作线程,因此适合用同步队列作为阻塞队列,因为同步队列不会真正缓存任务,而是维护一组提交任务的线程,在任务入队offer时会失败,进而触发线程池为提交任务新增工作线程的效果。

    【代码示例】

        /**
         * 4、 线程池信息:
         * 核心线程数量0,最大数量Integer.MAX_VALUE,SynchronousQueue队列,超出核心线程数量的线程存活时间:60秒
         * 
         * @throws Exception
         */
        private void threadPoolExecutorTest4() throws Exception {
            
            // SynchronousQueue,实际上它不是一个真正的队列,因为它不会为队列中元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等待着把元素加入或移出队列。
            // 在使用SynchronousQueue作为工作队列的前提下,客户端代码向线程池提交任务时,
            // 而线程池中又没有空闲的线程能够从SynchronousQueue队列实例中取一个任务,
            // 那么相应的offer方法调用就会失败(即任务没有被存入工作队列)。
            // 此时,ThreadPoolExecutor会新建一个新的工作者线程用于对这个入队列失败的任务进行处理(假设此时线程池的大小还未达到其最大线程池大小maximumPoolSize)。
    
            // 和Executors.newCachedThreadPool()一样的      
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, 
                    new SynchronousQueue<Runnable>());
            testCommon(threadPoolExecutor);
            // 预计结果:
            // 1、 线程池线程数量为:15,超出数量的任务,其他的进入队列中等待被执行
            // 2、 所有任务执行结束,60秒后,如果无任务可执行,所有线程全部被销毁,池的大小恢复为0
            Thread.sleep(60000L);
            System.out.println("60秒后,再看线程池中的数量:" + threadPoolExecutor.getPoolSize());       
        }
    

    【执行结果】


    (3) 如果提交的任务想定时执行,则适用ScheduledThreadPoolExecutor

    【策略分析】
      ScheduledThreadPoolExecutor实现了ScheduledExecutorService接口,使用ScheduledThreadPoolExecutor能满足定时任务的需求;需要注意的地方是,如果定时任务中同一时间要执行的任务很多,则应该把线程数设大一点,因为为了保证设定的时间一到时线程池中必须有可以执行任务的线程;ScheduledThreadPoolExecutor中的线程全部都是核心线程,因为使用定时任务的场景意味着任务被不断周期性执行,并且不必立即执行,任务提交的频率相对稳定,所以使用核心线程利用率高,相对划算。
      ScheduledThreadPoolExecutor没有提供指定阻塞队列的构造参数,是因为其底层实现的原理依赖于DelayWorkQueue延时队列,任务被提交到延时队列之后,必须在设定的延时时间后才能取出任务给线程执行。

    【代码示例:只延迟不定期执行的任务】

        /**
         * 5、 定时执行线程池信息:3秒后执行,一次性任务,到点就执行 <br/>
         * 核心线程数量5,最大数量Integer.MAX_VALUE,DelayedWorkQueue延时队列,超出核心线程数量的线程存活时间:0秒
         * 
         * @throws Exception
         */ 
        private void threadPoolExecutorTest5() throws Exception {
            // 和Executors.newScheduledThreadPool()一样的
            ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
            threadPoolExecutor.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println("任务被执行,现在时间:" + System.currentTimeMillis());
                }
            }, 3000, TimeUnit.MILLISECONDS);
            System.out.println(
                    "定时任务,提交成功,时间是:" + System.currentTimeMillis() + ", 当前线程池中线程数量:" + threadPoolExecutor.getPoolSize());
            // 预计结果:任务在3秒后被执行一次
        }
    

    【执行结果】


    【代码示例:延迟定期的任务】
      定期执行任务有两种形式,一种是使用scheduleAtFixedRate,假设某次定时任务执行的时间超过了周期时间,则下一次执行会在上一次执行结束之后马上执行;另一种是使用scheduleWithFixedDelay,不管定时任务执行的时间是否超过周期时间,下一次执行都会在上一次执行结束之后等待固定的周期时间之后才开始执行。如果周期时间大于任务执行时间,则两种定期任务的的效果都一样。
      下面的代码中,设置了延迟时间为2秒,定时周期为1秒,每个任务的执行时间为3秒,这就意味着,当一个周期过去时要开始新的周期时,上个周期的任务还没被执行完毕,两种方式都不会在上个周期的任务还没执行完毕是又开始执行新的周期任务;对于scheduleAtFixedRate方式的线程池来说,会在3秒任务执行完毕后,马上执行新的周期任务,对于scheduleWithFixedDelay方式的线程池,会在3秒任务执行完毕之后,再等待1秒钟才开始执行新的周期任务。
        /**
         * 6、 定时执行线程池信息:线程固定数量5 ,<br/>
         * 核心线程数量5,最大数量Integer.MAX_VALUE,DelayedWorkQueue延时队列,超出核心线程数量的线程存活时间:0秒
         * 
         * @throws Exception
         */
        private void threadPoolExecutorTest6() throws Exception {
            ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5);
            // 周期性执行某一个任务,线程池提供了两种调度方式,这里单独演示一下。测试场景一样。
            // 测试场景:提交的任务需要3秒才能执行完毕。看两种不同调度方式的区别
            // 效果1: 提交后,2秒后开始第一次执行,之后每间隔1秒,固定执行一次(如果发现上次执行还未完毕,则等待完毕,完毕后立刻执行)。
            // 也就是说这个代码中是,3秒钟执行一次(计算方式:每次执行三秒,间隔时间1秒,执行结束后马上开始下一次执行,无需等待)
            threadPoolExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务-1 被执行,现在时间:" + System.currentTimeMillis());
                }
            }, 2000, 1000, TimeUnit.MILLISECONDS);
    
            // 效果2:提交后,2秒后开始第一次执行,之后每间隔1秒,固定执行一次(如果发现上次执行还未完毕,则等待完毕,等上一次执行完毕后再开始计时,等待1秒)。
            // 也就是说这个代码钟的效果看到的是:4秒执行一次。 (计算方式:每次执行3秒,间隔时间1秒,执行完以后再等待1秒,所以是 3+1)
            threadPoolExecutor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(3000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务-2 被执行,现在时间:" + System.currentTimeMillis());
                }
            }, 2000, 1000, TimeUnit.MILLISECONDS);  
    }
    

    【执行结果:单独执行scheduleAtFixedRate方式表现出来的效果是间隔3秒;
          单独执行scheduleWithFixedDelay方式表现出来的效果是间隔4秒】




    (4)终止提交任务

      有时候,我们在程序执行的某个节点终止线程池,终止线程池之后不会再接收新的任务,当线程池和缓存队列已满或者线程池已终止时,都会执行拒绝策略。如果想在终止时,让终止前已经提交的任务继续执行完毕,则使用shutdown方法;如果想让所有任务都终止,并且正在执行的任务也要尝试中断,则使用shutdownNow方法。

    【代码示例:使用shutdown关闭线程池】

        /**
         * 7、 终止线程:线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
         * 
         * @throws Exception
         */
        private void threadPoolExecutorTest7() throws Exception {
            // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
            // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            System.err.println("有任务被拒绝执行了");
                        }
                    });
            // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
            for (int i = 0; i < 15; i++) {
                int n = i;
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("开始执行:" + n);
                            Thread.sleep(3000L);
                            System.err.println("执行结束:" + n);
                        } catch (InterruptedException e) {
                            System.out.println("异常:" + e.getMessage());
                        }
                    }
                });
                System.out.println("任务提交成功 :" + i);
            }
            // 1秒后终止线程池
            Thread.sleep(1000L);
            threadPoolExecutor.shutdown();
            // 再次提交提示失败
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("追加一个任务");
                }
            });
            // 结果分析
            // 1、 10个任务被执行,3个任务进入队列等待,2个任务被拒绝执行
            // 2、调用shutdown后,不接收新的任务,等待13任务执行结束
            // 3、 追加的任务在线程池关闭后,无法再提交,会被拒绝执行
        }
    

    【执行结果】


    【代码示例:使用shutdownNow关闭线程池】

        /**
         * 8、 立刻终止线程:线程池信息: 核心线程数量5,最大数量10,队列大小3,超出核心线程数量的线程存活时间:5秒, 指定拒绝策略的
         * 
         * @throws Exception
         */
        private void threadPoolExecutorTest8() throws Exception {
            // 创建一个 核心线程数量为5,最大数量为10,等待队列最大是3 的线程池,也就是最大容纳13个任务。
            // 默认的策略是抛出RejectedExecutionException异常,java.util.concurrent.ThreadPoolExecutor.AbortPolicy
            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 5, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(3), new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            System.err.println("有任务被拒绝执行了");
                        }
                    });
            // 测试: 提交15个执行时间需要3秒的任务,看超过大小的2个,对应的处理情况
            for (int i = 0; i < 15; i++) {
                int n = i;
                threadPoolExecutor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("开始执行:" + n);
                            Thread.sleep(3000L);
                            System.err.println("执行结束:" + n);
                        } catch (InterruptedException e) {
                            System.out.println("异常:" + e.getMessage());
                        }
                    }
                });
                System.out.println("任务提交成功 :" + i);
            }
            // 1秒后终止线程池
            Thread.sleep(1000L);
            List<Runnable> shutdownNow = threadPoolExecutor.shutdownNow();
            // 再次提交提示失败
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("追加一个任务");
                }
            });
            System.out.println("未结束的任务有:" + shutdownNow.size());
    
            // 结果分析
            // 1、 10个任务被执行,3个任务进入队列等待,2个任务被拒绝执行
            // 2、调用shutdownnow后,队列中的3个线程不再执行,10个线程被终止
            // 3、 追加的任务在线程池关闭后,无法再提交,会被拒绝执行
        }
    

    【执行结果】



    6、Executors

      Executors是一个工具类,提供了创建各种线程池的方法,下面的代码是等同的:
    (1) Executors.newFixedThreadPool(int nThreads) = ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    【效果】
      创建了一个指定核心线程数,无非核心工作线程,任务缓存队列无限的线程池。
    【源码】




    (2)Executors.newCachedThreadPool() = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>())
    【效果】
      创建了一个核心线程为0,最大线程数为Integer最大值,使用同步队列的线程池。
    【源码】



    (3)Executors.newScheduledThreadPool(int corePoolSize) = new ScheduledThreadPoolExecutor(int corePoolSize)
    【效果】
      创建一个指定核心线程数的定期任务线程池。
    【源码】


    三、总结对比一览

    1、几种常用线程池对比
    线程池创建 核心线程数 最大线程数 队列 适用场景
    Executors.newFixedThreadPool(int nThreads) nThread nThread 阻塞队列,大小无界 任务提交频率稳定
    Executors.newCachedThreadPool() 0 Integer.MAX_VALUE 同步队列,不缓存任务 任务提交频率不定
    Executors.newScheduledThreadPool(int corePoolSize) corePoolSize corePoolSize 无界延时队列 定时任务
    2、两种定时任务对比
    方法 策略
    scheduleAtFixedRate 如果任务执行时间超过定时周期,则下一次任务执行在上一次任务执行结束之后马上开始
    scheduleWithFixedDelay 如果任务执行时间超过定时周期,则下一次任务执行在上一次任务执行结束之后再等待一个周期的时间再开始
    3、两种关闭线程池对比
    方法 是否允许提交新任务 是否执行等待队列中的任务 是否尝试中断正在执行的任务
    shutdown
    shutdownNow

    相关文章

      网友评论

        本文标题:[Java多线程编程之七] 让程序飞 —— 线程池的使用及实现原

        本文链接:https://www.haomeiwen.com/subject/axzxvctx.html