美文网首页
AQS源码解析

AQS源码解析

作者: StevenChu1125 | 来源:发表于2020-12-04 20:07 被阅读0次

等待队列中线程出队列时机

// java.util.concurrent.locks.AbstractQueuedSynchronizer

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

总的来说,一个线程获取锁失败了,被放入等待队列,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
    // 标记是否成功拿到资源
    boolean failed = true;
    try {
        // 标记等待过程中是否中断
        boolean interrupted = false;
        // 开始自旋,要么获取锁,要么中断
        for (;;) {
            // 获取当前节点的前驱节点
            final Node p = node.predecessor();
            // 如果p是头节点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头节点是虚节点)
            if (p == head && tryAcquire(arg)) {
                // 获取锁成功,头指针移动到当前node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者p不为头节点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。

// java.util.concurrent.locks.AbstractQueuedSynchronizer

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

// java.util.concurrent.locks.AbstractQueuedSynchronizer

// 靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 获取头节点的节点状态
    int ws = pred.waitStatus;
    // 说明头节点处于唤醒状态
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    // 通过枚举值我们知道waitStatus>0是取消状态
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // 循环向前查找取消节点,把取消节点从队列中删除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         // 设置前驱节点等待状态为SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

从队列中释放节点的疑虑打消了,那么又有新问题了:

  • shouldParkAfterFailedAcquire中取消节点是怎么生成的呢?什么时候会把一个节点的waitStatus设置为-1?
  • 是在什么时间释放节点通知到被挂起的线程呢?

CANCELLED状态节点生成

acquireQueued方法的finally代码:

// java.util.concurrent.locks.AbstractQueuedSynchronizer

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        ...
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                ...
                failed = false;
                ...
            }
            ....
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

通过cancelAcquire方法, 将Node的状态标记为CANCELLED。

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    // 忽略无效节点
    if (node == null)
        return;
    // 设置节点不关联任何线程
    node.thread = null;
    // Skip cancelled predecessors
    // 跳过取消状态的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    // 获取过滤后的前驱节点的后继节点
    Node predNext = pred.next;
    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 设置节点状态为CANCELLED
    node.waitStatus = Node.CANCELLED;
    // If we are the tail, remove ourselves.
    // 如果当前节点是尾节点,将从后往前的第一个非取消状态的节点设置为尾节点
    // 更新失败的话,则进入else,如果更新成功,将tail的后继节点设置为null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 如果当前节点不是head的后继节点,1:判断前驱节点状态是否为SIGNAL,2:如果不是,则把前驱节点设置为SIGNAL看是否成功
        // 如果1和2中有一个为true,再判断前驱节点的线程是否为null
        // 如果上述条件都满足,把当前节点的前驱节点的后继节点设置为当前节点的后继节点
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            // 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

当前的流程:

  • 获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。

  • 根据当前节点的位置,考虑以下三种情况:

(1) 当前节点是尾节点。

(2) 当前节点是Head的后继节点。

(3) 当前节点不是Head的后继节点,也不是尾节点。

参考

  1. https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

相关文章

网友评论

      本文标题:AQS源码解析

      本文链接:https://www.haomeiwen.com/subject/fkgiwktx.html