美文网首页
终于搞懂了JUC中的AQS😼

终于搞懂了JUC中的AQS😼

作者: 我有一只喵喵 | 来源:发表于2021-01-04 22:37 被阅读0次

    AQS提供了一个框架来实现阻塞锁和依赖于先进先出(FIFO)等待队列的相关同步器(信号量、事件等),是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础核心,对应于java.util.concurrent.locks.AbstractQueuedSynchronizer。

    AQS同时支持独占锁共享锁。通常,实现AbstractQueuedSynchronizer的子类只支持其中一种模式,但这两种模式都可以发挥作用,例如在ReadWriteLock中。只支持排他模式或只支持共享模式的子类不需要定义支持未使用模式的方法。

    一、😸AQS基础之 CLH锁

    CLH锁也是一种基于链表的可扩展、高性能、公平(提供先来先服务)的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。由于是 Craig、Landin 和 Hagersten三位大佬的发明,因此命名为CLH锁。

    1.1 数据结构:

    public class CLHLock {
        private AtomicReference<CLHNode> tailNode = new AtomicReference<>();
        private ThreadLocal<CLHNode> currentThreadNode = new ThreadLocal<>();
    
        static final class CLHNode {
            private volatile boolean locked = true;
    
            public boolean isLocked() {
                return locked;
            }
    
            public void setLocked(boolean locked) {
                this.locked = locked;
            }
        }
    }
    

    locked:volatile 修饰的boolean变量表示加锁状态,true代表持有锁成功或正在等待加锁,false表示锁被释放。volatile修饰为了保证此变量对不同的线程可见。
    tailNode:尾节点
    currentThreadNode:当前节点

    1.2 核心思想:

    1.2.1 加锁过程

    获取尾节点,如果尾节点是null,则表示当前线程是第一个过来抢锁的,可以直接加锁成功;如果不为空,则将当前节点设置为尾节点,并对当前节点的前驱结点的locked进行自旋,如果发现其前驱结点的locked字段变为了false,则给当前节点加锁成功。

        public void lock() {
            // 首先对当前线程节点进行初始化
            CLHNode currentNode = currentThreadNode.get();
            if (currentNode == null){
                currentNode = new CLHNode();
                // 设置状态,标识当前节点正在加锁
                currentNode.setLocked(true);
                currentThreadNode.set(currentNode);
            }
    
            /**
             * 先判断当前尾节点是否已经有其他线程节点:
             * 若有,则将当前节点加入的尾节点之后,成为新的尾节点,并自旋前驱节点的加锁状态
             * 若无,则当前节点之前不存在其他线程竞争锁,直接获取成功
             */
    
            CLHNode preNode = tailNode.getAndSet(currentNode);
            if (preNode == null){
                // 对应与第二种情况,直接加锁成功
                return;
            }
            // 能走到这里,说明前驱节点不为空,需要自旋等待前驱节点的线程释放锁以后,再进行加锁
            while (preNode.locked){
    
            }
    
            // 走到这里,说明前驱节点释放了锁,则加锁成功
        }
    

    1.2.2 释放锁过程

    释放锁的过程主要是将当前节点locked标志位置为false的过程。也分情况,如果当前释放锁的线程节点是尾节点,则说明没有其他线程在等待队列中,直接将尾节点设置为null即可,否则需要将当前节点的locked标志位设置为false,来通知等待队列中线程锁已被释放。

        public void unlock() {
            // 获取当前线程节点
            CLHNode currentNode = currentThreadNode.get();
            if (currentNode == null || currentNode.locked == false){
                // 此时可以不做任何操作,或抛出不合法异常,因为走到这里,说明当前线程根本就没有持有锁,何来的释放呢?
                return;
            }
            // CAS尝试将尾部节点设置为null
            if (tailNode.compareAndSet(currentNode,null)){
                // 成功,说明当前线程节点是尾节点,阻塞队列中没有其他线程在竞争锁,将尾部节点设置为null,即可释放锁
            }else{
                // 失败,说明当前线程节点不是尾节点,有其他线程正在自旋当前线程的locked变量
                currentNode.setLocked(false);
            }
        }
    

    二、😽AQS实现原理

    2.1 AQS核心结构及解释

    2.1.1 双向队列节点

    对应实现为java.util.concurrent.locks.AbstractQueuedSynchronizer.Node

    Node

    2.1.2 核心属性

    1)state

    private volatile int state;
    

    对应于java.util.concurrent.locks.AbstractQueuedSynchronizer#state。表示当前锁的状态,在不同的功能实现中代表不同的含义。比如在独占并且不可重入的锁实现中:0代表当前锁未被占用,1代表锁被占用;而在独占并且可重入的锁实现中:0代表当前锁未被占用,而大于0则表示被占用,且表示当前持有锁的线程重入的次数。可以通过getStatesetStatecompareAndSetState来检查或修改同步状态。

    2)head

    private transient volatile Node head;
    

    等待队列的头节点,是懒加载的

    3)tail

    private transient volatile Node tail;
    

    等待队列的尾节点,也是延迟初始化的,仅当调用java.util.concurrent.locks.AbstractQueuedSynchronizer#enq方法时被修改。也就是插入一个新节点时。

    2)exclusiveOwnerThread

    private transient Thread exclusiveOwnerThread;
    

    此属性继承自java.util.concurrent.locks.AbstractOwnableSynchronizer,代表当前持有独占锁的线程。

    2.1.3 扩展方法

    AbstractQueuedSynchronizer采用模板方法,将排队、阻塞等操作统一包装起来,仅暴漏核心方法根据需要实现功能覆写对应的方法即可,所有其他方法都声明为final,因为它们不能被独立更改。这些核心方法的实现需要是线程安全的,

    排它锁对应相关方法:

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
    

    共享锁对应相关方法:

        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
    

    其他方法
    该线程是否正在独占资源。只有用到condition才需要去实现它

        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
    

    2.2 ReentrantLock为例分析加锁解锁实现

    ReentrantLock特点:独占可重入支持公平和非公平锁

    ReentrantLock内部java.util.concurrent.locks.ReentrantLock.Sync类继承了AQS,并对AQS中的tryReleaseisHeldExclusively进行了重写。而因为ReentrantLock分为公平锁和不公平锁,所以从Sync又派生出了FairSyncNonfairSync并且对AQS中的tryAcquire进行了重写,分别表示公平锁和非公平锁的实现。

    Sync

    2.2.3 公平式获取锁

    整体大致步骤如下:


    加锁步骤

    入口:java.util.concurrent.locks.ReentrantLock.FairSync#lock

            final void lock() {
                acquire(1);
            }
    

    在lock方法中直接调用AQS的acquire(1);

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    在acquire中首先调用tryAcquire尝试获取锁,如果tryAcquire返回true,则此方法直接返回,代表加锁成功,否则需要执行addWaiter(Node.EXCLUSIVE)为当前线程创建队列节点并入队,然后调用acquireQueued(final Node node, int arg)再次尝试获取锁,或挂起当前线程等待被唤醒,被唤醒以后继续尝试获取锁,直到加锁成功。

    查看tryAcquire具体实现:

        protected final boolean tryAcquire(int acquires) {
            // 获取当前线程
            final Thread current = Thread.currentThread();
            // 获取当前锁的状态
            int c = getState();
            if (c == 0) {
                /**
                 * state为0,说明锁处于空闲状态,可以直接获取
                 *
                 *由于是公平锁实现,所以需要先调用hasQueuedPredecessors看下
                 * 是否在当前节点前面已经有线程在排队获取了
                 *
                 * 如果没有节点在排队,则CAS设置state值,
                 * 并设置当前锁的独占线程为当前线程,返回true,代表加锁成功
                 */
                if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            /**
             * 走到这里,说明state!=0,锁已经被占用
             */
            else if (current == getExclusiveOwnerThread()) {
                /**
                 * 走到这里,说明此时持有锁的线程就是当前线程,则是重入了
                 */
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 设置state,此时state表示的锁重入的次数
                setState(nextc);
                return true;
            }
            /**
             * 走到这里,有以下几种情况
             *  1)state==0,但是阻塞队列中已经有其他线程在排队了,所以优先其他线程
             *  2)state==0,并且阻塞队列无其他线程争夺锁,但是CAS失败,被其他线程给抢去了
             *  3)state!=0,当前锁被其他线程持有中
             */
            return false;
        }
    

    以上其实还不算是AQS的核心,tryAquire方法说白了就是最终设置锁状态。AQS的核心还是在acquireQueued方法中,如下是acquireQueued方法的整体流程图:

    acquireQueued 流程

    然后查看addWaiter具体实现分析:

        private Node addWaiter(Node mode) {
            //入参mode代表即将创建节点属于独占模式还是共享模式
    
            // 这里new一个Node对象,并传入当前线程
            Node node = new Node(Thread.currentThread(), mode);
    
            /**
             * 下面这行操作就是,如果当前尾节点存在,则将刚刚
             * 创建的新节点插入在尾节点之后,成为新的尾节点
             */
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            /**
             * 代码如果走到这里,有两种情况
             * 1)当前尾节点存在,但是CAS设置当前节点到尾节点失败(被其他线程节点抢先设置
             * 2)当前不存在尾节点
             */
            enq(node);
            return node;
        }
    

    查看enq()具体实现分析:

        private Node enq(final Node node) {
            /**
             * 首先再明确下,调用此方法的情况
             * 当创建新节点并尝试将新节点放入队列尾部时:
             * 1)当前尾节点存在,但是CAS设置当前节点到尾节点失败(被其他线程节点抢先设置
             * 2)当前不存在尾节点
             */
            for (;;) {
                // 新建一个局部变量t指向尾节点
                Node t = tail;
                if (t == null) {
                    /**
                     * 尾节点为null的情况,此时需要设置一个虚拟头部节点
                     */
                    if (compareAndSetHead(new Node()))
                        // 设置成功,让尾节点引用指向头节点
                        tail = head;
                } else {
                    /**
                     * 尾节点不为null,让新节点的前驱节点指向尾节点
                     * 然后cas尝试将新节点设置为尾节点
                     */
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        // 设置成功,将旧尾节点的后驱指针指向新的尾部节点,也就是当前创建的节点返回
                        t.next = node;
                        /**
                         * 这里需要注意,在整个死循环中,只有这里可以让循环退出
                         * 说明当此方法enq()返回时,那么传入的新节点成功加入到了尾节点
                         * 并且返回新节点的前驱节点
                         */
                        return t;
                    }
                }
            }
        }
    

    查看acquireQueued具体实现分析:

        final boolean acquireQueued(final Node node, int arg) {
            // 定义一个标志位failed,
            boolean failed = true;
            try {
                // 是否需要中断当前线程
                boolean interrupted = false;
                for (;;) {
                    // 获取到当前节点的前驱节点
                    final Node p = node.predecessor();
                    /**
                     * 如果当前节点的前驱节点就是头节点,
                     * 那么说明当前节点是排在阻塞队列第一个位的,此时可以再次尝试获取一下锁
                     */
                    if (p == head && tryAcquire(arg)) {
                        /**
                         * 能够走到这里,说明获取当前节点的前驱节点是头节点,并且获取锁成功
                         *
                         * 然后将当前节点设置头节点,然后将前驱节点的后继节点置为null,
                         * 设置位null的目的是去除旧的头节点的强引用,使之变为垃圾被回收
                         */
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 走到这里,不需要对当前线程进行中断
                        return interrupted;
                    }
                    /**
                     * 能够走到这里,有两种情况
                     * 1)当前节点的前驱节点不是头节点
                     * 2)当前节点的前驱节点是头节点,但是尝试获取锁失败
                     */
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                /**
                 * 根据上述死循环,在唯一return的地方之前有设置failed=false的操作
                 * 那么什么时候进入到这里,failed是true呢?那就是可能发送异常的地方:tryAcquire
                */
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    查看shouldParkAfterFailedAcquire的具体实现分析:

     /**
         * 进入到这个方法,那就是该尝试的都尝试了,确实当前线程无法获取到锁
         * 那么就需要检查是否可以将当前线程挂起了。
         *
         * 判断是否可以将当前线程挂起的依据就是在AQS中,
         * 阻塞队列中等待线程的线程是由该线程节点的前驱节点进行唤起的。
         *
         * 所以如果说想要将当前线程挂起,一定得保证当前线程的前驱节点的waitStatus为Node.SIGNAL,
         * 否则如果当前线程挂起,将没有动作会触发让当前线程唤醒
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    
            // 获取当前线程节点的前驱节点状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                // 当前线程节点的前驱节点ws为SIGNAL,一切正常,可以挂起当前线程
                return true;
            if (ws > 0) {
                /**
                 * 走到这里说明,当前线程节点的前驱节点对应的线程因为某些原因
                 * 处于取消状态,也就是不会再去抢锁,既然不会再去抢锁,那就无法触发
                 * 唤醒后继节点线程的操作。
                 *
                 * 所以这里需要一直往前找,直到找到一个可以唤醒后继节点的节点,
                 * 然后将当前节点的前驱节点指向这个节点。
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                /**
                 * 循环结束,将找到的这个节点的后继节点设置为当前节点
                 * 注意这里,这个操作也是将从找到的这个节点到当前节点中间处于取消状态的节点移除掉了,
                 * 因为取消状态的节点没什么卵用。
                 */
                pred.next = node;
            } else {
                /**
                 * 走到这里,说明当前节点的前驱节点ws是0(最一开始在enq中初始化的虚拟头节点的ws就是0)
                 * 或者是其他的状态,此时需要将前驱节点的ws设置为SIGNAL
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            /**
             * 走到这里,说明刚开始的时候,当前节点的前驱节点ws状态并不是SIGNAL,并且返回false,代表不挂起当前线程,
             *  然后经过我们的其他额外操作,预期结果应该是将当前节点的前驱节点ws状态设置为了SIGNAL,
             *  此时可能会有两种情况来说明为啥在此处不返回true直接挂起本线程:
             *  1)可能设置SIGNAL失败,所以需要再次检查,那么返回false,下次会再次进来,再检查一次
             *  2)当往前找非取消状态节点时,很可能最终找到的那个节点就是头节点,如果是头节点,说明
             *  此时当前线程节点就是阻塞队列中排队的第一个,可以直接再走一次acquireQueued的for循环,看看是否可以直接加锁成功
             *
             */
            return false;
        }
    

    shouldParkAfterFailedAcquire所做的操作能够保证,如果等待队列中有节点的waitStatus是 Node.SIGNAL,那么一定有线程处于挂起状态,需要被唤醒。

    2.2.4 释放锁

    释放锁相当就要简单一点,首先需要检查释放锁的线程是否就是持有锁的线程,如果都没持有锁,何来的释放呢?检查成功以后,就需要更改当前锁的状态;成功释放锁以后,最关键的一步就是需要唤醒阻塞队列中的等待中的线程了。解锁过程着重看AQS#releaseAQS#unparkSuccessor

    AQS#release

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                // 代码走到这里,锁已经成功释放,然后检查是否需要唤起阻塞队列中的等待线程
                AbstractQueuedSynchronizer.Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    看到这里成功释放锁之后,如何判断是否需要唤起阻塞队列中的等待线程的。首先获取到当前阻塞队列的头节点h,然后分析判断语句:h != null && h.waitStatus != 0。
    1)h != null
    这里首先判断了头节点不为空,假如说头节点都为空了,那么其实相当于队列就是空的,根本没有线程在等待,所以不需要唤起。
    2)h.waitStatus != 0
    走到这里,说明头节点是不为空的,但是,假如头节点的waitStatus是0也不满足。什么情况下waitStatus是0呢?这里需要往上翻加锁代码,查看AQS#enq,也就是说刚开始的头节点都是新建的,新建的Node,其waitStatus本身就是0,只有当有其他线程挂起时,才会将头部节点的waitStatus更新为-1,所以这里如果是0,一样可以断定当前是没有线程处于挂起状态。

    AQS#unparkSuccessor

        private void unparkSuccessor(AbstractQueuedSynchronizer.Node node) {
    
            int ws = node.waitStatus;
            if (ws < 0){
                /**
                 * 走到这里,因为锁要被释放了,所以先更新下头节点状态,代表锁空闲
                 */
                compareAndSetWaitStatus(node, ws, 0);
            }
    
            /**
             * 下面的逻辑其实就是从队列的尾节点开始向前查找,直到找到队列最前面,并且waitStatus是小于等于0的,也就是非取消状态的线程节点
             */
            AbstractQueuedSynchronizer.Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (AbstractQueuedSynchronizer.Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    这里关键的就是找到排队中最靠前的非取消状态的线程节点,将其对应的线程唤醒,但是看代码是从队列的尾部往前查找的,这是为啥?为啥不直接从头部节点往后遍历呢?

    1)原因1 (这个原因参考的美团技术文章,但是我想了想,好像并不是因为这个
    原文是说新增节点入队的情况,其中两行代码如下:

        node.prev = pred;  
        if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
    

    第一行代码和第三行代码并不是原子性的,所以会产生,新加入的node节点前驱已经指向了旧的尾节点,但是旧的尾节点的后继节点并没有指向新加入的node节点(pred.next = node;还没执行),所以导致从前向后遍历,可能根本遍历不到新加入的这个node节点。

    但是我个人觉得,就算遍历不到也没事,因为如果说上述代码的第三行还没执行,一定可以说明新加入的这个node还没走到shouldParkAfterFailedAcquire,所以本身就没被挂起,所以遍历不到无所谓了。

    2)原因2:
    产生CANCELLED状态节点的时候,先断开的是Next指针,Prev指针并未断开,因此也是必须要从后往前遍历才能够遍历完全部的Node。

    三、😻图解加锁解锁过程

    3.1 AQS初始状态

    AQS初始状态

    3.1 加锁

    线程1加锁不释放

    AQS初始状态
    线程2尝试加锁
    线程2尝试加锁
    线程3尝试加锁
    这里直接就给出最终的结果
    线程3尝试加锁

    3.2 解锁

    线程1释放锁

    线程1释放完锁以后,线程2被唤醒然后进入到acquireQueued方法的for循环中,并且此时正好线程2的node节点的前驱节点就是head,所以能够满足第一个if条件,加锁成功以后,将头节点转向自己,此时表示当前持有锁的线程节点就是节点2了。

    线程1释放锁

    参考文章

    一行一行源码分析清楚AbstractQueuedSynchronizer
    算法:CLH锁的原理及实现&并发锁核心类AQS
    从ReentrantLock的实现看AQS的原理及应用

    相关文章

      网友评论

          本文标题:终于搞懂了JUC中的AQS😼

          本文链接:https://www.haomeiwen.com/subject/nmwvwktx.html