Java并发之 AQS 深入解析(下)

作者: 小鱼人爱编程 | 来源:发表于2021-10-20 23:36 被阅读0次

    前言

    线程并发系列文章:

    Java 线程基础
    Java 线程状态
    Java “优雅”地中断线程-实践篇
    Java “优雅”地中断线程-原理篇
    真正理解Java Volatile的妙用
    Java ThreadLocal你之前了解的可能有误
    Java Unsafe/CAS/LockSupport 应用与原理
    Java 并发"锁"的本质(一步步实现锁)
    Java Synchronized实现互斥之应用与源码初探
    Java 对象头分析与使用(Synchronized相关)
    Java Synchronized 偏向锁/轻量级锁/重量级锁的演变过程
    Java Synchronized 重量级锁原理深入剖析上(互斥篇)
    Java Synchronized 重量级锁原理深入剖析下(同步篇)
    Java并发之 AQS 深入解析(上)
    Java并发之 AQS 深入解析(下)
    Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 详解
    Java 并发之 ReentrantLock 深入分析(与Synchronized区别)
    Java 并发之 ReentrantReadWriteLock 深入分析
    Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
    Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(应用篇)
    最详细的图文解析Java各种锁(终极篇)
    线程池必懂系列

    image.png

    上篇分析了AQS实现共享/独占锁的实现细节以及一些疑难点,本篇继续分析AQS剩余部分知识。通过本篇文章,你将了解到:

    1、可/不可中断的独占锁
    2、可/不可中断共享锁
    3、可/不可限时等待的锁
    4、等待/通知的实现
    5、同步队列与等待队列的异同点
    6、Condition.await/Condition.signal 与Object.wait/Object.notify区别

    1、可/不可中断的独占锁

    可中断锁的定义

    打个小比喻:

    • 先说一种场景:小明是网瘾少年,整天在网吧里打游戏,妈妈打电话叫他回家吃饭(给他发中断命令),小明口头答应了妈妈,但是一挂电话就立马又打游戏了。妈妈反复打电话,都无法中断小明的上网过程,我们说小明的上网过程是不可打断的。
    • 再说另一种场景:小明虽然是网瘾少年,但是很听妈妈的话,妈妈打电话叫他回家吃饭(给他发中断命令),小明口头答应了并且关机了,说明小明的上网过程是可以中断的。

    再来从代码角度来理解,先看一段加锁/解锁的伪代码:

        private void testLock() {
            myLock.lock();
            doSometing1();
            doSometing2();
            myLock.unlock;
        }
    
        private void doSometing1() {//... }
        private void doSometing2() {//... }
    

    lock是独占锁,线程A、B同时争抢锁,假设A成功获取了锁,就能执行doSometing1()、doSometing2()方法了。
    线程B调用myLock.lock()因为无法获取锁阻塞了,然而另外的线程C在等待线程B的数据,因为线程B一直拿不到锁,因此也生产不了C需要的数据,于是C想要中断B,B被唤醒后有两种选择:

    1、不改初衷,继续尝试获取锁,获取不到继续阻塞。不论C怎么中断,都无动于衷。
    2、检测中断是否发生了,若是发生了直接抛出异常(老子不干了,不再尝试获取锁)。

    若myLock的设计满足选择1,称之为锁不可以被中断,若是满足选择2,称之为锁可被中断。
    用图表示如下:


    image.png

    不可中断的独占锁

    之前分析过的独占锁的调用流程:

    acquire(xx)-->acquireQueued(xx)

    在acquireQueued(xx)里:


    image.png

    如上图标注的1、2两个地方,仅仅只是检测了中断,然后将中断值返回给上一层调用者。
    而上一层acquire(xx)里:


    image.png

    发现发生过中断,也只是重新将中断标记补上而已。

    可以看出,线程调用acquire(xx)获取锁,若是发生了中断,线程还是自顾自地去尝试获取锁,外界的中断调用根本无法终止它获取锁的流程。
    此时的锁为:不可中断的独占锁。

    可中断的独占锁

    若要中断获取锁的流程,那么就需要检测中断标记位,并抛出异常或者退出流程。看看AQS是如何实现的:

    #AbstractQueuedSynchronizer.java
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            //若是发生了中断,则抛出中断异常
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        //因为已经在下面检测中断并抛出异常,因此此处都无需返回中断状态
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //醒过来后,发现被中断了,于是直接抛出中断异常
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    可以看出,有两个地方抛出了异常:

    1、在准备获取锁之前检测中断是否已经发生,若是则抛出异常。
    2、被唤醒后检测中断是否已经发生,若是则抛出异常,中断获取锁的流程。

    2、可/不可中断共享锁

    不可中断的共享锁

    之前分析过的共享锁的调用流程:

    acquireShared(xx)-->doAcquireShared(xx)

    在doAcquireShared(xx)里:


    image.png

    与不可中断的独占锁不一样的是:doAcquireShared(xx)检测到中断后直接将中断标记位补上了,不用传递到外层了。
    当然与不可中断的独占锁一样的是:它们都没有处理中断。

    此时的锁为:不可中断的共享锁。

    可中断的共享锁

    #AbstractQueuedSynchronizer.java
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                //若是发生了中断,则抛出中断异常
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            //因为已经在下面检测中断并抛出异常,因此此处无无需补中断
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        //醒过来后,发现被中断了,于是直接抛出中断异常
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    可以看出,与可中断的独占锁处理中断的逻辑一致。

    3、可/不可限时的锁

    限时等待的独占锁

    由上可知,虽然锁可以响应中断,但是还有另外的场景没有覆盖到:

    线程不想一直等待获取锁,而是想要等待一定的时间,若是没有获取到锁,则放弃获取锁。

    来看看AQS里提供的限时等待获取锁机制:

    #AbstractQueuedSynchronizer.java
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            //传入nanosTimeout 参数,单位是纳秒,表示若是这个时间耗尽了还是没获取到锁,则退出获取锁流程
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    
        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            //要求时间>0
            if (nanosTimeout <= 0L)
                return false;
            //计算截止时间
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    //查看时间是否耗尽了
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        //耗尽,则直接退出
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        //睡眠一定的时间后醒来
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    可以看出,线程挂起后,有两种方式可以将其唤醒:

    1、外界调用中断方法。
    2、挂起限时时间到达。

    当线程被唤醒后检查中断标记位,若是发生了中断则直接抛出异常,否则再尝试获取锁,若是获取锁失败,则判断是否已经超时,若是则退出。

    可限时等待的共享锁

    与可限时等待的独占锁类似,不再详述。

    4、等待/通知的实现

    基础数据结构

    AbstractQueuedSynchronizer里有个子类:ConditionObject
    是实现等待/通知的基础。
    先来看看其结构:

    #ConditionObject
    //指向队头
    private transient Node firstWaiter;
    //指向队尾
    private transient Node lastWaiter;
    

    firstWaiter、lastWaiter 结合Node里的nextWaiter 指针,共同维护了等待队列:


    image.png

    数据结构有了,那么就需要操作数据结构的方法:


    image.png

    可以看出有两种形式的方法:awaitXX(xx)/signalXX():

    await():不限时等待。
    await(long time, TimeUnit unit):限时等待,可以指定时间单位。
    awaitNanos(long nanosTimeout):限时等待,时间单位为纳秒。
    awaitUninterruptibly():不可中断的限时等待。
    awaitUntil(Date deadline):限时等待,指定超时时间为将来的某个时间点。
    signal():通知等待队列里节点。
    signalAll():通知等待队列里所有的节点。

    线程A调用awaitXX(xx)阻塞等待条件满足,线程B调用signalXX()通知线程A条件满足。显然,这就是一次线程的同步过程。
    取await()/signal()/signalAll()方法来分析等待/通知机制。

    ConditionObject.await() 实现

            public final void await() throws InterruptedException {
                //发生中断,抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                //加入等待队列
                Node node = addConditionWaiter();//--------->(1)
                //全部释放锁
                int savedState = fullyRelease(node);//--------->(2)
                int interruptMode = 0;
                //如果不在同步队列里
                while (!isOnSyncQueue(node)) {//--------->(3)
                    //挂起线程
                    LockSupport.park(this);
                    //线程被唤醒后,检查是否发生了中断--------->(4)
                    //被唤醒有两种原因:1是发生了中断,2是别的线程调用了signal
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                //获取同步状态
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    //成功,则标记线程需要重新设置中断标记位
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    //从等待队列里移除被取消的节点--------->(5)
                    unlinkCancelledWaiters();
                //决定如何处理中断
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);//--------->(6)
            }
    

    纵观整个await()方法,主要做了3点工作:

    1、封装线程到Node节点,并加入等待队列。
    2、挂起线程等待条件满足。
    3、线程唤醒后争抢锁。

    注释标记了6个重点方法,分别来看看:
    (1)

            private Node addConditionWaiter() {
                //找出尾节点
                Node t = lastWaiter;
                if (t != null && t.waitStatus != Node.CONDITION) {
                    //发现等待状态不是CONDITION,则遍历等待队列移除被取消的节点
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                //构造节点
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    //若队列为空,则头指针指向当前节点
                    firstWaiter = node;
                else
                    //将当前节点挂到尾节点后
                    t.nextWaiter = node;
                //尾指针指向尾节点
                lastWaiter = node;
                return node;
            }
    

    该方法作用:将线程封装为节点,加入到等待队列。
    (2)
    既然都要等待了,那么就把锁释放掉,别的线程才能获取锁做相应的操作。

        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                //获取当前的同步状态
                int savedState = getState();
                //全部释放同步状态
                if (release(savedState)) {
                    //释放成功
                    failed = false;
                    return savedState;
                } else {
                    //释放失败,说明当前锁不是独占锁,抛出异常
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    从该方法可以看出:调用await()时,必须保证AQS为独占锁。
    (3)

        final boolean isOnSyncQueue(Node node) {
            //若是当前节点状态为CONDITION 或者说node前驱节点为null,说明节点不在同步队列里
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            //节点状态不为CONDITION 且node前驱节点存在,并且后继节点存在,则认为在同步队列里
            if (node.next != null) 
                return true;
            //后继节点不存在,可能是节点加入到同步队列尾部的时候,CAS修改tail指向不成功,因此此处再遍历同步队列直接比较节点是否相等
            return findNodeFromTail(node);
        }
    

    需要保证不在同步队列里才能操作。
    (4)

       private int checkInterruptWhileWaiting(Node node) {
                //若是发生了中断,则进一步调用transferAfterCancelledWait判断,否则直接返回0
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
            }
    
        final boolean transferAfterCancelledWait(Node node) {
            //走到这,说明线程曾经被中断过,接下来就是要判断signal动作是否发生了
            //尝试修改节点状态
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                //成功,则加入到同步队列里
                //CAS 成功表明此时还没有发生signal()
                enq(node);
                return true;
            }
            //说明CAS失败,失败的原因是:signal()里已经将node状态改变了。
            //因此,此处只需要等待signal()将节点加入到同步队列即可。
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    

    transferAfterCancelledWait(xx)返回true,表示需要抛出中断异常,返回false,表示只需要将中断标记位补上就好了。
    interruptMode!=0,说明发生了中断,直接退出循环。

    (5)
    虽然前面已经将节点加入到同步队列里,但是可能并没有将节点从等待队列里移除(没有调用signal的情况),因此这里需要检测一下。

    (6)

            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                //直接抛出异常,发生了中断,但是还没有signal
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    //将中断标记位补上
                    selfInterrupt();
            }
    

    可以看出,即使发生了中断,await(xx)也需要等到获取锁后才会处理中断,这也保证了从await(xx)调用返回时线程一定是获取了锁。

    ConditionObject.signal() 实现

            public final void signal() {
                if (!isHeldExclusively())
                    //不是独占锁,则抛出异常
                    throw new IllegalMonitorStateException();
                //找到第一个节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        //没有后续节点,则尾指针赋null
                        lastWaiter = null;
                    //从等待队列里移除头节点
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&//将节点移动到同步队列
                         //头指针指向下一个节点
                         (first = firstWaiter) != null);
            }
    
            final boolean transferForSignal(Node node) {
                //修改状态
                if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                    return false;
                //加入同步队列
                Node p = enq(node);
                int ws = p.waitStatus;
                //p是node的前驱节点,若前驱节点被取消或者修改状态失败,则直接唤醒当前node关联的线程
                if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                    LockSupport.unpark(node.thread);
                return true;
        }
    

    可以看出,调用signal()时需要保证AQS是独占锁,并且当前线程已经获取了独占锁。
    纵观整个signal()方法,主要做了3点:

    1、将节点从等待队列移除。
    2、将节点加入到同步队列。
    3、若同步队列只有一个节点或者修改前驱节点状态失败,则唤醒当前节点。

    ConditionObject.signalAll() 实现

            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
    
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    //从等待队列移除
                    first.nextWaiter = null;
                    //加入到同步队列
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    

    可以看出,signal()只是将等待队列头部节点移动到同步队列,而signalAll()将等待队列里的所有节点移动到同步队列。

    5、同步队列与等待队列的异同点

    1、同步队列是双向链表实现的(FIFO),通过Node.prev/Node.next指针链接前驱、后继节点。
    2、等待队列是单向链表实现的(FIFO),通过Node. nextWaiter指针链接后继节点。
    3、两者队列里的节点都是Node类型。
    4、获取锁失败会加入到同步队列的队尾等待,获取锁成功将从同步队列里移除。
    5、调用await()将会加入到等待队列尾部,调用signal()将从等待队列头部移除,并且加入到同步队列尾部。

    6、Condition.await/Condition.signal 与Object.wait/Object.notify区别

    先看一段代码:
    Object.java 自带的等待/通知应用:

        Object object = new Object();
        private void testObjectWait() {
            synchronized (object) {
                try {
                    object.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void testObjectNotify() {
            synchronized (object) {
                object.notify();
            }
        }
    

    再看Condition 等待/通知应用:

        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Condition condition2 = lock.newCondition();
        private void testConditionAwait() {
            lock.lock();
            try {
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        private void testConditionSignal() {
            lock.lock();
            try {
                condition.signal();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    

    两者异同:
    相同点

    1、都需要获取锁后才能调用对应的方法,获取锁是为了保证条件变量在线程并发条件下的正确性。
    2、等待/通知 方法成对出现。
    3、等待方法都可以响应中断。
    4、等待方法都支持超时返回。

    不同点

    Condition 等待/通知依赖AQS,也就是需要配合Lock使用,在JDK里实现。
    Object 等待/通知依赖synchronized 关键字,在JVM里实现。

    此外,Condition的等待/通知机制比Object的等待/通知机制更灵活,如下:

    1、Condition 等待可以响应中断,也可以不响应。
    2、Condition 等待可以设置超时为未来的某个时间点。
    3、同一把锁可以生成多个Condition。

    下一篇将会着重分析AQS 衍生的子类封装器如:ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch具体实现及其应用。

    本文基于jdk1.8。

    您若喜欢,请点赞、关注,您的鼓励是我前进的动力

    持续更新中,和我一起步步为营系统、深入学习Android/Java

    相关文章

      网友评论

        本文标题:Java并发之 AQS 深入解析(下)

        本文链接:https://www.haomeiwen.com/subject/jegxaltx.html