美文网首页
AQS 之 Condition-的源码分析

AQS 之 Condition-的源码分析

作者: 断风雨_2669 | 来源:发表于2019-03-06 22:40 被阅读0次

在使用 Lock 锁的过程中,我们往往会使用到另外一个对象 Condition ,用于等待/通知模式的处理。

Condition 的创建

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

使用 Condition 的前提是获取锁

final ConditionObject newCondition() {
    return new ConditionObject();
}

从 newCondition 方法看出 Condition 对象实际上是 AQS 的内部类 ConditionObject ()。

成员变量

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

从内部定义的变量 firstWaiter, lastWaiter 看出, ConditionObject 对象内部维护了一个同样以 Node 为节点的等待队列。

await()

await 操作会使当前线程释放锁并进入等待模式。

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        // 当前线程中断 抛出中断异常
        throw new InterruptedException();
    // 将当前线程构造节点插入等待队列尾部
    Node node = addConditionWaiter();
    // 当前线程释放锁,唤醒同步队列 head 的后置节点
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 节点添加到同步队列后 退出循环
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 应该是在其他线程释放锁后被唤醒
        // 检查当前线程是否中断,若未中断则返回 0
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // node 进入自旋过程尝试获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 移除等待队列中状态非 CONDITION 的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 将当前线程构造节点并设置状态为 CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        // 等待队列为空的时候将 firstWaiter 指向 node
        firstWaiter = node;
    else
        // 等待队列非空时将 lastWaiter 尾节点的 nextWaiter 指向 node
        t.nextWaiter = node;
    // 移动尾节点
    lastWaiter = node;
    return node;
}
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 当前线程释放锁,并唤醒同步队列中 head 的后置节点
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
// 判断节点是否在同步队列上
final boolean isOnSyncQueue(Node node) {
    // 节点状态为 CONDITION 或 节点的前置为空 说明节点还在等待队列上
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    // 如果节点存在后置节点 next 则说明节点在同步队列上
    if (node.next != null) // If has successor, it must be on queue
        return true;
    /*
     * node.prev can be non-null, but not yet on queue because
     * the CAS to place it on queue can fail. So we have to
     * traverse from tail to make sure it actually made it.  It
     * will always be near the tail in calls to this method, and
     * unless the CAS failed (which is unlikely), it will be
     * there, so we hardly ever traverse much.
     */
    // 从 tail 尾节点开始遍历同步队列查找 node 节点;若存在返回 true,反之返回 false
    return findNodeFromTail(node);
}

await 操作流程如下 :

  • 将当前线程构造一个新的 node 节点,状态为 CONDITION 添加到等待队列尾部
  • 释放锁,唤醒同步队列 head 的后置节点
  • 判断当前 node 节点是否在同步队列中,若不在同步队列上则挂起当前线程,等待其他线程释放锁时被唤醒
  • 节点 node 被唤醒后若在同步队列上,则进入自旋过程再次尝试获取锁

signal()

signal 操作激活等待队列中节点

public final void signal() {
    // 判断当前线程是否为锁的持有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        // 判断 first 的后置节点是否为空,为空说明等待队列为空
        if ( (firstWaiter = first.nextWaiter) == null)
            // 等待队列的尾节点置为空
            lastWaiter = null;
        // 将 first 的后置节点置为空,也即是将 first 节点从等待队列中移除
        first.nextWaiter = null;

        // 执行信号转移
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
// 将节点从等待队列 (condition queue) 转移到 同步队列 (sync queue)
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    // 将节点状态设置为 0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    // 将节点添加到同步队列(sync queue)尾部, 此时 p 应该是 node 的前置节点 ws 为 0
    Node p = enq(node);
    // 
    int ws = p.waitStatus;
    // 将 node 的前置节点状态改为 SIGNAL; 便于节点 p 释放锁的时候唤醒 node
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
    }

signal 操作的流程如下:

  • 将等待队列中的节点从队列中移除
  • 将等待队列中的节点状态由 CONDITION 改为 0
  • 将等待队列中的节点添加到 AQS 的同步队列尾部

signal 的作用 只是将节点从等待队列转移到同步队列中,只有当前线程释放锁后,转移到同步队列的节点才会有机会获取到锁。

如下图所示为 Condition 操作节点的转移过程:

image

小结

从 Condition 的 await()、signal() 操作可以看出,其作用等效于 Object 对象的 await(), notify() 方法;

相关文章

网友评论

      本文标题:AQS 之 Condition-的源码分析

      本文链接:https://www.haomeiwen.com/subject/frmfpqtx.html