美文网首页
线程池任务执行分析

线程池任务执行分析

作者: 唯爱_0834 | 来源:发表于2023-01-05 13:28 被阅读0次

    背景

    我们自己创建的线程其只能start()执行一次,一旦执行完毕或被中断,即走terminated终止状态结束线程了,你难道没有这样的疑问为何线程池中的线程却可以一直执行?核心及非核心线程是如何实现的呢?

    线程池回顾

    1. 如何创建一个线程池
    //实际调用对象
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    
    • corePoolSize:核心线程池
    • maxinumPoolSize:表示最大允许被创建的线程数
    • keepAliveTime,unit: 非核心线程数的存活时间
    • workQueue: 用来暂时保存任务的工作队列
    • threadFactory:用来创建线程,可定义名称及优先级
    • handler: 任务拒绝策略
      1. 当调用shutdown 等方法关闭线程池后,这时候即使线程池内部还有没执行完的任务正在执行,但是由于线程池已经关闭,我们再继续想线程池提交任务就会遭到拒绝
      2. 当达到最大线程数,线程池已经没有能力继续处理新提交的任务时,这是也就拒绝。
    1. 参考此分享文章:深入理解线程池线程池任务执行流程如下:
      image.png
    2. 对应线程池运行流程图如下:


      image.png

    任务调度

    • 当用户提交一个任务会通过Executor.execute()方法执行,他的步骤上方已经总结过了,我们直接看新增线程执行任务的addWorker(Runnable firstTask, boolean core)方法,参数Runnable:运行任务,core是否为核心线程
     private final HashSet<Worker> workers = new HashSet<>(); //线程池中的所有工作线程
    private boolean addWorker(Runnable firstTask, boolean core) {
    //根据当前状态,判断是否添加成功,上方执行方法中的addWorker两个参数firstTask = null ,core = true /false 具体分析
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c); //获取运行状态
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && //状态 > shutDown 表示此时已经不再接受任务
                //shutdown状态不接受新任务,但可以执行已经加入队列中的任务,所以当进入shutdown状态,且传进来的任务为null时,并且任务队列不为null时,是允许添加新线程的,把这个条件取反就是不允许
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty())) 
                    return false;
    
                for (;;) { //使用CAS操作避免加锁
                    int wc = workerCountOf(c); //获取工作线程
                    if (wc >= CAPACITY || 
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false; //大于线程最大容量2的29次方量(所以newCacheExecutor并不能得到Integer.MAX_Value的),或者大于最大允许线程量则不能添加啦
                    if (compareAndIncrementWorkerCount(c)) //可添加就CAS操作线程数+1,成功说明可添加
                        break retry; //break跳出retry对应的循环,执行循环后面的添加worker逻辑
                    c = ctl.get();  // Re-read ctl 重新读取状态
                    if (runStateOf(c) != rs) 
                        continue retry; //状态改变了,跳到外层循环继续重新执行循环
                    // else CAS failed due to workerCount change; retry inner loop
                    //在内存层循环中不停的尝试CAS操作增加线程数
                }
            }
            //找了上方break retry可以正常使用CAS新增线程数
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask); //通过Worker包装runnable任务,稍后我们分析
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock(); //加锁
                    try {
                    
                        int rs = runStateOf(ctl.get());
                        //如果线程池状态rs < Shutdown即只能是Running
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) { //或者shutDown状态但是没有新任务
                            if (t.isAlive()) // 线程已经启动,并且当前没有任何异常的话,则是true,否则为false
                                throw new IllegalThreadStateException(); //我还没有启动呢
                            workers.add(w); //正常添加到线程池中workers工作线程
                            int s = workers.size();
                            if (s > largestPoolSize) //largestPoolSize:记录着线程池中出现过最大线程数量
                                largestPoolSize = s;
                            workerAdded = true; //可以正常工作的标记
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) { //如果正常工作,则开启线程任务
                        t.start();
                        workerStarted = true; //开始工作标记
                    }
                }
            } finally {
                if (! workerStarted) //该任务没有开始,则添加到失败
                    addWorkerFailed(w); 
            }
            return workerStarted;
        }
    
    1. 真正的执行工作交给了Worker(firstTask)类完成的
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable //实现了Runnable接口,因此t.start()执行的就是worker的run方法啊
             {
         
            final Thread thread;
           
            Runnable firstTask;
            
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);  //创建thread(this:Worker) ,则t.start()调用worker的run,同时原来的Runnable被封装为Worker的属性firstTask
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
            
        //getThreadFactory即为ThreadPoolExecutor创建thread工厂(实现ThreadFactory)可修改Thread名称,优先级等操作实现的
        public ThreadFactory getThreadFactory() {
            return threadFactory;
        }
    
    • 创建线程交由线程池设定的ThreadFactory
    • 当线程执行thread.start()其实就是执行worker.run() 调用runWorker
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask; //这个就是我们执行线程池executor.execute()方法时候的runnable
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
            //如果task不为null,并且从workQueue中获取任务不为null,则会一直执行下去
                while (task != null || (task = getTask()) != null) { //task是需要执行的任务,不一定是刚刚添加的那个了,这样其实worker线程并没有完成工作,自然也就不会销毁了
                    w.lock();
                   
                    if ((runStateAtLeast(ctl.get(), STOP) || //检查线程状态,若线程池处于中断状态,调用interrupt将线程中断
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt(); //中断线程
                    try {
                        beforeExecute(wt, task); //可以在任务真正执行之前做点啥,空实现
                        Throwable thrown = null;
                        try {
                            task.run(); //执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哈!
                        } catch (Throwable x) {
                        } finally {
                            afterExecute(task, thrown); //线程之后可以做啥,空实现
                        }
                    } finally {
                        task = null;
                        w.completedTasks++; //该线程执行完成任务+1
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • runWorker执行过程如下:
      1. while循环调取getTask()获取task,若不为null,则一直执行下去
      2. 检查线程是否被中断
      3. beforeExecute:可以在任务真正执行之前做点啥,空实现
      4. 执行task.run() 即执行execute()方法中的run方法,在t.start()线程内,这只是一个方法执行哦!
      5. afterExecute:任务执行之后可以做啥,空实现
    1. 重点逻辑是while循环,其是线程池中线程能够一直运行的原因,当我们第一次创建worker并执行任务后,并没有结束线程,而是通过while循环调用getTask()方法从阻塞队列中去task继续调用task.run()执行任务,注意这里run()只是一个普通的方法调用,并不是start()哦!运行线程就是Worker线程中
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
                // 对应ShutDown虽然不添加任务,但是可以执行阻塞队列中的,Stop以后就不能子在执行任务了
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null; //返回null,停止执行任务
                }
    
                int wc = workerCountOf(c);
                // allowCoreThreadTimeOut 表示是否允许核心线程超时销毁,默认false不销毁.若设置成true,核心线程也会销毁的
                //只有正在工作的线程数大于核心线程数才会为true,佛足额返回false
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //
            
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                //如果timed为true(wx > 核心线程),通过poll取任务,如果为false,通过take取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //这两个参数就是创建线程池中保存时间量
                        workQueue.take();
                    if (r != null) //如果有任务就退出死循环,返回任务交给上方的worker线程运行
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    • 通过以上代码分析:根据wc记录已运行线程数与核心线程数比较
      1. 若wc > 核心线程数,则通过poll()从队列中取任务
      2. 若wc <= 核心线程数,则通过take()取任务
    • 那么poll()与take()区别是什么呢?workQueue创建线程池时设置的阻塞队列,即实现BlockingQueue接口常用的ArrayBlockingQueue,LinkedBlockingQueue,PriorityQueue,SynchronizedQueue;
    1. 以ArrayBlockingQueue为例查看其poll和take方法
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0) { //是否队列中的元素个数为0,说明空队列
                    if (nanos <= 0L) //等待时间到了,队列中还未有数据加入,则返回null,
                        return null;
                    /**
                    * 调用该方法的前提是,当前线程已经成功获得与该条件对象绑定的重入锁,否* * 则调用该方法时会抛出IllegalMonitorStateException。
                    * nanosTimeout指定该方法等待信号的的最大时间(单位为纳秒)。若指定时间* * 内收到signal()或signalALL()则返回nanosTimeout减去已经等待的时间;
                    *若指定时间内有其它线程中断该线程,则抛出InterruptedException并清除当前线程的打断状态;
                    * 若指定时间内未收到通知,则返回0或负数。 
                    */
                    nanos = notEmpty.awaitNanos(nanos);  //每次signal唤醒重新等待
                }
                return dequeue(); //如果有元素取出
            } finally {
                lock.unlock();
            }
        }
    //如果poll超时返回null,则回调到
    f ((wc > maximumPoolSize || (timed && timedOut)) //true
                    && (wc > 1 || workQueue.isEmpty())) { //队列也是空的,走进去
                    if (compareAndDecrementWorkerCount(c)) //CAS可以减少c的个数
                        return null; //返回了null,该线程不能再上方的while循环中继续获取就结束线程啦,非核心线程就over啦,嘿嘿!
                    continue;
                }
    
    • 分析以上流程
      1. poll()中参数表示超时等待时间
      2. 若超过该时间阻塞队列依然为空,则返回null,退出while循环,线程执行结束,非核心线程被销毁
      3. 线程获取lock锁,执行awaitNanos最大等待nanos秒之内若收到signal()或signalALL()被唤醒,执行dequeue去阻塞队列中取任务执行
    1. take方法执行核心线程
    public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)  //不能使用if,避免虚假唤醒
                    notEmpty.await();  //一旦count队列为空,会一致await阻塞在这里的,直到workQueue.offer()添加元素时唤醒
                return dequeue(); //取出队头元素
            } finally {
                lock.unlock();
            }
    }
    
    • await不设置超时等待时间,notEmpty.await()一直阻塞,那这个阻塞又是何时被唤醒的呢?
    1. 当然是下一个任务达到的时候也就是调用execute的时候添加一个新的任务Task;
    //这个就是调用当前核心线程已经满了,则添加到阻塞队列中,
    //刚刚上方的核心线程在等待任务,添加以后肯定就调用notEmpty.signal()唤醒等待线程取任务执行啦
    if (isRunning(c) && workQueue.offer(command)) 
    
    • 我们来验证一下我们的想法:workQueue就是选择的队列,这里看ArrayBlockingQueue,当然对于其他队列也是相同的
     public boolean offer(E e) {
            checkNotNull(e);
            final ReentrantLock lock = this.lock; //获取锁,跟上方加锁时同一把锁
            lock.lock();
            try {
                if (count == items.length)
                    return false; //如果当前队列已满,不能再加入了false
                else {
                    enqueue(e); //正常添加到队列中
                    return true;
                }
            } finally {
                lock.unlock();
            }
        }
       
    //enqueue添加到数组循环队列中后调用notEmpty.signal()唤醒一个await线程取任务开始工作啦!
    private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
    
    • 通过生成-消费者模式,将execute加入队列的任务通知等待的核心线程取阻塞队列中的任务开始执行!

    总结

    1. 线程池核心线程一直在运行,不会终止的原因是由于使用while循环轮训阻塞队列中是否存在任务,若没有CPU不会空转而是调用await()等待函数,当阻塞队列中添加任务时会被唤醒,去取任务继续执行while循环;
    2. 核心和非核心是由等待函数决定的,设置是否有等待超时时间,若超时后返回null退出while循环,线程执行结束被销毁;

    参考文档

    1. 深入理解线程池
    2. ThreadPoolExecutor线程池源码和典型问题

    相关文章

      网友评论

          本文标题:线程池任务执行分析

          本文链接:https://www.haomeiwen.com/subject/lqqjcdtx.html