美文网首页androidJava知识点
线程之线程池原理及实现

线程之线程池原理及实现

作者: 安仔夏天勤奋 | 来源:发表于2019-04-18 13:43 被阅读111次

    线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在请求到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使用应用程序响应更快。另外,通过适当的调整线程中的线程数目可以防止出现资源不足的情况。

    学习线程池知识,通过如下几点去展开:

    1. 线程池的好处。
    2. 结合源码简单分析线程池工作原理及实现。
    3. 简单分析常用四种线程池原理、场景、实现。

    线程池的好处

    • 线程是稀缺资源,使用线程池可以减少创建和销毁线程的次数,每个工作线程都可以重复使用,通过复用线程,省去创建线程的过程,因此整体上提升了系统的响应速度
    • 通过复用已存在的线程和降低线程关闭的次数来尽可能降低系统性能损耗,这样就降低资源消耗
    • 根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃
    • 如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,因此,需要使用线程池来管理线程。

    线程池的工作原理

    通过一张线程池执行过程图,更加直观地了解线程池的工作原理。如图:


    线程池的创建

    创建线程池主要是通过ThreadPoolExecutor类来完成,结合源码通过参数去理解创建线程池有哪些需要配置的参数。

    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    • corePoolSize:表示核心线程池的大小。当提交一个任务时,如果当前核心线程池的线程个数没有达到corePoolSize,则会创建新的线程来执行所提交的任务,即使当前核心线程池有空闲的线程。如果当前核心线程池的线程个数已经达到了corePoolSize,则不再重新创建线程。如果调用了prestartCoreThread()或者 prestartAllCoreThreads(),线程池创建的时候所有的核心线程都会被创建并且启动。
    • maximumPoolSize:表示线程池能创建线程的最大个数。如果当阻塞队列已满时,并且当前线程池线程个数没有超过maximumPoolSize的话,就会创建新的线程来执行任务。
    • keepAliveTime:空闲线程存活时间。如果当前线程池的线程个数已经超过了corePoolSize,并且线程空闲时间超过了keepAliveTime的话,就会将这些空闲线程销毁,这样可以尽可能降低系统资源消耗。
    • unit:时间单位。为keepAliveTime指定时间单位。
    • workQueue:阻塞队列。用于保存任务的阻塞队列,关于阻塞队列。可以使用ArrayBlockingQueueLinkedBlockingQueueSynchronousQueuePriorityBlockingQueue
    • threadFactory:创建线程的工程类。可以通过指定线程工厂为每个创建出来的线程设置更有意义的名字,如果出现并发问题,也方便查找问题原因。
    • handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了。当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:
      • AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常。
      • CallerRunsPolicy:只用调用者所在的线程来执行任务。
      • DiscardPolicy:不处理直接丢弃掉任务。
      • DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务。

    线程的execute()方法

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
    
       /*
         * clt记录着runState和workerCount
         */
        int c = ctl.get();
        /*
         * workerCountOf方法取出低29位的值,表示当前活动的线程数;
         * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
         * 并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
             * 如果为true,根据corePoolSize来判断;
             * 如果为false,则根据maximumPoolSize来判断
             */
            if (addWorker(command, true))
                return;
            /*
             * 如果添加失败,则重新获取ctl值
             */
            c = ctl.get();
        }
        /*
         * 如果当前线程池是运行状态并且任务添加到队列成功
         */
        if (isRunning(c) && workQueue.offer(command)) {
            // 重新获取ctl值
            int recheck = ctl.get();
            // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
            // 这时需要移除该command
            // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
            if (! isRunning(recheck) && remove(command))
                reject(command);
            /*
             * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
             * 这里传入的参数表示:
             * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
             * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
             * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
             */
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        /*
         * 如果执行到这里,有两种情况:
         * 1. 线程池已经不是RUNNING状态;
         * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
         * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
         * 如果失败则拒绝该任务
         */
        else if (!addWorker(command, false))
            reject(command);
    }
    

    简单来说,在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

    1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
    2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
    3. 如果workerCount >= corePoolSize &&workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
    4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。
      这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。

    线程池的关闭

    关闭线程池,可以通过shutdown()shutdownNow()这两个方法。它们的原理都是遍历线程池中所有的线程,然后依次中断线程。shutdown()shutdownNow()还是有不一样的地方:

    • shutdownNow()首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表。
    • shutdown()只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

    可以看出shutdown()会将正在执行的任务继续执行完,而shutdownNow()会直接中断正在执行的任务。调用了这两个方法的任意一个,isShutdown()都会返回true,当所有的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated()才会返回true。

    线程池管理类 ExecutorService

    现在大多数使用真正的线程池接口 ExecutorService去实现线程池管理。在不同的场景用不同的线程池,要配置一个线程池是比较复杂的,尤其是对于线程池的原理不是很清楚的情况下,很有可能配置的线程池不是较优的,因此在Executors类里面提供了一些静态工厂,生成一些常用的线程池。常用的线程池有四种,如下:

    • newSingleThreadExecutor
    • newFixedThreadPool
    • newCachedThreadPool
    • newScheduledThreadPool

    newSingleThreadExecutor

    适用的场景

    一个任务一个任务执行的场景

    简单理解

    创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。线程的存活时间是无限的;当该线程正繁忙时,对于新任务会进入阻塞队列中(无界的阻塞队列)。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

    /**
         * Creates an Executor that uses a single worker thread operating
         * off an unbounded queue. (Note however that if this single
         * thread terminates due to a failure during execution prior to
         * shutdown, a new one will take its place if needed to execute
         * subsequent tasks.)  Tasks are guaranteed to execute
         * sequentially, and no more than one task will be active at any
         * given time. Unlike the otherwise equivalent
         * {@code newFixedThreadPool(1)} the returned executor is
         * guaranteed not to be reconfigurable to use additional threads.
         *
         * @return the newly created single-threaded Executor
         */
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    newSingleThreadExecutor底层FinalizableDelegatedExecutorService包装的ThreadPoolExecutor实例。
    corePoolSize为1 (线程池中的最大线程数量);
    maximumPoolSize为1 (阻塞队列最大的阻塞线程数量);
    keepAliveTime为0L;
    unit为:TimeUnit.MILLISECONDS
    workQueue为:new LinkedBlockingQueue<Runnable>() 无解阻塞队列。
    keepAliveTime的jdk中的解释为:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

    大概意思为:比如说线程池中最大的线程数为50,而其中只有40个线程任务在跑,相当于有10个空闲线程,这10个空闲线程不能让他一直在开着,因为线程的存在也会特别好资源的, 所有就需要设置一个这个空闲线程的存活时间,这么解释应该就很清楚了。

    简单的实现

    public class OneThread extends Thread{
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "  OneThread执行中。。。");
        }
    }
    
    public class TestThreadPool {
       public static void main(String[] args){
            singleThreadExecutor();
        }
        private static void singleThreadExecutor(){
            //创建一个线程池
            ExecutorService pool = Executors. newSingleThreadExecutor();
            //创建实现了Runnable接口对象
            Thread tt1 = new OneThread();
            Thread tt2 = new OneThread();
            Thread tt3 = new OneThread();
            Thread tt4 = new OneThread();
            Thread tt5 = new OneThread();
            //将线程放入池中并执行
            pool.execute(tt1);
            pool.execute(tt2);
            pool.execute(tt3);
            pool.execute(tt4);
            pool.execute(tt5);
            //关闭   在终止前允许执行以前提交的任务
            pool.shutdown();
            //pool.shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务
         }
    
    }
    

    运行结果

    pool-1-thread-1  OneThread执行中。。。
    pool-1-thread-1  OneThread执行中。。。
    pool-1-thread-1  OneThread执行中。。。
    pool-1-thread-1  OneThread执行中。。。
    pool-1-thread-1  OneThread执行中。。。
    

    newFixedThreadPool

    适用的场景

    执行长期的任务,性能好很多

    简单理解

    创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。如果池中的所有线程均在繁忙状态,对于新任务会进入阻塞队列中(无界的阻塞队列)

    newFixedThreadPool底层

    /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * {@code nThreads} threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    返回ThreadPoolExecutor实例,接收参数为所设定线程数量nThread。
    corePoolSize(线程池中的最大线程数量)为nThread;
    maximumPoolSize为(阻塞队列最大的阻塞线程数量)nThread;
    keepAliveTime为0L(不限时);
    unit为:TimeUnit.MILLISECONDS;
    WorkQueue为:new LinkedBlockingQueue<Runnable>()无解阻塞队列。

    简单的实现

    public class TwoThread extends Thread{
        private String mName;
        public TwoThread(String name){
            mName=name;
        }
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "   TwoThread执行_"+mName+"。。。");
        }
    }
    
    public class TestThreadPool {
        public static void main(String[] args){
            fixedThreadExecutor();
        }
       private static void fixedThreadExecutor(){
            //创建一个线程池
            ExecutorService pool = Executors. newFixedThreadPool(3);
            //创建实现了Runnable接口对象
            Thread t1 = new TwoThread("1");
            Thread t2 = new TwoThread("2");
            Thread t3 = new TwoThread("3");
            Thread t4 = new TwoThread("4");
            Thread t5 = new TwoThread("5");
            //将线程放入池中并执行
            pool.execute(t1);
            pool.execute(t2);
            pool.execute(t3);
            pool.execute(t4);
            pool.execute(t5);
            //关闭   在终止前允许执行以前提交的任务
            pool.shutdown();
            //pool.shutdownNow(); //方法阻止等待任务启动并试图停止当前正在执行的任务
        }
    }
    

    运行结果

    pool-1-thread-3   TwoThread执行_3。。。
    pool-1-thread-1   TwoThread执行_1。。。
    pool-1-thread-3   TwoThread执行_4。。。
    pool-1-thread-1   TwoThread执行_5。。。
    pool-1-thread-2   TwoThread执行_2。。。
    

    newCachedThreadPool(推荐使用)

    适用的场景

    执行很多短期异步的小程序或者负载较轻的服务器

    简单理解

    可缓存线程池,当线程池大小超过了处理任务所需的线程,那么就会回收部分空闲(一般是60秒无执行)的线程,当有任务来时,又智能的添加新线程来执行。有新任务到来,则插入到SynchronousQueue中,由于SynchronousQueue是同步队列,因此会在池中寻找可用线程来执行,若有可以线程则执行,若没有可用线程则创建一个线程来执行该任务;若池中线程空闲时间超过指定大小,则该线程会被销毁。

    /**
         * Creates a thread pool that creates new threads as needed, but
         * will reuse previously constructed threads when they are
         * available.  These pools will typically improve the performance
         * of programs that execute many short-lived asynchronous tasks.
         * Calls to {@code execute} will reuse previously constructed
         * threads if available. If no existing thread is available, a new
         * thread will be created and added to the pool. Threads that have
         * not been used for sixty seconds are terminated and removed from
         * the cache. Thus, a pool that remains idle for long enough will
         * not consume any resources. Note that pools with similar
         * properties but different details (for example, timeout parameters)
         * may be created using {@link ThreadPoolExecutor} constructors.
         *
         * @return the newly created thread pool
         */
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    返回ThreadPoolExecutor实例。
    corePoolSize为0;
    maximumPoolSize为Integer.MAX_VALUE;
    keepAliveTime为60L;
    unit为TimeUnit.SECONDS;
    workQueueSynchronousQueue(同步队列)。

    创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    简单的实现

    public class ThreeThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + "    ThreeThread执行中。。。");
        }
    }
    public class TestThreadPool {
        public static void main(String[] args){
            cacheThreadExecutor();
        }
       private static void cacheThreadExecutor(){
            //创建一个线程池
            ExecutorService pool = Executors. newCachedThreadPool();
            //创建实现了Runnable接口对象
            Thread t1 = new ThreeThread();
            Thread t2 = new ThreeThread();
            Thread t3 = new ThreeThread();
            Thread t4 = new ThreeThread();
            Thread t5 = new ThreeThread();
            //将线程放入池中并执行
            pool.execute(t1);
            pool.execute(t2);
            pool.execute(t3);
            pool.execute(t4);
            pool.execute(t5);
            //关闭  在终止前允许执行以前提交的任务
            pool.shutdown();
            //pool.shutdownNow()方法 阻止等待任务启动并试图停止当前正在执行的任务
        }
    }
    

    运行结果

    pool-1-thread-1    ThreeThread执行中。。。
    pool-1-thread-2    ThreeThread执行中。。。
    pool-1-thread-3    ThreeThread执行中。。。
    pool-1-thread-4    ThreeThread执行中。。。
    pool-1-thread-5    ThreeThread执行中。。。
    

    newScheduledThreadPool

    适用的场景

    周期性执行任务的场景。

    简单理解

    创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。如果所有线程均处于繁忙状态,对于新任务会进入DelayedWorkQueue队列中,这是一种按照超时时间排序的队列结构。

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
     }
    
    public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE,
                  DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
                  new DelayedWorkQueue());
        }
    
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
    

    底层:创建ScheduledThreadPoolExecutor实例。
    corePoolSize为传递来的参数;
    maximumPoolSize为Integer.MAX_VALUE;
    keepAliveTime为0;
    unit为:TimeUnit.NANOSECONDS;
    workQueue为:new DelayedWorkQueue() 一个按超时时间升序排序的队列
    更加详细的ScheduledThreadPoolExecutor

    简单的实现

    public class FourThread extends Thread {
        private String mName;
        public FourThread(String name){
            mName = name;
        }
        @Override
        public void run() {
                System.out.println(Thread.currentThread().getName() + "  FourThread执行_"+mName+"。。。");
        }
    }
    
    public class FiveThread extends Thread {
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " ==========================");
        }
    }
    
    public class TestThreadPool {
        public static void main(String[] args){
            newScheduledThreadPool();
        }
       private static void newScheduledThreadPool(){
            //创建一个线程池
            ExecutorService pool = Executors. newScheduledThreadPool(3);
            //创建实现了Runnable接口对象
            Thread t1 = new FourThread("1");
            Thread t2 = new FourThread("2");
    
            Thread five = new FiveThread();
    
            //每隔一段时间打印,证明两者是互不影响的
            ((ScheduledExecutorService) pool).scheduleAtFixedRate(t1,500,1000, TimeUnit.MILLISECONDS);
            ((ScheduledExecutorService) pool).scheduleAtFixedRate(t2,500,1000, TimeUnit.MILLISECONDS);
            ((ScheduledExecutorService) pool).scheduleAtFixedRate(five,500,2000, TimeUnit.MILLISECONDS);
            //关闭  在终止前允许执行以前提交的任务
            //pool.shutdown();
            //pool.shutdownNow()方法 阻止等待任务启动并试图停止当前正在执行的任务
        }
    }
    

    运行结果

    pool-1-thread-2 ==========================
    pool-1-thread-3  FourThread执行_1。。。
    pool-1-thread-1  FourThread执行_2。。。
    pool-1-thread-3  FourThread执行_1。。。
    pool-1-thread-1  FourThread执行_2。。。
    pool-1-thread-3 ==========================
    pool-1-thread-2  FourThread执行_1。。。
    pool-1-thread-1  FourThread执行_2。。。
    pool-1-thread-2  FourThread执行_1。。。
    pool-1-thread-3  FourThread执行_2。。。
    pool-1-thread-1 ==========================
    pool-1-thread-2  FourThread执行_1。。。
    pool-1-thread-1  FourThread执行_2。。。
    pool-1-thread-2  FourThread执行_1。。。
    pool-1-thread-3  FourThread执行_2。。。
    pool-1-thread-2 ==========================
    pool-1-thread-3  FourThread执行_1。。。
    

    如果细心的阅读上面四种底层源码实例,会发现传入的queue有所不同。为什么会不同的呢?其实在JDK中已经说得很清楚了,一共有三种类型的queue
    BlockingQueue<Runnable> workQueue可用于传输和保持提交的任务。可以使用此队列与池大小进行交互。存在三种情况,如下:

    1. 如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接开始运行)。
    2. 如果运行的线程等于或多于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。
    3. 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。

    BlockingQueue缓冲队列有三种类型策略

    直接提交

    工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    无界队列

    使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    有界队列

    当使用有限的 maximumPoolSizes时,有界队列(如ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

    一般如果线程池任务队列采用LinkedBlockingQueue队列(无界队列策略)的话,那么不会拒绝任何任务(因为队列大小没有限制),这种情况下,ThreadPoolExecutor最多仅会按照最小线程数来创建线程,也就是说线程池大小被忽略了。

    如果线程池任务队列采用SynchronousQueue队列(直接提交策略)的话,也就是说他存数任务的能力是没有限制的,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加。

    如果线程池任务队列采用ArrayBlockingQueue队列(有界队列)的话,那么ThreadPoolExecutor将会采取一个非常负责的算法, 比如假定线程池的最小线程数为4,最大为8所用的ArrayBlockingQueue最大为10。随着任务到达并被放到队列中, 线程池中最多运行4个线程(即最小线程数)。即使队列完全填满,也就是说有10个处于等待状态的任务,ThreadPoolExecutor也只会利用4个线程。如果队列已满,而又有新任务进来,此时才会启动一个新线程,这里不会因为队列已满而拒接该任务,相反会启动一个新线程。 新线程会运行队列中的第一个任务,为新来的任务腾出空间。 这个算法背后的理念是:该池大部分时间仅使用核心线程(4个),即使有适量的任务在队列中等待运行。这时线程池就可以用作节流阀。如果挤压的请求变得非常多,这时该池就会尝试运行更多的线程来清理;这时第二个节流阀—最大线程数就起作用了。

    如果线程池任务队列采用DelayedWorkQueue队列(时间排序队列)的话,保证添加到队列中的任务,会按照任务的延时时间进行排序,延时时间少的任务首先被获取。

    总结

    • 了解使用线程池的好处。
    • 简单地从源码中了解线程池的工作原理及对创建过程更加了解。
    • 了解四种常用的线程池的场景、用法及其实现。

    《Java并发编程的艺术》
    ThreadPoolExecutor源码分析,很详细

    相关文章

      网友评论

        本文标题:线程之线程池原理及实现

        本文链接:https://www.haomeiwen.com/subject/pvsbwqtx.html