美文网首页
ThreadPoolExecutor$Worker 为何要继承

ThreadPoolExecutor$Worker 为何要继承

作者: CHMAX | 来源:发表于2021-03-16 20:16 被阅读0次

其实标题问题的答案就在该类的注释之中,如下:

/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

为了方便阅读,翻译如下:

Worker 类主要是维护线程运行任务时的中断控制状态,以及次要的信息记录。② 该类通过继承 AbstractQueuedSynchronizer 来简化每个任务执行周围锁的获取和释放。③ 这可以保障中断仅会唤醒等待任务的工作线程,而非中断正在运行的任务。④ 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望工作线程在调用诸如 setCorePoolSize 之类的线程池控制方法时能够重新获取该锁。⑤ 此外,为了抑制线程真正开始运行之前的中断,我们将锁的状态值初始化为负值,并在启动时将其清除(runWorker 方法中)。

总的来说,依旧不是很好理解,所以我们一句一句来看。

① Worker 类主要是维护线程运行任务时的中断控制状态,以及次要的信息记录。

这里的中断控制状态,是指其继承类 AbstractQueuedSynchronizer 中的 state 字段,该字段即用作内部加锁,又用做中断控制。

信息记录则是指任务完成数计数器 completedTasks 字段。

② 该类通过继承 AbstractQueuedSynchronizer 来简化每个任务执行周围锁的获取和释放。

这句比较好理解,在覆写父类的 tryAcquiretryRelease 方法基础上,添加自定义的 lockunlock 等方法来提供锁的获取和释放,具体请见源码。

③ 这可以保障中断仅会唤醒等待任务的工作线程,而非中断正在运行的任务。

这里我们需要看一下 interruptIdleWorkers 方法,该方法的作用就是中断等待任务的工作线程,代码如下:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (ThreadPoolExecutor.Worker w : workers) {
            Thread t = w.thread;
            // 加锁并中断
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

需要关心的是加锁逻辑 if (!t.isInterrupted() && w.tryLock())tryLock 方法实际调用的覆写的 tryAcquire 方法,该方法则尝试将 state 值从 0 修改为 1,由于 state 值只有在等待任务时才为 0,而线程运行时该值为 1,所以只有等待任务的线程才能加锁成功,并执行之后的中断操作。

④ 我们实现了一个简单的非可重入互斥锁,而不是使用 ReentrantLock,因为我们不希望工作线程在调用诸如 setCorePoolSize 之类的线程池控制方法时能够重新获取该锁。

前半句很好理解,就是需要一个非可重入的互斥锁,而 ReentrantLock 是可重入的互斥锁,所以不用。后半句则是说防止 setCorePoolSize 方法再次获取该锁,我们先看该方法的代码:

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // 省略...
    }
}

可以看到该方法会调用上面说到的 interruptIdleWorkers 方法,interruptIdleWorkers 方法会遍历所有的工作线程,尝试加锁成功后执行中断操作,如果执行 setCorePoolSize 方法的是线程池中某个工作线程,在锁可重入的情况下,该工作线程会在之后的 w.tryLock 成功获取锁,并中断自己。为了避免这种情况,就需要有一个非可重入的互斥锁,继承 AbstractQueuedSynchronizer 之后,可以简单快速的实现。

⑤ 此外,为了抑制线程真正开始运行之前的中断,我们将锁的状态值初始化为负值,并在启动时将其清除(runWorker 方法中)

工作线程初始化如下,会将状态字段 state 的值设置为 -1,这样上面的 w.tryLock 就无法获取锁,也就不能执行中断。

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    // 省略...
}

清除该状态是在 runWorker 方法中,看代码:

final void runWorker(ThreadPoolExecutor.Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 省略...
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到方法一开始就会执行 w.unlock 方法,该方法会将 state 值设置为 0,也就允许被中断了。

总结

基于上面的分析可知,继承 AbstractQueuedSynchronizer 主要作用如下:

  • 需要状态值控制中断
  • 支持非可重入的互斥锁
参考

相关文章

网友评论

      本文标题:ThreadPoolExecutor$Worker 为何要继承

      本文链接:https://www.haomeiwen.com/subject/gmjdcltx.html