【Java并发编程】—–“J.U.C”:ThreadPoolEx

作者: Peter潘的博客 | 来源:发表于2016-03-16 04:51 被阅读711次

    前言

    在前面一节JDK线程池(一):体系结构中已经分析了JDK的线程池核心接口的组成,通过那些接口的描述,我们知道线程池它所提供的功能,而本文将围绕JDK中的线程池是如何具体实现这些功能展开分析。
    (PS:由于ThreadPoolExecutor的具体实现比较复杂,如果分析存在有误的地方,请拍砖!)
    本文将以如下内容作为大纲进行分析说明:

    • ThreadPoolExcutor的核心组成
    • 任务的提交的执行流程分析
    • RejectedExecutionHandler分析
    • ThreadFactory分析

    1.走进ThreadPoolExecutor的世界

    首先让我们看看ThreadPoolExecutor的继承结构:


    JDK线程池的继承结构图

    通过上图我们我们可以看到ThreadPoolExecutor继承了抽象类AbstractExecutorService,从而完成对ExecutorService接口的实现,而AbstractExecutorService只是提供了一些模板方法的实现,具体的处理细节都还是落实到ThreadPoolExecutor中。

    1.1 核心参数

    由于线程池要应对不同的负载情况,ThreadPoolExecutor为了更好的适配不同的场景,因此其提供了很多的可调节的参数,让用户根据实际的负载情况进行调节。这些核心参数需要在创建ThreadPoolExecutor时通过构造方法来进行指定,ThreadPoolExecutor中提供了4个重载的构造方法,下面让我们看看ThreadPoolExecutor中最复杂的一个的构造方法的实现(其余的构造方法底层都是调用下面的这个):

            /*
             (1).corePoolSize:设置一个线程池中的核心线程数
             如果设置allowCoreThreadTimeOut为false的情况下:
             即使当线程池中的线程处于空闲状态,这些线程也不会被线程池中移除。
             如果设置了allowCoreThreadTimeOut为true,
             那么当核心线程在空闲了一段时间后依旧没有用于工作,那么将会从线程池中移除。
             注意:(allowCoreThreadTimeOut默认为false,通常情况下也无需做修改)
             
             (2).maximumPoolSize:线程池中所允许创建最大线程数量
             
             (3).keepAliveTime:当线程池中的线程数量大于核心线程数,
             如果这些多出的线程在经过了keepAliveTime时间后,
             依然处于空闲状态,那么这些多出的空闲线程将会被结束其生命周期。
             
             (4).unit:keepAliveTime的时间单位
             
             (5).workQueue:用于存放任务的阻塞队列,当线程池中的核心线程都处在执行任务时,
             提交的任务将被存储在workQueue进行缓冲。
             该队列只能存放通过execute方法提交的Runnable任务。
            
            (6).threadFactory:线程池中用于创建线程的工厂
            在这里使用线程工厂的目的也是为了解耦,将创建的实现细节通过工厂进行封装,
            而不是直接将创建的方式固化在ThreadPoolExecutor本身的代码中。
            
            (7)handler:当线程池中的线程数量达到最大并且阻塞队列也已经满了无法再添加任务时,
            线程池所采取的处理策略。
            
            */
            public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            //对给定的核心参数进行合法性的校验                  
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    

    ThreadPoolExecutor根据上述给定的参数,会根据实际的负载情况对一个线程池中的实际工作线程做出动态调正,我们可以通过getPoolSize()方法来查看当前线程池中实际的线程数量。

       /**
         * 返回当前线程池中实际线程的数量
         *
         * @return the number of threads
         */
        public int getPoolSize() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                /*根据当前线程池的状态进行判断,如果线程池已经处
                于terimated状态时,则返回0,否则就通过worker集合(底层实际是一个HaseSet)中
                返回当前线程的数量。 
                */
                return runStateAtLeast(ctl.get(), TIDYING) ? 0
                    : workers.size();
            } finally {
                mainLock.unlock();
            }
        }
    

    上面的这些核心参数除了在ThreadPoolExecutor创建时指定,在后期运行的过程中也可以进行动态的修改,只是提供其对应的setXXX方法完成修改即可,从这点我们可以很好的感觉到ThreadPoolExecutor实现的灵活性。

    1.2 线程池的状态与工作线程数量

    横看成岭侧成峰,远近高低各不同。

    普通程序猿看整数,它可能就是占4个字节的数字(Java中),而大神看整数却能看出不同的风景。在ThreadPoolExecutor中使用了一种非常巧妙的方式对表示线程池的状态工作线程的数量
    在ThreadPoolExecutor中,使用了一个AtomicInteger对将当前线程的工作状态工作线程数量(有效线程数)使用同一个整数进行包装。
    为了将两个数值包装在同一个整数中,它将32位的高3位表示线程的状态值,而后29位来表示线程的数量。这也意味着,在ThreadPoolExecutor中最多可以存在线程数实际为2^29-1个,当然这个只是理论值,实际的应用根本不可能有这么多线程数量。

    设计思想与目的:

    也许很多人会觉得这样的设计有点奇怪,因为不就是表示2个信息嘛,我的线程数量用个int来表示,而线程状态用个byte来表示不就OK嘛,不就多浪费一个字节数量而已嘛。其原因在于,线程的状态和数量往往需要同时更新,然而线程池天生处在一个并发的环境下,那么当对2个变量进行修改时,那么就势必需要通过锁来进行线程安全的处理,从而保证2个变量修改具备原子性;但是这种做法对于性能的影响是非常严重的,因此在ThreadPoolExecutor将两个变量的分别包装在一个变量中,最后的并发操作发生在AtomicInteger上,而AtomicInteger恰恰就是具有一个无锁原子操作类,这样既可以解决线程安全的问题,又可以规避避免所的使用,从而提供性能。

    下面是ThreadPoolExecutor中对状态和线程数量的源码,这里使用的是JDK1.7,在第375行。

        /*
        使用AtomicInteger来对实际的线程数量(workCount)
        以及这个线程池的状态(runState),
        该值默认为111...000(29个0),每增加一个线程,ctl值就会+1
        使用后29位来保存线程数量。
        */
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        /*
        使用int的bit数量减去3,即32-3=29
        */
        private static final int COUNT_BITS = Integer.SIZE - 3;
       
        /*
          线程池中工作线程的最大数量为2^29-1
        */
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        /*
         使用如下整型值来表示线程池的状态
         RUNNING:当处于RUNNING状态时,
         线程池可以接受新的任务并且会执行任务队列中的任务。
        */
        private static final int RUNNING    = -1 << COUNT_BITS;
        /*
        SHUTDOWN:不再接受新的任务,但是会继续处理队列中还没有处理完成的任务
        */
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        /*
        STOP:不再接受新的任务,
        并且不会继续处理队列中还没有处理完成的任务,
        同时还会去中断当前正在执行的任务。
        */
        private static final int STOP =  1 << COUNT_BITS;
        
        /*
        TIDYING:所有的任务都已经结束,并且workCount的数量为0。
        */
        private static final int TIDYING =  2 << COUNT_BITS;
        
        /*
        当线程池的状态变到TERMINATED状态后,ThreadPoolExecutor
        提供了一个terminated()方法供用户进行扩展实现。我们可以通过这个方法记录线程池关闭等信息。
        通过上面分析可以自己手动进行运算一下,会得到如下的结果:
        (1).当线程池处于RUNNING时,ctl值小于0,
        (2).而当线程池处于其他状态时,则ctl将大于等于0。
        */
        private static final int TERMINATED = 3 << COUNT_BITS;
    
    
        /*
        使用一个整数值进行两个信息的包装与拆解的过程
        获取线程的状态,取32位的前3位即可。
        即就ctl与11100....(29个0)进行按位与。
        */
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        
        /*
        取线程的数量,即取后29位的值,即将ctl与
        00011...(29个1)进行按位与运算。
        */
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        
        /*
        将两个状态值组装成一个整数
        通过rs(状态值)与wc(workCount)值进行或运算,它们各自独立
        不会产生相互的响应。
        */
        private static int ctlOf(int rs, int wc) { return rs | wc; }
        
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
        /*
        对线程池状态的判断方法,此时无需对ctl进行拆分获前3位进行比较。
        因为SHUTDOWN的值为0,而只要ctl小于0,则说明线程池就处于运行状态。
        */
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
    

    2.看我如帮你完成任务

    之前我们可能将一个任务丢给线程池就不管了,它的底层执行对于我们来说可能是一个黑匣子;下面我们来深入的看看一个任务是如何被提交到线程池中,如何被线程池的线程所执行,我们以execute(Runnable)方法作为例子来进行分析。

    查看ThreadPoolExecutor的executor方法:

        public void execute(Runnable command) {
            //判断提交的任务是否为null
            if (command == null)
                throw new NullPointerException();
            //获取到ctl值    
            int c = ctl.get();
            /*
              通过ctl值进行拆解,获取到具体的线程池中实际的线程数量,
              判断其是否小于用户所执行的corePoolSize,如果小于则
              直接创建一个线程进行执行(通过addWorker(command,true)方法来处理)
              否则就继续执行下面代码
            */
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            /*
            判断线程池中的状态是否为RUNNING,如果是
            则将任务提交到任务队列中,如果提交任务队列成功,
            则会对线程池的状态进行一次重复检查,再次检查当前线程的状态以及实际的线程数量。
            提交失败可能是由于任务队列已经满了,从而无法提交。
            */
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            /*
            当提交任务队列失败,则会去判断当前线程池线程是否已经到达最大值
            如果未到达,则创建一个线程继续执行,否则则执行拒绝处理。
            */
            else if (!addWorker(command, false))
                reject(command);
        }
           /*
            根据给定的任务来创建线程,创建的过程中会根据实际的线程数量以及状态来判断是否去创建。
            core:当core为true,判断当前线程池中实际线程数是否大于corePoolSize,如果大于,则不执行。
                 当core为false,判断当前线程池中实际线程数是否大于maximumPoolSize,如果大于,则不执行。
            */
        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //获取实际的线程数量
                    int wc = workerCountOf(c);
                    /*
                       判断是否大于2^29-1,同时根据给定的core参数,
                       来选择到底是与corePoolSize还是maximumPoolSize进行比较
                    */
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //当比较成功,则对工作线程数+1,跳出循环    
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            /*
                完成对任务的真正执行
            */
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                final ReentrantLock mainLock = this.mainLock;
                /*
                将任务封装在一个Worker中,这里的Worker是
                ThreadPoolExecutor中所定义的一个Runnable实现类。
                */
                w = new Worker(firstTask);
                /*
                   worker中同时封装了线程对象,该线程对象是从线程工厂中所获取
                   因此Worker的数量和线程池中线程的数量是一一对应的。  
                */  
              final Thread t = w.thread;
                if (t != null) {
                    mainLock.lock();
                    try {
                
                        int c = ctl.get();
                        int rs = runStateOf(c);
                        //对状态进行重复检测,这里具体的细节我们暂且忽略,只关注任务什么时候被执行成功。
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            /*
                              通过一个HashSet专门存储Worker,本质上是存储线程,        
                              因为每一个Worker底层都维护着一个线程
                            */   
                            workers.add(w);
                            /*
                            获取当前的Set集合的大小,用户统计线程池中线程数量达到最高峰时的线程数,
                            使用largestPoolSize来存储。
                          */ 
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //设置任务添加成功,只有该值为true时,任务才会真正得到执行
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //执行任务    
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    总结线程池任务提交的过程:
    通过上面的代码分析,我们大体上可以知道一个任务提交到线程池中会经历如下的3步过程:

    1. 如果线程池中实际的线程数量小于corePoolSize,那么就启动一个新的线程进行任务的处理。
    2. 如果线程池中实际的线程数量大于等于corePoolSize,则将任务放置到任务队列中进行处理。
    3. 如果由于任务队列已经满了,无法再存放新的任务,则判断线程池中实际的线程数量是否大于maximumPoolSize,如果小于,则创建新的线程执行,否则将拒绝执行任务。

    3.负载过高我该怎么办?

    上面我们已经提到了当线程池实际的数据量到达最大值时,如果再次提交新的执行,则会拒绝执行。那么ThreadPoolExecutor是如何拒绝执行的呢?
    ThreadPool中默认的拒绝策略使用的是中断策略,即当无法接哦受新的任务时,直接抛出RejectedExecutionException异常。

        /**
         * ThreadPool中默认定义的RejectedExecutionHandler
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
            
    

    AbortPolicy实现了java.util.concurrent.RejectedExecutionHandler接口。

    public interface RejectedExecutionHandler {
    
        //当ThreadPoolExecutor无法接受新的任务时,将会执行该方法完成任务的拒绝执行。    
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    
    public static class AbortPolicy implements RejectedExecutionHandler {
    
            public AbortPolicy() { }
    
            /*
            AbortPolicy的执行拒绝策略非常简单,当无法再次接受新的任务时,就抛出一个异常。
            */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    除了AbortPolicy,ThreadPoolExecutor中还定义3个RejectedExecutionHandler的实现类,它们分别是DiscardPolicy、DiscardOldestPolicy、CallerRunsPolicy,下面是这3个实现类所对应的源码:

     public static class DiscardPolicy implements RejectedExecutionHandler {
    
            public DiscardPolicy() { }
    
        //DiscardPolicy的处理方式非常简单,直接忽略提交的任务
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
    
    
    
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
     
            public DiscardOldestPolicy() { }
    
            
            /*
            DiscardOldestPolicy首先判断线程池是否已经关闭,
            如果未关闭,则将任务队列中之前提交的任务移除,将
            新提交的任务加入到队列中
            */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
        
         public static class CallerRunsPolicy implements RejectedExecutionHandler {
     
            public CallerRunsPolicy() { }
            
            /*
            CallerRunsPolicy所采取的策略是不再启动新的线程,
            而是让当前提交任务的线程直接自己去处理这个任务
            */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
        
    

    在实际的应用中,我们还可以自己去扩展RejectedExecutionHandler,根据自己的业务需求来进行相应的处理,将拒绝的信息通过日志记录从而方便后期进行参数调优。

    4.线程从何而来?

    我们在分析ThreadPoolExecutor的参数时,提到了一个ThreadFactory的东东,它是一个用于创建线程的工厂,该接口的定义也非常的简单,里面只有一个方法。

    public interface ThreadFactory {
    
        Thread newThread(Runnable r);
    }
    
    

    在ThreadPoolExecutor中,默认所使用的ThreadFactory是Executors中所定义的DefaultThreadFactory,该方法的具体实现如下:

    
     public Thread newThread(Runnable r) {
               //设置线程组以及线程的名字
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                //设置线程为前台线程,同时设置线程的优先级  
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
    
    

    ThreadFactory在ThreadPool中主要用于Worker中,在执行任务的时候,用户所提交的任务会被包装成一个Worker来进行执行。而Worker内部维护着一个线程对象,这个线程对象就是从ThreadFactory中所得到的。

    
    /*
        Worker的构造方法,将用户提交的任务进行封装
    */
     Worker(Runnable firstTask) {
                setState(-1);
                this.firstTask = firstTask;
                //通过获取线程工厂来获取到线程
                this.thread = getThreadFactory().newThread(this);
            }
    

    至此,ThreadPoolExecutor中比较核心的内容就分析到这里,如果发现在分析的过程中存在问题,请及时指正!后面的内容将分析一下线程池工具类--Executors以及JDK1.7推出的ForkJoinPool。

    相关文章

      网友评论

      本文标题:【Java并发编程】—–“J.U.C”:ThreadPoolEx

      本文链接:https://www.haomeiwen.com/subject/mejxlttx.html