Java并发之 AQS 深入解析(上)

作者: 小鱼人爱编程 | 来源:发表于2021-10-19 13:07 被阅读0次

    前言

    线程并发系列文章:

    Java 线程基础
    Java 线程状态
    Java “优雅”地中断线程-实践篇
    Java “优雅”地中断线程-原理篇
    真正理解Java Volatile的妙用
    Java ThreadLocal你之前了解的可能有误
    Java Unsafe/CAS/LockSupport 应用与原理
    Java 并发"锁"的本质(一步步实现锁)
    Java Synchronized实现互斥之应用与源码初探
    Java 对象头分析与使用(Synchronized相关)
    Java Synchronized 偏向锁/轻量级锁/重量级锁的演变过程
    Java Synchronized 重量级锁原理深入剖析上(互斥篇)
    Java Synchronized 重量级锁原理深入剖析下(同步篇)
    Java并发之 AQS 深入解析(上)
    Java并发之 AQS 深入解析(下)
    Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 详解
    Java 并发之 ReentrantLock 深入分析(与Synchronized区别)
    Java 并发之 ReentrantReadWriteLock 深入分析
    Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
    Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(应用篇)
    最详细的图文解析Java各种锁(终极篇)
    线程池必懂系列

    前面几篇分析了synchronized 原理及其使用,synchronized 是JVM实现的,核心代码是C++,对于不熟悉C++语言的读者可能有点难度。JUC 包下提供了新的同步框架:AQS,是纯JAVA代码实现的。如果你了解了synchronized 核心,那么AQS不在话下,若是不了解,本篇将一起从头到尾深入分析AQS。
    通过本篇文章,你将了解到:

    1、如何实现自己的同步框架
    2、AQS 功能解析
    3、AQS 独占锁实现
    4、AQS 共享锁实现
    5、场景模拟与疑难解答

    1、如何实现自己的同步框架

    准备关键数据结构

    第一
    得需要共享变量作为"锁芯"。由于这个共享变量是多线程共享,为保证线程间的可见性,因此需要用volatile关键字修饰。

    第二
    当线程竞争锁成功时则进入临界区执行代码,当失败时需要加入到队列里进行等待,因此需要一个同步队列,用以存放因获取锁失败而挂起的线程。

    第三
    线程之间需要同步,A线程等待B线程生产数据,B线程生产了数据通知A线程,因此需要一个等待(条件)队列。

    核心操作

    数据结构准备好之后,需要操作以上数据结构来实现锁功能。
    线程竞争锁:通过CAS操作"锁芯",操作成功则执行临界区代码,失败则加入到同步队列。
    线程被唤醒:拿到锁的线程执行完临界区代码后释放锁,并唤醒同步队列里等待的线程,被唤醒的线程继续竞争锁。
    线程等待某个条件:线程因为某种条件不满足于是加入到等待队列里,释放锁,并挂起等待。
    条件满足线程被唤醒:条件满足,线程被唤醒后继续竞争锁。

    以上步骤是实现锁功能的核心,不论是synchronized还是AQS,基础功能都是以上步骤,只是他们还拥有更丰富的功能,如可中断(AQS),可重入、可独占可共享(AQS)等。
    自己实现锁的实践请移步:Java 并发"锁"的本质(一步步实现锁)

    2、AQS 功能解析

    AQS是AbstractQueuedSynchronizer的简称,顾名思义:同步器。它是JUC下的同步框架,也是实现锁的核心类。
    AQS是抽象类,提供了基础的方法,需要子类实现具体的获取锁、释放锁操作。


    image.png

    如上图所示,AQS 实现了独占锁/共享锁、可中断锁/不可中断锁的逻辑,当子类扩展AQS时调用对应的方法即可实现不同的锁组合。

    JUC下扩展自AQS的子类封装器:


    image.png

    接下来进入到AQS源码里,看看它是如何实现上述功能的。
    注:Semaphore和CountDownLatch 并不是严格意义上的锁,后面具体分析每种锁的时候再细说

    3、AQS 独占锁实现

    A、先找到关键数据结构

    锁芯

    #AbstractQueuedSynchronizer.java
        private volatile int state;
    

    state 称为共享资源或者同步状态,作为锁的锁芯,可以看出它用volatile修饰了。

    同步队列

    #AbstractQueuedSynchronizer.java
        //指向同步队列的头
        private transient volatile Node head;
        //指向同步队列的尾
        private transient volatile Node tail;
    

    再来看Node里的元素:

    #AbstractQueuedSynchronizer.java
        static final class Node {
            ...
            //前驱节点
            volatile Node prev;
            //后继节点
            volatile Node next;
            //占用独占锁的线程
            volatile Thread thread;
            //指向下一个等待条件的节点
            Node nextWaiter;
            ...
        }
    

    B、操作关键数据结构的方法

    1、先说获取同步状态的操作

    acquire(xx)

    #AbstractQueuedSynchronizer.java
        public final void acquire(int arg) {
            //arg 不同的锁实现有不同的含义
            //tryAcquire 由子类实现具体的获取同步状态操作
            //addWaiter 将当前线程封装在Node里并加入到同步队列
            //acquireQueued 符合条件则挂起线程
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                //补上中断标记
                selfInterrupt();//-------------(1)
        }
    

    里面写法很简单,重要工作都在各个方法里体现。
    \color{Red}{问题1:如标注的(1),为什么需要selfInterrupt()?}

    tryAcquire(xx)

    真正获取锁的地方,也就是操作锁芯"state"的地方,不同子类有不一样的实现,后面会分类细说。tryAcquire(xx) 返回true表示获取同步状态成功,false表示获取同步状态失败。

    addWaiter(xx)

    #AbstractQueuedSynchronizer.java
        private Node addWaiter(Node mode) {
            //构造新节点
            Node node = new Node(Thread.currentThread(), mode);
            Node pred = tail;
            if (pred != null) {
                //尾节点存在
                //新节点的前驱指针指向尾节点
                node.prev = pred;//-------------(2)
                //CAS修改为尾节点指向新节点
                if (compareAndSetTail(pred, node)) {
                    //成功后
                    //将最后一个节点的后继指针指向新加入的节点
                    //此时新节点正式加入到同步队列里了
                    pred.next = node;
                    return node;
                }
            }
            //前面步骤加入队列失败,则会走到这
            enq(node);
            //返回新加入的节点
            return node;
        }
    
        private Node enq(final Node node) {
            //死循环务必保证插入队列成功
            for (;;) {
                Node t = tail;
                if (t == null) { 
                    //队列是空的,则先创建头节点
                    if (compareAndSetHead(new Node()))
                        //尾节点指向头节点
                        tail = head;
                } else {
                    //和addWaiter里一样的操作,加入新节点到队尾
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    addWaiter(xx)作用是将节点加入到同步队列的尾部。
    需要注意的是:

    头节点不关联任何线程,仅仅起到索引的作用。

    最终,同步队列如下图:


    image.png

    \color{Red}{问题2:如标注的(2),为什么先设置前驱指针?}

    acquireQueued(xx)

    按照以往的经验(synchronized),加入到同步队列后应该挂起线程,来看看AQS实现有何不同:

    #AbstractQueuedSynchronizer.java
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                //记录中断标志
                boolean interrupted = false;
                for (;;) {
                    //找到当前节点的前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点为头节点,则尝试获取同步状态
                    if (p == head && tryAcquire(arg)) {
                        //获取同步状态成功,将头节点指向下一个节点,并且新的头节点prev=null,表示原本的头节点出队了
                        setHead(node);
                        //原本的头节点next=null,帮助尽快GC
                        p.next = null;
                        failed = false;
                        return interrupted;
                    }
                    //判断获取同步状态失败后是否需要挂起,走到这里说明获取同步状态失败了,可能需要挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //标记中断
                        interrupted = true;
                }
            } finally {
                if (failed)
                    //还是没获取成功,则取消竞争同步状态操作
                    cancelAcquire(node);
            }
        }
    

    这里面的逻辑可能比较绕,着重分析一下。
    首先外层有个死循环,该循环退出的条件是当前线程成功获取了同步状态。
    其次,如果当前新加入队列的节点的前驱节点是头节点,那么它就会去尝试获取同步状态。若是获取同步状态失败或者它的前驱节点不是头节点,则进入到shouldParkAfterFailedAcquire(xx)方法。

    #AbstractQueuedSynchronizer.java
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //拿出前驱节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                //为SIGNAL 直接返回
                return true;
            if (ws > 0) {
                //该节点已被取消
                do {
                    //一直往前追溯,直到找到不被取消的节点为止
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                //找到
                pred.next = node;
            } else {
                //不是取消状态,则直接设置前驱节点状态为SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    该方法返回true,则说明当前线程所在节点的前驱节点的状态为:SIGNAL。进而执行parkAndCheckInterrupt()方法。

    parkAndCheckInterrupt()

    从名字就可以看出来是挂起线程并检查中断。

    #AbstractQueuedSynchronizer.java
        private final boolean parkAndCheckInterrupt() {
            //挂起线程
            LockSupport.park(this);
            //查询中断标记位
            return Thread.interrupted();
        }
    

    cancelAcquire(xx)

    该方法有两种场景会调用到:

    1、当获取同步状态发生异常时,需要取消线程竞争同步状态的操作。
    2、当获取同步状态的超时时间到来之时,若此刻还无法成功获取同步状态,则调用该方法。

    #AbstractQueuedSynchronizer.java
        private void cancelAcquire(Node node) {
            if (node == null)
                return;
            node.thread = null;
            Node pred = node.prev;
            //若是已经取消了,则跳过
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
            Node predNext = pred.next;
    
            //标记为取消状态
            node.waitStatus = Node.CANCELLED;
            //如果当前节点是尾节点,则将尾节点指向当前节点的前驱节点
            if (node == tail && compareAndSetTail(node, pred)) {
                //再将前驱节点的后继指针置为空,把node从队列里移除
                compareAndSetNext(pred, predNext, null);
            } else {//---------------(3)
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    //若前驱节点不是头结点,当前节点也不是尾结点
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        //将node的前驱节点的后继指针指向node的后继节点
                        compareAndSetNext(pred, predNext, next);
                } else {
                    //前驱节点是头结点
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    

    \color{Red}{问题3:如标注的(3),node 什么时候从队列里移除?}
    2、再说释放同步状态的操作

    release(xx)

    #AbstractQueuedSynchronizer.java
        public final boolean release(int arg) {
            //tryRelease 释放同步状态
            if (tryRelease(arg)) {
                //释放同步状态成功
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    //waitStatus 不为0,则唤醒线程
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            //将头结点状态置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            //取出头结点的后继节点
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
              //若是没有后继节点或者是取消状态
                s = null;
                //则从尾部开始寻找离头结点最近的未取消的节点-----------(4)
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //唤醒线程
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    \color{Red}{问题4:如标注的(4),为什么需要从尾部开始索引?}

    节点状态

    #AbstractQueuedSynchronizer.java
            //节点被取消,不再参与竞争锁
            static final int CANCELLED =  1;
            //表示该节点的后继节点需要唤醒
            static final int SIGNAL    = -1;
            //节点在等待队列里的状态
            static final int CONDITION = -2;
            //表示头结点将唤醒的动作传播下去
            static final int PROPAGATE = -3;
            //默认值为0
    

    至此,独占锁的获取锁、释放锁的流程已经分析完毕,如下图:


    image.png

    4、AQS 共享锁实现

    独占锁是同一时刻只允许一个线程获取锁,而共享锁则不然,来看看AQS里共享锁的实现。
    先来看看获取共享同步状态的操作

    acquireShared(xx)

    #AbstractQueuedSynchronizer.java
        public final void acquireShared(int arg) {
            //获取共享的同步状态,不同锁实现不一样
            //<0 表示获取同步状态失败
            if (tryAcquireShared(arg) < 0)
                //加入同步队列、挂起线程等在此处实现
                doAcquireShared(arg);
        }
    

    与独占锁的获取不一样的是,此处将加入同步队列与挂起线程等操作放到一个方法里了。

    doAcquireShared(xx)

    #AbstractQueuedSynchronizer.java
        private void doAcquireShared(int arg) {
            //加入同步队列,此处节点是共享状态
            final Node node = addWaiter(Node.SHARED);
            //获取同步状态失败
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //前驱节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        //若是前驱节点为头结点,则尝试获取同步状态
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            //获取同步状态成功
                            //修改头结点,并传递唤醒状态
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                //补中断
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    //与独占锁一致
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }   
    

    tryAcquireShared(arg) 返回值表示当前可用的资源。

    setHeadAndPropagate(xx)

    #AbstractQueuedSynchronizer.java
    private void setHeadAndPropagate(Node node, int propagate) {
            //propagate == 0 表示没有资源可以使用了
            Node h = head; // Record old head for check below
            //设置头结点
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                //若后继节点是共享节点,则唤醒
                if (s == null || s.isShared())
                    doReleaseShared();
            }
    

    除了将头结点指向当前节点外,还需要唤醒下一个共享节点。
    而独占锁不会。

    从现实的角度来看也比较容易理解这种操作:

    某个澡堂分男女分批洗,当前只允许女士进去先洗,而一群男士在排队等候,当女士们洗好之后,允许男士进去洗。第一个男士进去后,发现可以洗,于是跟第二个男士说可以洗,你快进来吧,真可以洗,这是共享精神。这个澡堂就是共享的。

    再来看看释放共享同步状态的操作

    releaseShared(xx)

    #AbstractQueuedSynchronizer.java
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                //释放同步状态成功后,通知后续节点
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    可以看出,线程在获取共享锁和释放共享锁后都会尝试唤醒后续节点,都调用了
    doReleaseShared()方法。

    doReleaseShared()

    #AbstractQueuedSynchronizer.java
        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    //队列里还有节点等候
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        //唤醒后继节点
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    //将头节点状态置为PROPAGATE-------->(5)
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                //该方法可能会被多个线程调用,而线程获取锁后会修改头节点
                //因此,若是发现头结点更改了,则再重新拿新的头结点再试探
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    \color{Red}{问题5:如标注的(5),为什么需要PROPAGATE状态?}
    至此,共享锁的获取锁、释放锁的流程已经分析完毕,如下图:

    image.png

    5、场景模拟与疑难分析

    共享锁、独占锁的实现重要方法、数据结构都过了一遍,接下来通过模拟场景来分析上面提到的五个问题。

    1、问:为什么需要selfInterrupt()

    #AbstractQueuedSynchronizer.java
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    LockSupport.park(this)将线程挂起,挂起后调用Thread.interrupted()查询中断状态。而Thread.interrupted()除了查询中断状态外,还会重置中断状态,也就是说之前中断状态为true,调用该方法后中断状态变为false。而从整个acquire(xx)方法来看,没有任何地方处理了中断,因此不能简单将中断状态置为false,还需要恢复到原来的样子,让外部调用者可以感知到是否已经发生过中断了,所以需要selfInterrupt() 重新把中断状态设置为true。
    既然后面要恢复中断状态,那干嘛一开始就置为false呢,直接调用Thread.isInterrupted()不就ok了吗?
    想象一种场景:A线程被中断,此时从挂起状态醒过来,然后去获取同步状态,发现还是无法获取,于是又开始准备挂起了。此处挂起线程使用的是LockSupport.park(xx)方法,其底层使用Parker.park(xx)函数:


    image.png

    注意红框里代码,若是发现当前线程中断状态为true,则直接返回不再挂起线程。若是调用Thread.isInterrupted(),中断状态没有改为false,那么当调用LockSupport.park(xx)方法时,线程是无法挂起的。而acquire(xx)方法里没有获取到锁就一直循环,导致线程一直不断轮询同步状态,造成了不必要的CPU资源浪费。
    Parker细节请移步:Java Unsafe/CAS/LockSupport 应用与原理
    线程中断细节请移步:Java “优雅”地中断线程(原理篇)

    2、问:为什么先设置前驱指针
    当前的顺序是:

    node.prev = pred------>pred.next = node;

    先让新结点的prev指向尾结点,再让尾结点的next指向新结点,如下图:


    image.png

    现在同步队列里有两个结点,其中一个头结点,一个是Node1结点。若是先给pred.next 赋值,假设流程如下:

    1、线程A先竞争锁,竞争失败,先将Node1的next指向NewNodeA。
    2、此时另一个线程B也来竞争锁,失败,也将Node1的next指向NewNodeB。
    3、将tail指针指向新的节点(可能是NewNodeA,也可能是NewNodeB),若是NewNodeA,然后将NewNodeA的prev指向Node1。此时问题出现了:虽然NewNodeA的prev指向了Node1,但是Node1的next却是指向了NewNodeB。

    而先给node.prev 赋值就不会出现上述情况。出现这个问题的根本原因是多线程操作队列元素(给Node1.next赋值)没有做好并发保护,而先给node.prev 并不是操作队列,将操作队列的步骤延迟到CAS成功之后,就能正确地修改队列。
    当然,pred.next = node 执行之前,其它线程可能会遍历查询队列,此时pred.next可能为空,也就是上图的Node1.next可能为空。
    这也是网上一些文章说next指针不可靠的原因。

    3、问:node 什么时候从队列里移除
    cancelAcquire(xx)分三种情形操作同步队列:
    1、若node为队尾节点,则将node从队列移除。
    2、若node为队头节点,则调用unparkSuccessor(xx)检测。
    3、若node为中间节点,则在shouldParkAfterFailedAcquire(xx)/unparkSuccessor(xx) 彻底移除。

    4、问:为什么需要从尾部开始索引
    在第2点里有分析过节点的next指针可能为空,若是从队头开始索引,有可能还没遍历完整个队列就退出遍历了。因此,为了保险起见,从队尾开始索引。

    5、问:为什么需要PROPAGATE状态
    PROPAGATE 在共享节点时才用得到,假设现在有4个线程、A、B、C、D,A/B 先尝试获取锁,没有成功则将自己挂起,C/D 释放锁。可以参照Semaphore获取/释放锁流程。

    1、C 释放锁后state=1,设置head.waitStatus=0,然后将A唤醒,A醒过来后调用tryAcquireShared(xx),该方法返回r=0,此时state=0。
    2、在A还没调用setHeadAndPropagate(xx)之前,D 释放了锁,此时D调用doReleaseShared(),发现head.waitStatus==0,所以没有唤醒其它节点。
    3、此时A调用了setHeadAndPropagate(xx),因为r==0且head.waitStatus==0,因此不会调用doReleaseShared(),也就没有唤醒其它节点。最后导致的是B节点没有被唤醒。

    若是加了PROPAGATE状态,在上面的第2步骤里的D调用doReleaseShared()后,发现head.waitStatus==0,于是设置head.waitStatus=PROPAGATE,在第3步骤里,发现head.waitStatus==PROPAGATE,于是唤醒B。
    虽然在第2步骤里没有唤醒任何线程,但是设置了PROPAGATE状态,在后续的步骤中发现已经设置了PROPAGATE,于是唤醒,这也是PROPAGATE名字的意义:传播。

    由于篇幅原因,下篇将分析AQS 中断与否、条件等待等相关知识。

    本文基于jdk1.8。

    您若喜欢,请点赞、关注,您的鼓励是我前进的动力

    持续更新中,和我一起步步为营系统、深入学习Android/Java

    相关文章

      网友评论

        本文标题:Java并发之 AQS 深入解析(上)

        本文链接:https://www.haomeiwen.com/subject/zdlaoltx.html