美文网首页Java-多线程
多线程(三)、线程池 ThreadPoolExecutor 知识

多线程(三)、线程池 ThreadPoolExecutor 知识

作者: EvanZch | 来源:发表于2019-12-18 21:34 被阅读0次

    本篇是多线程系列的第三篇,如果对前两篇感兴趣的也可以去看看。

    多线程(一)、基础概念及notify()和wait()的使用

    多线程(二)、内置锁 synchronized

    Android进阶系列文章是我在学习的同时对知识点的整理,一是为了加深印象,二是方便后续查阅。

    如果文中有错误的地方,欢迎批评指出。

    前言

    如果在Android里面,直接 new Thread ,阿里巴巴 Android 开发规范会提示你不要显示创建线程,请使用线程池,为啥要用线程池?你对线程池了解多少?

    image

    一、线程池ThreadPoolExecutor 基础概念

    1、什么是线程池

    多线程(一)、基础概念及notify()和wait()的使用 讲了线程的创建,每当有任务来的时候,通过创建一个线程来执行任务,当任务执行结束,对线程进行销毁,并发操作的时候,大量任务需要执行,每个任务都要需要重复线程的创建、执行、销毁,造成了CPU的资源销毁,并降低了响应速度。

            new Thread(new Runnable() {
                @Override
                public void run() {
                    // 任务执行
                }
            }).start();
    

    **线程池 **:字面上理解就是将线程通过一个池子进行管理,当任务来的时候,从池子中取出一个已经创建好的线程进行任务的执行,执行结束后再将线程放回池中,待线程池销毁的时候再统一对线程进行销毁。

    2、使用线程池的好处

    通过上面的对比,使用线程池基本有以前好处:

    1、降低资源消耗。通过重复使用线程池中的线程,降低了线程创建和销毁带来的资源消耗。

    2、提高响应速度。重复使用池中线程,减少了重复创建和销毁线程带来的时间开销。

    3、提高线程的可管理性。线程是稀缺资源,我们不可能无节制创建,这样会大量消耗系统资源,使用线程池可以统一分配,管理和监控线程。

    3、线程池参数说明

    要使用线程池,就必须要用到 ThreadPoolExecutor 类,

        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    这里贴了 ThreadPoolExecutor 最复杂的一个构造方法,我们把参数单独拎出来讲

    1、int corePoolSize

    核心线程数量:每当接收到一个任务的时候,线程池会创建一个新的线程来执行任务,直到当前线程池中的线程数目等于 corePoolSize ,当任务大于corePoolSize 时候,会放入阻塞队列

    2、int maximumPoolSize

    非核心线程数量:线程池中允许的最大线程数,如果当前阻塞队列满了,当接收到新的任务就会再次创建线程进行执行,直到线程池中的数目等于maximumPoolSize

    3、long keepAliveTime

    线程空闲时存活时间:当线程数大于没有任务执行的时候,继续存活的时间,默认该参数只有线程数大于corePoolSize时才有用

    4、TimeUnit unit

    keepAliveTime的时间单位

    5、BlockingQueue<Runnable> workQueue

    阻塞队列:当线程池中线程数目超过 corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待,当阻塞队列满了的时候,会根据 maximumPoolSize 数量新开线程执行。

    队列:

    是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。

    进行插入操作的端称为队尾,进行删除操作的端称为队头。

    队列中没有元素时,称为空队列。队列的数据元素又称为队列元素。

    在队列中插入一个队列元素称为入队,从队列中删除一个队列元素称为出队。

    因为队列只允许在一端插入,在另一端删除,所以只有最早进入队列的元素才能最先从队列中删除,故队列又称为先进先出(FIFO—first in first out)线性表。

    阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的缓存容器,而消费者也只从容器里拿元素。

    先看看 BlockingQueue ,它是一个接口,继承 Queue

    public interface BlockingQueue<E> extends Queue<E>
    

    再看看它里面的方法

    image

    针对这几个方法,简单的进行介绍:

    抛出异常 返回特殊值 阻塞 超时
    插入 add(e) offer(e) put(e) offer(e, time, unit)
    移除 remove() poll() take() poll(time, unit)
    检查 element() peek()

    抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。

    返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null

    阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。

    超时:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。

    我们再看JDK为我们提供的一些阻塞队列,如下图:

    image

    简单说明:

    阻塞队列 用法
    ArrayBlockingQueue 一个由数组结构组成的有界阻塞队列。
    LinkedBlockingQueue 一个由链表结构组成的有界阻塞队列。
    PriorityBlockingQueue 一个支持优先级排序的无界阻塞队列。
    DelayQueue 一个使用优先级队列实现的无界阻塞队列。
    SynchronousQueue 一个不存储元素的阻塞队列。
    LinkedTransferQueue 一个由链表结构组成的无界阻塞队列。
    LinkedBlockingDeque 一个由链表结构组成的双向阻塞队列。

    6、ThreadFactory threadFactory

    创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名Executors静态工厂里默认的threadFactory,线程的命名规则是“pool-数字-thread-数字”。

    7、RejectedExecutionHandler handler (饱和策略)

    线程池的饱和策略,如果任务特别多,队列也满了,且没有空闲线程进行处理,线程池将必须对新的任务采取饱和策略,即提供一种方式来处理这部分任务。

    jdk 给我们提供了四种策略,如图:

    image
    策略 作用
    AbortPolicy 直接抛出异常,该策略也为默认策略
    CallerRunsPolicy 在调用者线程中执行该任务
    DiscardOldestPolicy 丢弃阻塞队列最前面的任务,并执行当前任务
    DiscardPolicy 直接丢弃任务

    我们可以看到 RejectedExecutionHandler 实际是一个接口,且只有一个 rejectedExecution 所以我们可以根据自己的需求定义自己的饱和策略。

    /**
     * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
     *
     * @since 1.5
     * @author Doug Lea
     */
    public interface RejectedExecutionHandler {
    
        /**
         * Method that may be invoked by a {@link ThreadPoolExecutor} when
         * {@link ThreadPoolExecutor#execute execute} cannot accept a
         * task.  This may occur when no more threads or queue slots are
         * available because their bounds would be exceeded, or upon
         * shutdown of the Executor.
         *
         * <p>In the absence of other alternatives, the method may throw
         * an unchecked {@link RejectedExecutionException}, which will be
         * propagated to the caller of {@code execute}.
         *
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         * @throws RejectedExecutionException if there is no remedy
         */
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    4、线程池工作机制

    熟悉了上面线程池的各个参数含义,对线程池的工作原理,我们也可以大致总结如下:

    1、线程池刚创建的时候,里面没有线程在运行,当有任务进来,并且线程池开始执行的时候,会根据实际情况处理。

    2、当前线程池线程数量少于 corePoolSize 时候,每当有新的任务来时,都会创建一个新的线程进行执行。

    3、当线程池中运行的线程数大于等于 corePoolSize ,每当有新的任务来的时候,都会加入阻塞队列中。

    4、当阻塞队列加满,无法再加入新的任务的时候,则会再根据 maximumPoolSize数 来创建新的非核心线程执行任务。

    4、当线程池中线程数目大于等于 maximumPoolSize 时候,当有新的任务来的时候,拒绝执行该任务,采取饱和策略。

    5、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

    5、创建线程池

    5.1、ThreadPoolExecutor

    直接通过 ThreadPoolExecutor 创建:

            ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    2, 10
                    , 1, TimeUnit.SECONDS
                    , new LinkedBlockingQueue<Runnable>(50)
                    , Executors.defaultThreadFactory()
                    , new ThreadPoolExecutor.AbortPolicy());
    
    5.2、Executors 静态方法

    通过工具类java.util.concurrent.Executors 创建的线程池,其实质也是调用 ThreadPoolExecutor 进行创建,只是针对不同的需求,对参数进行了设置。

    image
    1、FixedThreadPool

    可重用固定线程数

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    参数说明:

    int corePoolSize: nThreads

    int maximumPoolSize: nThreads

    long keepAliveTime:0L

    TimeUnit unit:TimeUnit.MILLISECONDS

    BlockingQueue<Runnable> workQueue:new LinkedBlockingQueue<Runnable>()

    可以看到核心线程和非核心线程一致,及不会创建非核心线程,超时时间为0,即就算线程处于空闲状态,也不会对其进行回收,阻塞队列为LinkedBlockingQueue无界阻塞队列。

    当有任务来的时候,先创建核心线程,线程数超过 corePoolSize 就进入阻塞队列,当有空闲线程的时候,再在阻塞队列中去任务执行。

    使用场景:线程池线程数固定,且不会回收,线程生命周期与线程池生命周期同步,适用任务量比较固定且耗时的长的任务。

    2、newSingleThreadExecutor

    单线程执行

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    参数说明

    int corePoolSize: 1

    int maximumPoolSize: 1

    long keepAliveTime:0L

    TimeUnit unit:TimeUnit.MILLISECONDS

    BlockingQueue<Runnable> workQueue:new LinkedBlockingQueue<Runnable>()

    基本和 FixedThreadPool 一致,最明显的区别就是线程池中只存在一个核心线程来执行任务。

    使用场景:只有一个线程,确保所以任务都在一个线程中顺序执行,不需要处理线程同步问题,适用多个任务顺序执行。

    3、newCachedThreadPool
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    参数说明

    int corePoolSize: 0

    int maximumPoolSize: Integer.MAX_VALUE

    long keepAliveTime:60L

    TimeUnit unit:TimeUnit.SECONDS

    BlockingQueue<Runnable> workQueue:new SynchronousQueue<Runnable>()

    无核心线程,非核心线程数量 Integer.MAX_VALUE,可以无限创建,空闲线程60秒会被回收,任务队列采用的是SynchronousQueue,这个队列是无法插入任务的,一有任务立即执行。

    使用场景:由于非核心线程无限制,且使用无法插入的SynchronousQueue队列,所以适合任务量大但耗时少的任务。

    4、newScheduledThreadPool

    定时延时执行

        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
    

    参数说明

    int corePoolSize: corePoolSize (设定)

    int maximumPoolSize: Integer.MAX_VALUE

    long keepAliveTime:0

    TimeUnit unit:NANOSECONDS

    BlockingQueue<Runnable> workQueue:new DelayedWorkQueue()

    核心线程数固定(设置),非核心线程数创建无限制,但是空闲时间为0,即非核心线程一旦空闲就回收, DelayedWorkQueue() 无界队列会将任务进行排序,延时执行队列任务。

    使用场景:newScheduledThreadPool是唯一一个具有定时定期执行任务功能的线程池。它适合执行一些周期性任务或者延时任务,可以通过schedule(Runnable command, long delay, TimeUnit unit) 方法实现。

    6、线程池的执行

    线程池提供了 executesubmit 两个方法来执行

    execute:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            // 获得当前线程的生命周期对应的二进制状态码
            int c = ctl.get();
            //判断当前线程数量是否小于核心线程数量,如果小于就直接创建核心线程执行任务,创建成功直接跳出,失败则接着往下走.
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //判断线程池是否为RUNNING状态,并且将任务添加至队列中.
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //审核下线程池的状态,如果不是RUNNING状态,直接移除队列中
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //如果当前线程数量为0,则单独创建线程,而不指定任务.
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果不满足上述条件,尝试创建一个非核心线程来执行任务,如果创建失败,调用reject()方法.
            else if (!addWorker(command, false))
                reject(command);
        }
    

    submit():

    image

    源码:

      public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            // 将runnable封装成 Future 对象
            RunnableFuture<T> ftask = newTaskFor(task, result);
            // 执行 execute 方法
            execute(ftask);
            // 返回包装好的Runable
            return ftask;
        }
        
        
      
      // newTaskFor : 通过 FutureTask 
      protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    

    其中 newTaskFor 返回的 RunnableFuture<T> 方法继承了 Runnable 接口,所以可以直接通过 execute 方法执行。

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    

    可以看到,submit 中实际也是调用了 execute() 方法,只不过在调用方法之前,先将Runnable对象封装成FutureTask对象,然后再返回 Future<T>,我们可以通过Futureget 方法,拿到任务执行结束后的返回值。

    image

    我们在 多线程(一)、基础概念及notify()和wait()的使用 中也讲了 FutureTask 它提供了 cancelisCancelledisDoneget几个方法,来对任务进行相应的操作。

    总结:

    通常情况下,我们不需要对线程或者获取执行结果,可以直接使用 execute 方法。

    如果我们要获取任务执行的结果,或者想对任务进行取消等操作,就使用 submit 方法。

    7、线程池的关闭

    关于线程的中断在 多线程(一)、基础概念及notify()和wait()的使用 里面有介绍。

    • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
    • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

    8、线程池的合理配置

    线程池的参数比较灵活,我们可以自由设置,但是具体每个参数该设置成多少比较合理呢?这个要根据我们处理的任务来决定,对任务一般从以下几个点分析:

    8.1、任务的性质

    CPU 密集型、IO 密集型、混合型

    CPU密集型应配置尽可能小的线程,如Ncpu+1个线程的线程池

    IO密集型,IO操作有关,如磁盘,内存,网络等等,对CPU的要求不高则应配置尽可能多的线程,如2*Ncpu个线程的线程池

    混合型需要拆成CPU 密集型和IO 密集型分别分析,根据任务数量和执行时间,来决定线程的数量

    8.2、任务的优先级

    高中低优先级

    8.3、任务执行时间

    长中短

    8.4、任务的依耐性

    是否需要依赖其他系统资源,如数据库连接

    Runtime.getRuntime().availableProcessors() : 当前设备的CPU个数

    9、线程池实战

    又巴拉巴拉说了一大推,我觉得唯有代码运行,通过结果分析最能打动人心,下面就通过代码运行结果来分析。

    先看这么一段代码:

        public static void main(String[] args) {
            // 1、通过 ThreadPoolExecutor 创建基本线程池
            final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    3,
                    5,
                    1,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(50));
            for (int i = 0; i < 30; i++) {
                final int num = i;
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 睡两秒后执行
                            Thread.sleep(2000);
                            System.out.println("run : " + num + "  当前线程:" + Thread.currentThread().getName());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                };
                // 执行
                threadPoolExecutor.execute(runnable);
            }
        }
    

    我们通过 ThreadPoolExecutor 创建了一个线程池,然后执行30个任务。

    参数说明

    int corePoolSize: 3

    int maximumPoolSize: 5

    long keepAliveTime:1

    TimeUnit unit:TimeUnit.SECONDS

    BlockingQueue<Runnable> workQueue:new LinkedBlockingQueue<Runnable>(50)

    线程池核心线程数为3,非核心线程数为5,非核心线程空闲1秒被回收,阻塞队列使用了 new LinkedBlockingQueue 并指定了队列容量为50。

    结果:

    image

    我们看到每两秒后,有三个任务被执行。这是因为核心我们设置的核心线程数为3,当多余的任务到来后,会先放入到阻塞队列中,又由于我们设置的阻塞队列容量为50,所以,阻塞队列永远不会满,就不会启动非核心线程。

    我们改一下我们的线程池如下:

    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    2,
                    5,
                    1,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(25));
    

    参数就不分析了,我们直接看结果:

    image

    我们看到这次每隔两秒有五个任务在执行,为什么?这里要根据我们前面线程池的工作原理来分析,我们有三十个任务需要执行,核心线程数为2,其余的任务放入阻塞队列中,阻塞队列容量为25,剩余任务不超过非核心线程数,当阻塞队列满的时候,就启动了非核心线程来执行。

    我们再简单改一下我们的线程池,代码如下:

            final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    2,
                    5,
                    1,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(24));
    

    相比上面的,我们就将阻塞队列容量改成了24,如果上面你对线程池的工作原理清楚了,你应该能知道我这里改成 24 的良苦用心了,我们先看结果。

    image

    最直接的就是抛异常了,但是线程池仍然再执行任务,首先为啥抛异常?首先,我们需要执行三十个任务,但是我们的阻塞队列容量为 24,队列满后启动了非核心线程,但是非核心线程数量为5,当剩下的这个任务来的时候,线程池将采取饱和策略,我们没有设置,默认为 AbortPolicy,即直接抛异常,如果我们手动设置饱和策略如下:

            final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                    2,
                    5,
                    1,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(24),new ThreadPoolExecutor.DiscardPolicy());
    

    我们这里采用的饱和策略为 DiscardPolicy ,即丢弃多余任务。最终可以看到结果没有抛异常,最终只执行了29个任务,最后一个任务被抛弃了。

    最后再看一下通过 Executors 静态方法创建的线程池运行上面的任务结果如何,Executors 创建的线程池本质也是通过创建 ThreadPoolExecutor 来执行,可结合上面分析自行总结。

    1、FixedThreadPool

    ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(3);
    

    结果:

    image

    2、newSingleThreadExecutor

    ExecutorService threadPoolExecutor = Executors.newSingleThreadExecutor();
    

    结果:

    image

    3、newCachedThreadPool

    ExecutorService threadPoolExecutor = Executors.newCachedThreadPool();
    

    结果 :

    image

    4、newScheduledThreadPool

    ScheduledExecutorService threadPoolExecutor = Executors.newScheduledThreadPool(3);
    
    image

    总结

    这是多线程的第三篇,这篇文章篇幅有点多, 有点小乱,后续会再整理一下,基本都是跟着自己的思路,在写的同时,自己也会再操作一遍,源码分析过程中,也会尽可能的详细,一步步的深入,后续查阅的时候也方便,文章中有些不是很详细的地方,后面可能会再次更新,或者单独用一篇文章来讲。

    相关文章

      网友评论

        本文标题:多线程(三)、线程池 ThreadPoolExecutor 知识

        本文链接:https://www.haomeiwen.com/subject/jqhpnctx.html