美文网首页
AQS 源码阅读

AQS 源码阅读

作者: 柯基去哪了 | 来源:发表于2021-04-23 11:02 被阅读0次

    一 简介-API 文档阅读

    提供一个基于 FIFO 等待队列实现阻塞锁和相关同步器的基础框架。这个类被设计用于依靠一个原子类 int 值表示状态并成为大多数同步器的基础类。通过继承这个类并实现必要的方法来使用它。子类通过维护内部的 state 字段来获取和更新线程状态。这一系列方法包含 getState/setState 等。API 文档推荐我们在使用这个类的时候,将其作为目标类的一个内部类来实现,并且也不对外公开。

    这个类支持默认的独占(exclusive)模式或者共享(share)模式,或者同时支持这两种模式。在独占模式下获取到锁后,其他尝试获取锁的线程都会失败。在共享模式下多个线程尝试获取锁或许会成功。

    在共享模式下一个线程获取锁成功,后续存在的等待线程在获取锁的时候还是要重复判断一下是否可以获取锁。在不同的模式下等待的线程共享了同一个 FIFO 队列。 一般来说,这个框架接口的实现类只会实现一种模式(共享或者独占)。支持单一模式的类不需要实现另一个模式下需要实现的方法。

    AQS 内部包含一个实现了 Condition 接口的 ConditionObject 内部类。该类内部维护了一个 Condition 的 Node 队列。包含 Node-firstWaiter 与 Node-lastWaiter。一系列的内部私有方法用于对这个队列的维护和操作。

    private Node addConditionWaiter()
    

    添加一个 Node 节点到当前 Condition 队列的队尾。

    AQS 同步器是 java.util.concurrent 并发包下许多工具类的基础框架。包括 RetreensLock。

    public abstract class AbstractQueuedLongSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    

    AQS 类的编写用到了 模板方法设计模式,由抽象类定义同步器需要实现的方法,具体的锁或者基于 AQS 的同步器在继承了这个类之后要按要求实现指定的方法。

    使用方式

    如果想要以 AQS 类作为基类,需要重新定义如下的系列方法,这系列方法用户获取/设置 state 的值

    • tryAcquire
    • tryRelease
    • tryAcquireShared
    • tryReleaseShared
    • isHeldExclusively

    二 部分代码阅读

    2.1 acquire(int arg)

    acquire.png

    在独占模式下多个线程竞争同一个对象监视器的时候,首先的第一步就是去获取监视器,这里会调用到这个方法(比如 ReentrantLock)

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    先从整体的逻辑说一下这个方法及其内部调用的一系列方法的逻辑:

    多个线程进入,争用对象监视器。争用成功即持有锁,失败的会先加入内部维护的等待队列,这个队列是一个双向的 Node 链表。

    独占模式下获取对象监视器的方法,首先执行 tryAcquire(),如果获取失败就将当前线程加入队列中,加入队列之后再次尝试获取锁。

    在 AQS 中没有给出具体的 tryAcquire 方法实现逻辑,抽象类中的方法中仅仅以抛出异常来结束。所以这个方法的具体实现,要依赖使用 AQS 框架的人。这里仅仅是给出了方法范式。

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    依赖于 AQS 框架的实现,就是在 tryAcquire 里面去获取/修改 AQS 内部维护的 state 状态值,基本上都是使用 cas 方式来更新锁标识字段。

    在直接调用 tryAcquire 方法失败后,就需要将当前线程加入 FIFO 的等待队列,这里调用了 addWaiter 方法

        private Node addWaiter(Node mode) {
            // 将当前线程包装构造为一个 node 节点
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure 先尝试快速入队,如果尝试失败,还是会执行 enq 方法将已经包装好的 node 节点入队
            // 获取尾节点引用并检查尾节点是否为 null,如果是则执行执行 enq 方法入队,如果不是则执行前面注释的 'fast path of enq'
            Node pred = tail;
            if (pred != null) {
                // 获取尾节点的前驱节点
                node.prev = pred;
                // 尝试使用 cas 方式将包装的节点以尾插法加入队列
                if (compareAndSetTail(pred, node)) {
                    // 入队成功,调整 sync 队列尾节点位置并返回已经包装好的 node 节点
                    pred.next = node;
                    return node;
                }
            }
            // 走到这里,则说明前面的快速入队策略没有成功,那么最终还是要执行 enq 方法入队
            enq(node);
            return node;
        }
    
    node 队列.png

    此处传入的 mode 参数是 独占模式,然后将当前线程构造成一个 node 节点。这里加入等待队列有两种策略,第一种当 tail 节点不为空的时候,直接尝试使用 CAS 方式将新节点加入到队尾;当 tail 为空,则调用了 enq 入队方法,总结一下,进入到 enq 方法的场景列表:

    • 尾节点为空,即整个 sync 队列还未初始化
    • cas 尾插法将节点加入队列失败
        private Node enq(final Node node) {
            // 死循环,即要求入队操作一定要成功
            for (;;) {
                // 获取尾节点
                Node t = tail;
                // 如果尾节点为空,则执行 sync 队列的初始化
                if (t == null) { // Must 
                    // 创建一个新的 node 节点作为初始化 sync 队列的头节点
                    if (compareAndSetHead(new Node()))
                        // 将 tail 与 head 指向这个头节点,然后又会进入下一轮 for 循环
                        tail = head;
                } else {
                    // 尾节点不为空,可能是进入方法的时候就不为空了,也可能是进入方法后第一轮循环初始化创建了一个节点,将要插入的 node 节点的前驱引用指向已经存在的尾节点
                    node.prev = t;
                    // cas 方式尾插法加入节点,注意如果 cas 方式执行失败了,会再次进入 for 循环然后尝试入队,直到最后尝试成功
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    这里是 CAS+自旋 的方式构建等待线程队列。这种构建方式因为使用了自旋,所以会有不确定时长的自旋时间,比起前面代码自己注释的 快速入队 要稍慢。不过这种方式也比挂起线程更快,同时也减少了上下文切换,节约了资源。

    需要注意的是,node 队列的头节点是不包含任何线程的,是一个虚拟节点。在队列初始化的时候,会由无参构造创建一个 node 节点作为头节点。当有后续节点成为头节点的时候(成为头节点,其内的线程就已经持有了锁,然后会被 AQS 清除持有线程的引用)。

    前面的逻辑已经将当前线程加入到 sync 队列中,现在检查线程的中断状态并再次尝试获取独占锁

        // 传入以当前节点为参数构造的 node 节点
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 获取 node 的前驱节点
                    final Node p = node.predecessor();
                    // 如果当前 node 的前驱节点是头节点并尝试获取(改变)状态
                    if (p == head && tryAcquire(arg)) {
                        // tryAcquire 成功,表明当前节点已经成功获取到锁,更新头节点关系
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 当前节点的前驱节点不是头节点或者 tryAcquire 失败,挂起当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                // 方法最终退出前检查,这是异常状态下的进入,取消获取锁的相关操作
                if (failed)
                    cancelAcquire(node);
            }
        }
        
    

    入队后的线程节点进入方法 acquireQueued(Final Node node, int arg),在这个方法中依然存在一个死循环。

    获取当前节点的前置节点,检查当这个前置节点是不是头结点(头结点即意味着这个节点正持有锁),然后当前 Node 节点尝试获取对象监视器,如果获取成功,表示当前节点成功获取到对象监视器,将当前节点设置为新的头结点。同时置空前置节点的后继引用(方便 GC 回收)。

    方法 shouldParkAfterFailedAcquire 会根据情况阻塞线程,这里又要回顾一下之前的 status 字段各个值的含义,方法内判断的依据就是根据 status 字段。

    SIGNAL = -1:当前线程的后继线程需要被unpark(唤醒);
    
        /**
         * 判断前驱节点的 status 状态值是否是 SIGNAL,如果是则返回 true,否则返回false。
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus; // 前驱节点线程的状态标识字段值
            if (ws == Node.SIGNAL) // 当前线程的后继线程需要被unpark(唤醒),直接返回 true
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) { // 当前线程被取消(前驱节点所搭载的线程) 
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do { // 回溯前驱节点的前驱节点,直到找到一个状态值为 非取消 状态的节点
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node; // 找到一个 status 为 非取消 状态的节点后,将这个节点的后继节点指向当前的节点
            } else { // 其他情况就将传入的前驱节点状态置为 SIGNAL 状态
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    方法 parkAndCheckInterrupt 会阻塞当前线程,并且返回“线程被唤醒之后”的中断状态

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    注意, 这里使用的方法是

    interrupted()

    在调用这个方法的适合,会清除线程的标识位,这样在下一次唤醒该线程的时候才会生效。

    2.2 release(int arg)

        public final boolean release(int arg) {
            // 尝试释放锁
            if (tryRelease(arg)) {
                // 获取头节点
                Node h = head;
                // 头节点不为空且头节点的节点状态不为0(不为0表示这个节点不是初始化虚拟节点)
                if (h != null && h.waitStatus != 0)
                    // 修改节点 status 字段并唤醒等待线程
                    unparkSuccessor(h);
                return true;
            }
            // 释放锁失败,返回 false
            return false;
        }
    

    和 acquire 一样,release 方法会先尝试调用 tryRelease 方法来释放锁,但是这个方法的实现并不在 AQS 内,而是实现了 AQS 的类来填充这个方法。

        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            // 因为要释放当前节点,所以先更新节点 status 字段为初始化值 0
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            // 获取一个可用的后继节点(节点可用的条件是节点 status 字段不为 cancel 状态),注意这里是从尾节点从后向前尝试唤醒。
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    相关文章

      网友评论

          本文标题:AQS 源码阅读

          本文链接:https://www.haomeiwen.com/subject/sedfrltx.html