美文网首页
ThreadPoolExecutor$Worker 为何要继承

ThreadPoolExecutor$Worker 为何要继承

作者: CHMAX | 来源:发表于2021-03-16 20:16 被阅读0次

    其实标题问题的答案就在该类的注释之中,如下:

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
    
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
    
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    为了方便阅读,翻译如下:

    Worker 类主要是维护线程运行任务时的中断控制状态,以及次要的信息记录。② 该类通过继承 AbstractQueuedSynchronizer 来简化每个任务执行周围锁的获取和释放。③ 这可以保障中断仅会唤醒等待任务的工作线程,而非中断正在运行的任务。④ 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望工作线程在调用诸如 setCorePoolSize 之类的线程池控制方法时能够重新获取该锁。⑤ 此外,为了抑制线程真正开始运行之前的中断,我们将锁的状态值初始化为负值,并在启动时将其清除(runWorker 方法中)。

    总的来说,依旧不是很好理解,所以我们一句一句来看。

    ① Worker 类主要是维护线程运行任务时的中断控制状态,以及次要的信息记录。

    这里的中断控制状态,是指其继承类 AbstractQueuedSynchronizer 中的 state 字段,该字段即用作内部加锁,又用做中断控制。

    信息记录则是指任务完成数计数器 completedTasks 字段。

    ② 该类通过继承 AbstractQueuedSynchronizer 来简化每个任务执行周围锁的获取和释放。

    这句比较好理解,在覆写父类的 tryAcquiretryRelease 方法基础上,添加自定义的 lockunlock 等方法来提供锁的获取和释放,具体请见源码。

    ③ 这可以保障中断仅会唤醒等待任务的工作线程,而非中断正在运行的任务。

    这里我们需要看一下 interruptIdleWorkers 方法,该方法的作用就是中断等待任务的工作线程,代码如下:

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (ThreadPoolExecutor.Worker w : workers) {
                Thread t = w.thread;
                // 加锁并中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    需要关心的是加锁逻辑 if (!t.isInterrupted() && w.tryLock())tryLock 方法实际调用的覆写的 tryAcquire 方法,该方法则尝试将 state 值从 0 修改为 1,由于 state 值只有在等待任务时才为 0,而线程运行时该值为 1,所以只有等待任务的线程才能加锁成功,并执行之后的中断操作。

    ④ 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望工作线程在调用诸如 setCorePoolSize 之类的线程池控制方法时能够重新获取该锁。

    前半句很好理解,就是需要一个非可重入的互斥锁,而 ReentrantLock 是可重入的互斥锁,所以不用。后半句则是说防止 setCorePoolSize 方法再次获取该锁,我们先看该方法的代码:

    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // 省略...
        }
    }
    

    可以看到该方法会调用上面说到的 interruptIdleWorkers 方法,interruptIdleWorkers 方法会遍历所有的工作线程,尝试加锁成功后执行中断操作,如果执行 setCorePoolSize 方法的是线程池中某个工作线程,在锁可重入的情况下,该工作线程会在之后的 w.tryLock 成功获取锁,并中断自己。为了避免这种情况,就需要有一个非可重入的互斥锁,继承 AbstractQueuedSynchronizer 之后,可以简单快速的实现。

    ⑤ 此外,为了抑制线程真正开始运行之前的中断,我们将锁的状态值初始化为负值,并在启动时将其清除(runWorker 方法中)

    工作线程初始化如下,会将状态字段 state 的值设置为 -1,这样上面的 w.tryLock 就无法获取锁,也就不能执行中断。

    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        // 省略...
    }
    

    清除该状态是在 runWorker 方法中,看代码:

    final void runWorker(ThreadPoolExecutor.Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 省略...
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    可以看到方法一开始就会执行 w.unlock 方法,该方法会将 state 值设置为 0,也就允许被中断了。

    总结

    基于上面的分析可知,继承 AbstractQueuedSynchronizer 主要作用如下:

    • 需要状态值控制中断
    • 支持非可重入的互斥锁
    参考

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor$Worker 为何要继承

          本文链接:https://www.haomeiwen.com/subject/gmjdcltx.html