美文网首页
Java线程池研究

Java线程池研究

作者: 涂豪_OP | 来源:发表于2018-08-08 16:33 被阅读3次

        线程池是java中的重要知识点,今天研究下,首先来看下线程池是怎么使用的,然后在使用的基础上再进行原理剖析:

    public class MyTask implements Runnable{
        private int taskNum;
        
        public MyTask(int num) {
            this.taskNum = num;
        }
         
        @Override
        public void run() {
            System.out.println("正在执行task "+taskNum);
            try {
                Thread.currentThread().sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"执行完毕");
        }
    }
    

        上面首先创建一个线程子类,线程池就是首先预创建指定的数量,当我们需要执行某个逻辑的时候,把这些逻辑封装到一个Runnable对象里面,然后丢给线程池,线程池就会自动去执行,所以首先要创建一个线程子类,封装我们的业务逻辑。

    public class Test {
        public static void main(String[] args) {  
            //创建一个线程池:第一个参数表示核心线程数;第二个参数表示最大线程数
            //第三个参数表示线程存活的时间;第四个参数是存活的时间单位;最后一个
            //数表示一个任务队列,当没有足够线程去执行任务时,任务将被添加到队列       
            ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(5));
             
            //循环将一个个Runnable对象丢给线程池去执行
            for(int i=0;i<15;i++){
                MyTask myTask = new MyTask(i);
                pool.execute(myTask);
                System.out.println("线程池中线程数目:"+pool.getPoolSize()+",队列中等待执行的任务数目:"+
                pool.getQueue().size()+",已执行完的任务数目:" + pool.getCompletedTaskCount());
            }
            
            //关闭线程池
            pool.shutdown();
        }
    }
    

        上面首先创建一个线程池对象pool,然后丢给线程池对象pool的execute方法区执行,输出结果如下:

    //核心线程执行任务
    正在执行task 0
    线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完的任务数目:0
    线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完的任务数目:0
    正在执行task 1
    线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完的任务数目:0
    正在执行task 2
    线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完的任务数目:0
    正在执行task 3
    线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完的任务数目:0
    正在执行task 4
    //一共就5个核心线程,已经用完了,别的任务只能去队列里面等待了
    线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完的任务数目:0
    线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完的任务数目:0
    //队列的容量是5,现在已经满了,所以要创建新的非核心线
    //程执行任务;队列不满,是不会创建非核心线程去执行的
    线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完的任务数目:0
    //非核心线程执行任务
    正在执行task 10
    线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完的任务数目:0
    正在执行task 11
    线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完的任务数目:0
    正在执行task 12
    线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完的任务数目:0
    正在执行task 13
    线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完的任务数目:0
    正在执行task 14
    task 1执行完毕
    task 13执行完毕
    task 2执行完毕
    正在执行task 6
    task 10执行完毕
    正在执行task 8
    task 11执行完毕
    task 4执行完毕
    task 3执行完毕
    正在执行task 9
    正在执行task 7
    task 0执行完毕
    正在执行task 5
    task 14执行完毕
    task 12执行完毕
    

        通过上面的例子可以看出,简单的使用线程池还是很方便的,下面从线程类ThreadPoolExecutor开始剖析线程池的原理,先从ThreadPoolExecutor的类信息开始看:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        //原子整数,他是一个复合型数据,低29位表示线程池中活动的线程数量,
        //用workerCount表示;高3位代表线程池运行状态:RUNNING、
        //SHUTDOWN、STOP、TIDYING和TERMINATED,用runState表示
        //这个变量特别重要,后面很多操作都要获取他才能决定下一步
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        //线程池中活动的线程数量workerCount占的位数:32 - 3 = 29
        private static final int COUNT_BITS = Integer.SIZE - 3;
    
        //线程池中获得的线程的最大数量,2的29次幂 - 1
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        //线程池状态,RUNNING表示正在运行,接受新任务并处理队列中的任务;-1在底层
        //由32个1表示,左移29位就是111 00000 00000000 00000000 00000000
        //也就是低29位全部为0;高3位全部为1的话,表示SHUTDOWN状态
        private static final int RUNNING    = -1 << COUNT_BITS;
    
        //线程池状态,不接受新任务,但是会处理队列里面的任务。0不管怎么
        //左移都是0所以低29位是0,高3位全部是0的话,表示SHUTDOWN状态
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
    
        //线程池状态,不接受新任务,不会处理队列里面的任务;而且会中断掉正在处理的任务;
        //1由31个0和1个1组成,左移29位就是001 00000 00000000 00000000 00000000,
        //也就是低29位全部是0的,高3位是001的话,表示STOP状态
        private static final int STOP       =  1 << COUNT_BITS;
    
        //线程池状态,表示所有任务已经结束,workerCount为0,线程池过渡到TIDYING
        //状态;2在底层由30个0和一个10组成,左移29位就是010 00000 00000000
        // 00000000 00000000,也就是低29位全部为0,高3位为010就是TIDYING状态
        private static final int TIDYING    =  2 << COUNT_BITS;
    
    
        //线程池状态,表示terminated()方法已经完成;3在底层由30个0和一个11组成
        //左移29位就是011 00000 00000000 00000000 00000000,也就是低29位
        //为0,高3位是011的话,线程池此时是TERMINATED状态
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        //任务队列,当线程池的核心线程不够用时,新添加
        //的任务将会被放入此队列,待以后执行这些任务
        private final BlockingQueue<Runnable> workQueue;
    
        //重入锁
        private final ReentrantLock mainLock = new ReentrantLock();
    
        //一个Worker类型的集合,Worker实现了Runnable接口,可以猜测
        //线程池中的运行单元是不是就是他呢?
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        //一个Worker类型的集合,Worker实现了Runnable接口,可以猜测
        //线程池中的运行单元是不是就是他呢?
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        //线程池中曾经创建过的最大的线程数量
        private int largestPoolSize;
    
        //任务已经完成的数量
        private long completedTaskCount;
    
        //线程工厂类,创建线程用的
        private volatile ThreadFactory threadFactory;
    
        //拒绝策略。当任务队列已满,而且线程数量达到
        //最大值时,如果还添加任务,就会采取此拒绝策略
        private volatile RejectedExecutionHandler handler;
    
    
        //空闲线程存活的时间,时间一到,直接终止
        private volatile long keepAliveTime;
    
        //是否终止处于空闲的核心线程,如果是true,时间到了就终止核心线程,否则不终止
        private volatile boolean allowCoreThreadTimeOut;
    
        //核心线程数量
        private volatile int corePoolSize;
    
        //此线程池最大的线程数,创建线程池的时候手动指定
        private volatile int maximumPoolSize;
    
        //此线程池最大的线程数,创建线程池的时候手动指定
        private volatile int maximumPoolSize;
    }
    

        上面就是ThreadPoolExecutor类的一些成员变量。下面分析下他的构造函数,ThreadPoolExecutor类有4个构造函数,3个构造函数最终调的是第4个,所以直接来看第四个构造函数:

    /**
     * 参数解析
     * @param corePoolSize      核心线程数
     * @param maximumPoolSize   最大线程数
     * @param keepAliveTime     空闲线程存活的时间
     * @param unit              上面那个时间的单位
     * @param workQueue         任务队列
     * @param threadFactory     线程工厂对象
     * @param handler           拒绝策略
     */
     public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory,
                               RejectedExecutionHandler handler) {
         //一波判断,各个参数必须要在正常,不能残疾
         if (corePoolSize < 0 ||
             maximumPoolSize <= 0 ||
             maximumPoolSize < corePoolSize ||
             keepAliveTime < 0)
             throw new IllegalArgumentException();
         if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
    
             //一波赋值,不解释
             this.corePoolSize = corePoolSize;
             this.maximumPoolSize = maximumPoolSize;
             this.workQueue = workQueue;
             this.keepAliveTime = unit.toNanos(keepAliveTime);
             this.threadFactory = threadFactory;
             this.handler = handler;
     }
    
        线程池里面有两个线程非常重要,一个是核心线程corePoolSize,另一是最大线程maximumPoolSize;当有任务到来时,先用核心线程去执行;如果核心线程用完了,那么任务放入队列里面存放起来,此时就算设置了非核心线程,非核心线程也是懒得动的;当队列满了以后,才会起非核心线程去执行任务(非核心线程有点飘)。其执行流程如下: 线程池执行流程

        下面就来看下线程池执行的方法execute吧:

      public void execute(Runnable command) {
            //非空判断,不解释
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            //获取原子整数的值,这个整数聚合了当前活动的线程总数和线程池状态
            int c = ctl.get();
    
            //如果核心线程没有用完,那么用核心线程执行任务
            if (workerCountOf(c) < corePoolSize) {
    
                //addWorker就是创建一个Worker对象来执行此任务,众多
                //的worker是放在集合里面统一管理的,如果新建的worker
                //成功加入此集合,那么说明addWorker成功;注意addWorker
                //成功并不代表此任务就执行成功了。Worker对象是Runnable
                //的子类,里面有个Thread的成员变量,我们传进来的任务就是
                //由这个变量来运行,所以Worker是任务执行的载体,很重要
                if (addWorker(command, true))
                    return;
    
                //addWorker失败,说明worker创建失败,或者
                //没有添加进集合里面,这里再次获取线程池的信息
                c = ctl.get();
            }
            //如果线程池处于运行状态,而且任务成功被添加进队列,重新检查。因
            //为在判断线程池状态和添加任务之后,线程池的状态可能再次发生变化
            //workQueue.offer就是把我们传进来的任务添加进任务队列
            if (isRunning(c) && workQueue.offer(command)) {
    
                //再次获取线程池状态
                int recheck = ctl.get();
    
                //若干线程池状态发生变化,不再运行了,
                //那么从任务队列移除此任务,相当于回退操作
                if (! isRunning(recheck) && remove(command))
    
                    //执行拒绝策略
                    reject(command);
    
                //如果线程池正在运行,但是活跃线程数量为0,那么再addWorker一次
                else if (workerCountOf(recheck) == 0)
    
                    //这里传入null的原因是目标任务在外层if已经添加进去了。注意
                    //最后一个参数,没有核心线程了,说明只能用非核心线程执行任务
                    addWorker(null, false);
            }
    
            //如果添加队列失败,那么手动新增线程去执行;如果还是失败
            //说明线程池挂了或者处于饱和状态,没得救了,执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

        execute就是这样了,调用addWorker去新增一个Worker对象,然后通过此对象来运行我们的任务,所以先简要分析下Worker类,他是ThreadPoolExecutor的内部类:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                //设置同步状态,-1代表在调用runWorker前禁止中断
                setState(-1); // inhibit interrupts until runWorker
    
                //给成员变量赋值,这个值可能是空,如果是空
                //,就从任务队列里面获取(当然不是在这里获取)
                this.firstTask = firstTask;
    
                //构建Thread对象,最终就是他去执行我们传进来的任务
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            //这里才是执行任务的地方
            public void run() {
                runWorker(this);
            }
            ......
    }
    

        Worker本身不是很难,可以简单的理解成我们常说的线程线程,简单了解下即可,下面研究重要的方法addWorker:

        //根据线程池当前的状态和边界值(核心线程数和最大线程数)来就检查一个Worker是否
        //可以添加到线程池。如果可以,workerCount将会增加;如果有可能,此Worker将被
        //创建和运行传进来的任务。如果线程池挂了,或者线程数量超出边界值,此方法将会返
        //回false;如果线程工厂创建线程失败,那么也会返回false,然后回退
        private boolean addWorker(Runnable firstTask, boolean core) {
            //for循环的标记位
            retry:
    
            //双重for循环,外层for循环用来判断线程池
            //状态;内层for循环用来增加线程数的CAS操作
            for (;;) {
                //获取线程池信息
                int c = ctl.get();
    
                //获取线程池活动运行状态
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                // 如果线程池当前状态大于等于SHUTDOWN,也就是SHUTDOWN
                // 、STOP、TIDYING和TERMINATED其中之一,那么继续后面的
                // 判断,后面的判断不好理解,主要在于外层有个取反操作,可以
                // 转换如下:(rs!=SHUTDOWN || firstTask!=null ||
                // workQueue.isEmpty())。分析:1.如果线程池状态大于SHUTDOWN
                //,说明不宜接受新任务,也不会处理队列里面的任务,返回false;2.如
                //果传进来的firstTask不为空,说明是添加新任务,此时不会处理,返回
                //false,注意firstTask是可以为空的,为空代表着新起线程执行队列里面
                //的任务;workQueue.isEmpty()代表没有等待的任务了,此时线程池的状
                //是大于等于SHUTDOWN的,当然不会新起线程去执行任务了。
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //获取线程数量
                    int wc = workerCountOf(c);
    
                    //如果线程数量超出边界,那么返回false,Worker添加失败
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
    
                    //如果没有超出边界,原子操作增加线程数量
                    if (compareAndIncrementWorkerCount(c))
                        //增加完成后,跳出外层for循环,添加worker
                        //成功,addWorker方法成功了一半,可喜可贺
                        break retry;
    
                    //如果自增线程数量失败,再次获取线程池运行时信息
                    c = ctl.get();  // Re-read ctl
    
                    //如果当前的状态不等于之前的状态,跳出内层循环,执行外循环,再次判断
                    //状态;因为执行下次循环的时候,线程池的状态可能会发生变化。反正这里
                    //就是往死里去增加线程池记录的线程数量,不成功誓不罢休,除非超出边界了
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                    // CAS操作失败要么是因为线程池状态改变了,要么是因为workCount改变了
                    // ,如果workCount改变了,说明存在竞争,继续内循环自增workerCount
                }
            }
    
            //流程执行到这里,说明活跃的线程数已经成功自增
    
            //定义两个变量,
            boolean workerStarted = false;
            boolean workerAdded = false;
    
            //线程池中的一个线程对象就是一个Worker对象
            Worker w = null;
    
            try {
                //构建一个Worker对象
                w = new Worker(firstTask);
    
                //拿到Worker对象里面的线程类
                final Thread t = w.thread;
    
                //如果线程不为空
                if (t != null) {
                    //重入锁
                    final ReentrantLock mainLock = this.mainLock;
    
                    //先锁起来
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        //拿到锁有重新检查线程池状态
                        int rs = runStateOf(ctl.get());
    
                        //如果线程池处于运行状态,或者线程池SHUTDOWN了,同时
                        //firstTask为空,说明要起线程去执行任务队列里面的任务
                        //,其他情况一律不执行。这也说明了SHUTDOWN状态下的线程
                        //池,是会去执行任务队列里面的任务的,SHUTDOWN比较厚道
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
    
                            //检查刚起的线程是不是已经被执行了,如果执行了,死给你看
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
    
                            //workers是一个HashSet,里面保存的是Worker对象
                            workers.add(w);
    
                            //HashSet的元素个数
                            int s = workers.size();
    
                            //如果HashSet里面的Worker数量大于曾经创建过
                            //的最大的线程数量,那么对这个数量进行重新赋值
                            if (s > largestPoolSize)
                                largestPoolSize = s;
    
                            //Worker对象添加成功
                            workerAdded = true;
                        }
                    } finally {
                        //释放锁
                        mainLock.unlock();
                    }
    
                    //如果Worker添加成功,启动线程去执行
                    if (workerAdded) {
                        t.start();
                        //设置状态
                        workerStarted = true;
                    }
                }
            } finally {
                //任务执行失败的话,把Worker从HashSet里面移除
                if (! workerStarted)
                    //调用addWorkerFailed处理失败的后续操作
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

        addWorker方法有点难。这个方法上来就是一个双重for死循环,外层for循环的作用是判断线程池的状态,如果当前线程池的状态大于SHUTDOWN,那么此线程池不接受新任务;如果大于等于SHUTDOWN,而且传进来的任务不为空(说明是新任务),那么也不接受;如果线程池状态大于等于SHUTDOWN,而且任务队列为空,没的说,也不接受;三种不接受都导致返回false,这就是外层for循环到的作用;一旦线程池状态校验通过,那么就进入第二层for循环,增加线程池活跃线程的数量,通过死循环,气而不馁的增加数量,除非数量超过线程池的边界,此时才返回false。如果线程池的线程数量自增成功,那么构建一个Worker对象,此时再次检查线程池状态,因为上面两个for循环是线程不安全的,一个线程通过了上面两个for循环检查状态和自增数量后,另外一个线程也可以去自增,此时可能导致线程池的状态发生变化,所以需要再次检查;这次检查通过后,将新建的Woker对象加入集合workers,接着就执行Worker对象里面的Thread的run方法;如果失败,调用addWorkerFailed。这段代码逻辑很简单,但是实现起来是比较复杂的。下面看下Worker的Thread的run方法是怎么执行的:

    /** Delegates main run loop to outer runWorker  */
    //这里才是执行任务的地方
    public void run() {
        runWorker(this);
    }
    

        简单的调用runWorker,runWorker方法是一个核心方法,很难,下面尽量分析:

        final void runWorker(Worker w) {
            //拿到当前的线程,也就是调用runWorker方法的线程
            Thread wt = Thread.currentThread();
    
            //拿到Task
            Runnable task = w.firstTask;
    
            //将firstTask置空
            w.firstTask = null;
    
            //遥想当年创建Worker的时候设置了一个状态-1,代表
            //不允许打断,这里设置成1代表可以打断
            w.unlock(); // allow interrupts
    
            //标记线程是不是异常终止的
            boolean completedAbruptly = true;
    
            try {
                //如果Task不为空,说明用户传入了一个任务;如果为空,说
                //明要从任务队列里面获取任务来执行,否则跳出while循环
                while (task != null || (task = getTask()) != null) {
                    //加锁,目的是为了在线程池shutdown
                    //的时候,对于正在执行的任务不被中断
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    //如果线程池正在stop,那么要保证当前线程是中断状态
                    //如果不是,要保证当前先不是中断状态
                    //runStateAtLeast(ctl.get(), STOP)的作用就是拿到线程池
                    //的运行状态,然后和STOP比对,大于等于STOP就返回true;
                    //Thread.interrupted是设置调用runWorker方法的线程的状态为中断,仅仅是设置
                    //线程中断状态,不会真的中断运行runWorker方法的线程,除非runWorker方法的线程
                    //里面有sleep、wait或者join,此时就抛出InterruptedException异常
                    //!wt.isInterrupted()是指任务线程没有被中断
                    //所以if的作用是如果线程池的状态大于等于STOP,而且任务线程没有中断,那么给任务线程设置一个中断
                    //标记位;或者运行runWorker方法的线程别设置了中断位,而且线程池的状态大于等于STOP,那么任务线程也要设置标记位
                    //否则就不会给任务线程设置标记位,这就是为什么线程池shutdown之后,任务还能运行的原因
                    if ((runStateAtLeast(ctl.get(), STOP) 
                            || (Thread.interrupted() 
                            && runStateAtLeast(ctl.get(), STOP))) 
                            && !wt.isInterrupted())
                        //设置任务线程的中断状态,也就是说interrupt会发出中断信号,这个信号只能被wait()、sleep()和join()方法
                        //捕捉并产生中断。interrupt本身仅仅只会设置线程中断标记位,并不会真正产生中断
                        wt.interrupt();
                    try {
                        //任务执行前需要做的工作,默认空实现,可自己扩展
                        //beforeExecute可能会导致线程异常而死亡
                        beforeExecute(wt, task);
    
                        //创建一个Throwable对象,捕获异常用
                        Throwable thrown = null;
                        try {
                            //执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            //任务执行后需要做的工作,默认空实现,可自己扩展
                            //这里把抛出的异常都传给了afterExecute
                            afterExecute(task, thrown);
                        }
                    } finally {
                        //任务置空
                        task = null;
    
                        //完成的任务自增
                        w.completedTasks++;
    
                        //释放锁
                        w.unlock();
                    }
                }
    
                //不是异常中断的
                completedAbruptly = false;
            } finally {
                //退出Worker
                processWorkerExit(w, completedAbruptly);
            }
        }
    

        runWorker方法不太好理解,主要是在锁的那里理解的有偏差。这个方法的主要任务就是执行我们传进来的Runnable对象的run方法。接下来看下getTask方法:

        //获取执行任务
        private Runnable getTask() {
            //上次从队列里面获取任务是否超时导致未能获取到任务
            boolean timedOut = false; // Did the last poll() time out?
    
            //死循环
            for (;;) {
                //获取线程池运行时状态
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                //如果线程池状态大于等于SHUTDOWN,任务队列是空的,那么返回null同时将
                //workerCount自减(CAS操作),因为addWorker中已经将他自增了,所以这里要退货。
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                //拿到活跃线程数量
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                //allowCoreThreadTimeOut默认是false,代表核心线程是否有超时限制
                //如果wc > corePoolSize,说明用到了非核心线程,此时就要有超时限制了
                //timed的主要用于获取任务,如果有超时限制,那么调用poll方法获取任务
                //在keepAliveTime时间内没有获取到任务,就继续下次循环获取任务;如果
                //是false,那么调用take方法获取任务,此时线程就会卡在这,阻塞式获取
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                //这个判断分解后更好理解,他的判断依据是如果当前活跃线程数大于最大线程数,那么直接返回,不搞了;
                //不管任务队列是不是空的,也不管wc到底是不是大于1,统统不玩了。如果获取任务有超时限制,而且上次
                //获取任务还搞砸了,超时了,那么如果活跃线程数大于1,或者队列是空的,那么也不玩了;队列是空的时候
                //不玩很好理解,那么如果允许超时,而且上次还超时了,那么也不玩了呢,应该是队列里面没有任务了吧(不确定)
                if (  (wc > maximumPoolSize || (timed && timedOut))      &&     (wc > 1 || workQueue.isEmpty())   ) {
                    //将活跃线程数自减
                    if (compareAndDecrementWorkerCount(c))
                        //返回空
                        return null;
    
                    //如果自减失败,继续for循环,因为下次for循环的时候,
                    //线程池的状态可能有所改变,如果没变,那么再次自减
                    continue;
                }
    
                try {
                    //允许超时,那么调用poll获取任务,否则调用take阻塞式获取,也就是会一直卡在这
                    //poll只在keepAliveTime这个时间内获取任务,如果获取不到,那么进行下一次循环
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();//阻塞式方式获取,一直等到队列有任务
                    if (r != null)
                        //返回结果
                        return r;
                    //如果没有拿到任务,说明获取超时,修改状态,继续下次循环
                    timedOut = true;
                } catch (InterruptedException retry) {
                    //如果被打算,那么继续下次循环
                    timedOut = false;
                }
            }
        }
    

        getTask方法也比较难,主要是if条件写的太简洁了,分析起来有点吃力。下面分析线程退出的时候线程池做了什么:

        //处理线程的退出。两种场景,一种是由于异常而退出,一种是由于任务执行完毕而退出
        private void processWorkerExit(Worker w, boolean completedAbruptly) {
            //如果是异常退出,那么将线程池的线程数量自减
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //正常退出的话,那么将执行成功的任务自增
                completedTaskCount += w.completedTasks;
    
                //从集合里面删除此worker
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            //尝试终止线程池,不一定能成功
            tryTerminate();
    
            //获取线程池运行时信息
            int c = ctl.get();
    
            //如果此时状态小于STOP
            if (runStateLessThan(c, STOP)) {
    
                //如果不是异常退出的话
                if (!completedAbruptly) {
    
                    //计算核心线程的最小数量
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    
                    //如果没有核心线程了,但是任务队列里面还有任务,那么将min置为1
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
    
                    //如果线程数量不小于最小值,那么退出,队
                    //列里面的任务就由现有的线程去负责执行
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                
                //若干是异常退出的话,那么再增加一个非核心线程
                //如果队列里面还有任务,但是线程数不够,也会走这里
                addWorker(null, false);
            }
        }
    

        这个方法还算比较简单,首先判断此线程是不是异常退出,是的话将线程数量自减。接着,不管是正常退出还是异常退出,都要将线程从worker集合里面移除掉;接着判断线程池状态,如果小于STOP,说明线程池此时正在正常运行,此时判断核心线程的最小数量,如果核心线程的最小数量为0,但是此时队列里面还有任务的话,就将核心线程的最少数量置成1;如果此时线程池的线程数量大于等于这个核心线程池的最小数量,那些队列里面的任务就让这些线程去执行吧。如果是异常退出的线程,或者此时线程池的线程数量小于核心线程的最小值,那么就需要添加一个线程进去。

        下面分析线程池退出机制:

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //安全检查,线程池不是谁都可以关掉的,必须具备相应的权限,不管
            checkShutdownAccess();
    
            //设置线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
    
            //中断空闲中的线程,注意,正在运行的线程是不会中断的
            interruptIdleWorkers();
    
            //shutdown后的回调,空实现,根据自己的需求扩展
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
            
        //最后尝试终止线程池
        tryTerminate();
    }
    

        接着分析tryTerminate:

        final void tryTerminate() {
            //又是死循环,这么多死循环
            for (;;) {
                //拿到线程池的运行时信息
                int c = ctl.get();
                //如果正在运行,或者状态比TIDYING大,或者
                //状态是SHUTDOWN但是队里里面还有任务,那么就不终止
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                //如果活跃线程数不等于0,那么终止空闲的线程,此时就不能直接退出了,要返回
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //if里面的逻辑是将状态TIDYING和线程数0聚合在一起
                    //然后更新到ctl,如果状态是TIDYING而且线程数是0
                    //了,那么这个线程就没什么用了,可以退出了
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            //空实现,根据自己的需要扩展
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    

        接下来看下空闲线程是怎么退出的:

        private void interruptIdleWorkers(boolean onlyOne) {
            //上锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //遍历Worker
                for (Worker w : workers) {
                    //拿到线程
                    Thread t = w.thread;
                    //如果线程没有别中断,而且能够获取worker的锁,那么
                    //就将此线程的中断标记位置为true,如果获取不到,说明
                    //此worker正在执行,此时就不能中断这些线程了。
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    //如果终止空闲线程的操作只执行一次的话,那么就退出
                    //否则就往死里循环,一直到所有的执行任务的线程都空闲
                    //然后终止为止
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    

        自此,线程池的源码磕磕碰碰的分析了一遍,有些地方理解的不到位,待功力大增了再修改。

    相关文章

      网友评论

          本文标题:Java线程池研究

          本文链接:https://www.haomeiwen.com/subject/udiavftx.html