美文网首页
线程池Woker类讲解

线程池Woker类讲解

作者: 九点半的马拉 | 来源:发表于2020-05-09 19:31 被阅读0次

    通常我们利用线程池执行任务时,一般会调用execute()方法来进行任务的提交。

    然后如果线程池不是将任务放在工作队列里面,而是要创建核心线程或者非核心线程时,会进一步调用addWorker()方法来创建线程,处理我们的任务。

    然后需要注意的是,在往工作队列中存放任务时,如果发现当前线程时的线程数量为0,则会创建一个非核心线程。这时的addWorker(null, false);与一般的不同,它不放入你要提交的任务,因为你的任务在之前已经被放入到队列中。

    Worker在线程池中是一个较重要的类。

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    

    从上面可以看出,它实现了AQS和Runable的相关特性。

    三个重要的变量:

    // 对应的线程
     final Thread thread;
    // 在提交execute时的那个任务
    Runnable firstTask;
    // 该线程之前已处理的任务数
    volatile long completedTasks;
    

    在这里给出官方文档对Worker的解释。

    Class Worker主要维护线程运行任务的中断控制状态。此类适时地扩展了AbstractQueuedSynchronizer以简化获取和释放围绕每个任务执行的锁。这可以防止旨在唤醒工作线程等待任务的中断,而不是中断正在运行的任务。我们实现了一个简单的非可重入互斥锁,而不是使用ReentrantLock,因为我们不希望辅助任务在调用诸如setCorePoolSize之类的池控制方法时能够重新获取该锁。另外,为了抑制中断直到线程真正开始运行任务,我们将锁定状态初始化为负值,并在启动时将其清除(在runWorker中)。

    生成Worker并加锁存放到HashSet类型下的workers下后,会启动该Woker对用的Thread的start方法,然后执行run()方法。

    public void run() {
           runWorker(this);
     }
    

    RunWorker()方法中,实际是一个while循环,会持续从队列中获得任务,直到队列为空。

    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
    

    如果线程池大于等于Stop状态,并且当前线程未被中断,那么执行中断操作。
    如果当前线程已经中断了,因为调用了Thread.interrupted方法,中断标志会被清空,然并且当前线程大于等于Stop状态,这时再执行中断操作。

    在这里再详细描述下在RunWoker方法上的官方文档介绍。

    RunWorker 反复从队列中获取任务并执行任务,同时应对许多问题:

    1) 我们可以从最初的任务开始,在这种情况下,我们不需要获取第一个任务。否则,只要池正在运行,我们就会从getTaskd到工作队列中获取任务。如果返回null,则工作器Worker由于池状态或配置参数更改而退出。其他退出是由于外部代码中的异常引发而导致的,在这种情况下completedAbruptly成立,这通常导致processWorkerExit替换此线程。

    2) 在运行任何任务之前,先获取锁,以防止任务执行时其他池中断,然后确保除非池正在停止,否则此线程不会设置其中断。

    3)每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,如果不处理任务,我们将导致线程死亡(带有completelyAbruptly true的中断循环)。

    4)假设beforeExecute正常完成,我们运行任务,收集任何引发的异常以发送给afterExecute。我们分别处理RuntimeException,Error(两者规范保证我们可以捕获)和任意Throwables。因为我们无法在Runnable.run中抛出Throwables,所以我们将它们包装在错误的出路(到线程的
    UncaughtExceptionHandler)。 任何抛出的异常也会保守地导致线程死亡。

    5) task.run完成后,我们调用afterExecute,这可能还会引发异常,这也会导致线程死亡。 根据JLS Sec 14.20,此异常是即使task.run抛出也会生效的异常。

    异常机制的最终结果是afterExecute和线程的UncaughtExceptionHandler具有关于用户代码遇到的任何问题的尽可能准确的信息。

    为什么Worker类要继承AQS呢,其实要用锁的状态来区分空闲线程和非空闲线程,在执行runWorker方法中:

    • 获取任务时没有加锁(空闲状态,可中断线程)
    • 要执行任务时才加锁(不允许中断线程)

    有关工作队列阻塞的问题。

    ThreadPoolExecutor有一个变量allowCoreThreadTimeOut,默认是false,设置核心线程是否允许超时。

    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    

    从上面可以看出当线程池的数量超过核心数时,timed为true。

    然后在获取任务的时候getTask()时,有这一步:

    try {
         Runnable r = timed ?
                  workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                  workQueue.take();
         if (r != null)
             return r;
         timedOut = true;
      } catch (InterruptedException retry) {
                    timedOut = false;
    }
    

    当所处线程是非核心线程时,会执行workQueue.poll()方法,在规定时间内获取,没有了就返回null。

    如果当前线程是核心线程,并且没有开始超时机制(默认是未开启),它会调用workQueue.take()方法,当队列为空时会一直阻塞。

    在线程正常退出或者发生异常时,会执行processWorkerExit方法。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            //  如果不是获取不到任务而正常退出的,在这里将线程数减1,正常退出的在getTask()方法有这个减1操作
            if (completedAbruptly) 
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
               // 将线程执行的任务数统一加到线程池维护的completedTaskCount字段
                completedTaskCount += w.completedTasks;
                // 在保存的数组中移除该worker
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
            // 尝试将线程池设置为结束状态
            tryTerminate();
    
            int c = ctl.get();
           // 当线程池状态小于stop(运行或关闭)才继续
            if (runStateLessThan(c, STOP)) {
                // 正常退出的情况
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }
    

    从上面可以看出,如果是异常退出,会在创建一个线程;
    如果是正常退出的,如果设置了核心线程数可以超时退出,那么min为0,否则min是最大核心线程数; 如果min为0,并且队列不为空,那么min = 1, 如果当前线程数大于等于min,那么不需要再创建一个线程,如果小于的话,再创建一个线程。

    shutdown时会中断掉空闲线程,在shutdownNow时会中断所有线程。

    中断空闲线程:

    private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    // 从这里可以看出,当前线程没有中断,并且所对应的woker能够被加锁
                    // 因为是非重入独占锁,如果有运行的话,这时候是不能加上锁的
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    

    中断所有线程:

    private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }
    
    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
    

    相关文章

      网友评论

          本文标题:线程池Woker类讲解

          本文链接:https://www.haomeiwen.com/subject/fmslnhtx.html