美文网首页
AbstractQueuedSynchronizer 详细解读

AbstractQueuedSynchronizer 详细解读

作者: coding400 | 来源:发表于2019-11-28 15:23 被阅读0次

    概述

    AQS( AbstractQueuedSynchronizer ) 是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。如: ReentrantLock 和 Semaphore都是基于AQS构建的,还包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue和FutureTask。

    AQS解决了在实现同步器时涉及的大量细节问题,例如等待线程采用FIFO队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程是应该通过还是需要等待。本文通过介绍AQS相关API以及juc包下相关实现类对其进行介绍

    原理

    使用volatile的整形变量state用来表示当前是否可以获取锁,如在某些非共享锁里,state=1 则代表当前锁已经被占有,此时如果有线程来请求锁时且当前独占模式的锁已经被其他线程占有,则会进入AQS里维持的CLH队列(FIFO)里排队,HEAD 位置的线程为正在占用锁的线程,当它释放锁时,会唤醒Head的next 节点的线程,next节点使用自旋的方式不断的尝试获取锁。 自定义的同步器在实现时,只需要定义state变量获取(tryAcquire)与释放(tryRelease)的规则,其他细节在AQS里已经实现。

    获取 state 变量.png

    独占模式

        /**
         * Acquires in exclusive mode, ignoring interrupts.  Implemented
         * by invoking at least once {@link #tryAcquire},
         * returning on success.  Otherwise the thread is queued, possibly
         * repeatedly blocking and unblocking, invoking {@link
         * #tryAcquire} until success.  This method can be used
         * to implement method {@link Lock#lock}.
         *
         * @param arg the acquire argument.  This value is conveyed to
         *        {@link #tryAcquire} but is otherwise uninterpreted and
         *        can represent anything you like.
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    

    根据 java doc的注释:以独占模式获取,忽略中断,通过最少调用一次 tryAcquire 来返回成功,否则将该线程放入队列,可能一直阻塞或非阻塞,直到调用 tryAcquire 返回成功,该方法可以用来实现锁。
    可以看出 AbstractQueuedSynchronizer 中的 acquire 是独占模式需要调用的方法,该方法又依次调用了 3个方法:
    tryAcquire(arg)

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    默认实现抛出一个 UnsupportedOperationException 异常 ,将该方法的具体实现交给子类,那么这里的入参 int arg 便是与 AbstractQueuedSynchronizer 中的 state 变量来比较确认当前线程是否可以获取锁,一般情况下,如果是独占锁,获取锁时通过CAS 将 state 置为 1 ,释放锁时 通过 CAS 将 state 置为 0

    addWaiter(Node.EXCLUSIVE)

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    该方法用于将当前线程的状态封装到 Node 节点内,然后通过尾插法将 node 节点放入到链表末尾

    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

       final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                //判断当前节点是否被通知中断
                boolean interrupted = false;
                //自旋方式,不断尝试获取资源
                for (;;) {
                    final Node p = node.predecessor();
                    //如果上一个节点是Head节点,则尝试获取资源
                    if (p == head && tryAcquire(arg)) {
                        //获取资源成功,则将当前节点设为Head节点
                        setHead(node);
                        //当前节点获取资源成功,那么之前的Head节点需要释放,因此将之前Head相关引用设为NULL,帮助GC回收
                        p.next = null; // help GC
                        failed = false;
                        //返回中断过程是否中断过
                        return interrupted;
                    }
                    //递归判断当前节点的上一个节点的waitStatus是否有效
                    //park 当前Node节点的线程(wait当前线程),检查是否中断
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    自旋一次,尝试获取锁。可以看到 node.predecessor(); 返回的是当前node的父节点,如果父节点是Head 节点,那么当前 node 节点就会再次执行 tryAcquire(arg) 方法,若返回 success 则获取锁,失败则 park 当前线程进入等待唤醒的状态,同时检查当前线程是否中断,如果被设置了中断的标记,则 将interrupted 设置为true,在获取资源成功的时候,去响应中断

    shouldParkAfterFailedAcquire(Node pred, Node node)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //判断前一个节点的waitStatus 是否小于0,如果大于0,则该节点已经被取消
            int ws = pred.waitStatus;
            //如果为等待通知,则表示等待unpark(等待唤醒)
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            //如果上一个节点被取消掉(大于0)则跳过,循环的找上一节点为未取消的(小于0)
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                //如果当前节点的waitStatus 小于0,但不等于SIGNAL,则设置为SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    shouldParkAfterFailedAcquire(Node pred, Node node) 此方法主要用于检查waitStatus的状态,看看自己是否真的可以去休息了(进入waiting状态),万一队列前边的线程都放弃了,则丢弃引用,让GC回收。

    Node节点中维持着waitStatus这个字段代表的含义如下:

    /** waitStatus value to indicate thread has cancelled */
        //值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        //值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,
        //就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        //值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()
        // 方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        //值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
        static final int PROPAGATE = -3;
        //0  代表为初始状态
    
    

    parkAndCheckInterrupt()

    private final boolean parkAndCheckInterrupt() {
            //调用park()使线程进入waiting状态
            LockSupport.park(this);
            //如果被唤醒,查看自己是不是被中断的
            return Thread.interrupted();
        }
    

    parkAndCheckInterrupt() 让线程去休息,真正进入等待状态。park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

    独占模式的大致流程知道了后,用流程图来总结下(贴上源码)

        public final void acquire(int arg) {
             if (!tryAcquire(arg) &&
                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                 selfInterrupt();
         }
    
    
    1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
      没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
    2. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被
      unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
    3. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
    独占流程.png

    释放锁

    release(int arg)

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    

    release(int arg) 独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。

    tryRelease

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
    

    tryRelease(int arg) 需要子类实现的获取资源成功的方法,用作父类根据子类释放是否成功作为判断依据

    unparkSuccessor(Node node)

    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0) //设置当前线程的waitStatus为初始状态 0
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next; //找到head节点的下一个需要唤醒的节点,如果下一个节点被取消了(>0),则从tail往前找
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //unpart 唤醒线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    unparkSuccessor(Node node) 方法用来将 Head(当前持有锁的 Node)节点的 waitStatus 的值重置为 0,然后找到 Head 的下一个未取消 (cancel)的节点找出来并唤醒(因此唤醒的节点便可以继续在 acquireQueued 中自旋获取锁)

    疑问

    在 unparkSuccessor(Node node) 中

            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
    

    如果发现 s 为空,则从 tail 尾向 prev 前遍历整个链表,找到未取消(waitStatus < 0)的节点,那么为什么这里不直接从s 的 next 节点往后找呢?
    个人猜测这里可能是因为计算机的局部性原理,如果head 的 next节点取消了,那么next 节点的 next 节点等也又可能取消了,因此从最新的尾部向前找似乎更合理。

    但是这里我又有一个新的问题!

    在入队 调用 addWaiter(Node mode)方法的时候,因为本身是尾插法,此时便检查 prev.waitStatus < 0 == true,如果为 false 则往 prev 前继续找有效的尾部节点,而不是在释放锁的地方去判断?

    感谢阅读,您的点赞加关注是对我最大的支持

    相关文章

      网友评论

          本文标题:AbstractQueuedSynchronizer 详细解读

          本文链接:https://www.haomeiwen.com/subject/clnvwctx.html