美文网首页一些收藏
知识回顾|并发|AQS

知识回顾|并发|AQS

作者: 三更冷 | 来源:发表于2023-01-31 23:26 被阅读0次

    前提

    参考网上文章,本文针对以下几个问题作出回答,文内可能有遗漏、错误或表达不够清晰的地方(基于JDK8):

    ① AQS是什么?其内部实现?
    ② AQS是如何唤醒下一个线程的?
    ③ 依靠AQS的同步类比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等的与AQS的关联?
    ④ ReentrantLock如何实现公平和非公平锁?
    ⑤ CountDownLatch和CyclicBarrier的区别?各自适用于什么场景?

    ① AQS是什么?其内部实现?

    AQS 是 JUC 包中用于构建锁或者其他同步组件(信号量、事件等)的基础类,是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架 [^1] 。下面基于 Doug Lea 所著论文中 [^2] ,关于需求、设计、实现的描述来讨论 AQS 内部实现。

    背景和需求

    那么,设计一个同步框架要满足哪些通用的需求和机制?

    功能上

    1. 阻塞:阻塞线程直到同步状态允许它继续执行(acquire方法)
    2. 唤醒:释放正在阻塞的线程,修改同步状态,使得一或多个被acquire阻塞的线程继续执行(release方法)
    3. 非阻塞: 非阻塞同步(tryLock方法)
    4. 超时设置:携带超时时间,让调用者可以放弃等待(tryAcquireNanos方法)
    5. 支持中断:线程等不及资源了从等待队列退出,通过中断实现任务取消(acquireInterruptibly方法)
    6. 独占/共享:根据资源是否可以被同时访问定义了两种资源共享方式(acquireShared方法)
      1)Exclusive(独占),同一时间只有一个线程可以通过阻塞点(ReentrantLock)
      2)Share(共享),多个线程可以同时执行(Semaphore/CountDownLatch )

    性能目标

    主要的性能目标是可伸缩性,即在大部分情况下,即使,或特别在同步器有竞争的情况下,稳定地保证其效率。在理想的情况下,不管有多少线程正试图通过同步点,通过同步点的开销都应该是个常量。

    设计上

    实现同步器的伪代码如下:

    为了实现上述操作,需要下面三个基本组件的相互协作:

    • 同步状态的原子性管理;
    • 线程的阻塞与解除阻塞;
    • 队列的管理;

    实现

    AQS针对上述三个功能的具体实现:

    1. 使用单个 int(32位)来保存同步状态 (资源的可用状态),并暴露出getState、setState以及compareAndSet操作来读取和更新这个状态。AbstractQueuedLongSynchronizer类使用了64位 long 字段的原子性操作。

    2. 阻塞
      j.u.c包 LockSupport 类来作为线程阻塞和唤醒的工具。方法LockSupport.park阻塞当前线程除非/直到有个LockSupport.unpark 方法被调用(unpark方法被提前调用也是可以的)

    3. 两种队列(都基于Node内部类)
      1)同步等待队列,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。
      2)条件等待队列,不是必须的,其是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个Condition queue。

    同步等待队列节点类state

    // 取消状态     static final int CANCELLED =  1;
    // 唤醒状态     static final int SIGNAL    = -1;
    // 条件等待状态  static final int CONDITION = -2;
    // 传播状态     static final int PROPAGATE = -3;
    // 等待状态,初始值为0,其他可选值是上面的4个值 volatile int waitStatus;
    

    同步等待队列

    AQS 类的 protected 修饰的构造函数里面有一大段注释用于说明AQS实现的等待队列的细节事项,这里列举几点重要的[^4]:


    假设使用Node.EXCLUSIVE模式把新增的等待线程加入队列,例如有三个线程分别是thread-1、thread-2和thread-3,线程入队的时候都处于阻塞状态,新入队的线程总是添加到队列的tail节点,阻塞的线程总是”争抢”着成为head节点。

         +------+  prev +-----+       +-----+       +-----+
    head |      | <---- |     | <---- |     | <---- |     |  tail
         +------+ ----> +-----+ ----> +-----+ ----> +-----+
                   next thread1       thread2       thread3
    

    条件等待队列

    理解条件等待队列,要先理解java.util.concurrent.locks.Condition接口,Condition可以理解为Object中的wait()、notify()和notifyAll()的替代品。

    Condition的实现类是AQS的公有内部类ConditionObject,使用 condition 时必须先持有相应的锁,每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例。

    条件队列[^5]

    ② AQS是如何唤醒下一个线程的?

    1. 以 ReentrantLock 为例,获取锁失败的线程,会向 CLH 队列中添加一个独占模式的节点到队尾,然后挂起等待唤醒(通过LocksSuport#park挂起,挂起前会修改前驱节点状态为 Node.SIGNAL(-1))。
    2. 锁释放,从队尾往前找,找到 waitStatus<=0 的所有节点中排在最前面的一个,waitStatus 为 SIGNAL 代表后继节点需要被唤醒(通过 LocksSuport#unpark 唤醒线程)。
    3. 线程被唤醒后检查当前线程的中断状态并置 false ,如果发现该线程被中断过,就再中断一次,再然后再次获取锁。
    // 中断标志位的修改
    Thread.interrupted()                                    true ->false 
    Thread.currentThread().interrupt()                      false -> true
    

    代码实例:

    public class ReentrantLockTest {
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                tryAcquire();
            }, "thread-0").start();
    
            Thread.sleep(1000);
            new Thread(() -> {
                tryAcquire();
            }, "thread-1").start();
    
            Thread.sleep(10000);
            new Thread(() -> {
                tryAcquire();
            }, "thread-2").start();
    
            Thread.sleep(10000);
            new Thread(() -> {
                tryAcquire();
            }, "thread-3").start();
        }
    
        private static ReentrantLock lock = new ReentrantLock(true);
    
        private static void tryAcquire(){
            lock.lock();
            try {
                try {
                    Thread.sleep(20000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }finally {
                lock.unlock();
            }
        }
    
    }
    
    

    及图示(重点关注 waitStatus 状态改变),其中初始化时头尾节点是同一个节点,获取锁失败的线程进入到同步队列的队尾:

    ③ 依靠AQS的同步类比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等的实现

    ④ ReentrantLock如何实现公平和非公平锁?

    ReentrantLock lock = new ReentrantLock(false);//false为非公平锁,true为公平锁
    

    公平锁:在线程尝试获取公平锁时,先去判断队列中是否有前驱节点,如果没有前驱节点,尝试去获取一次锁,如果获取失败,把线程封装成节点放入队列中,然后该线程会进行最多3次自旋,判断线程是否在队列首部,如果在,尝试获取锁。在自旋次数耗尽后,线程仍未获取到锁,则线程被挂起。

    非公平锁:在线程尝试获取非公平锁时,直接尝试获取锁,如果没有获取到,就把该线程封装成节点放到队列中,然后该线程会进行最多3次自旋,判断线程是否在队列首部,如果在,尝试获取锁。如果在自旋次数耗尽后,线程仍未获取到锁,则线程被挂起。
    公平锁会比非公平锁多一个对队列中是否有前驱节点的判断。[^6]

    ⑤ CountDownLatch和CyclicBarrier的区别?各自适用于什么场景?

    这两个类都可以实现一组线程在到达某个条件之前进行等待,它们内部都有一个计数器,当计数器的值不断的减为0的时候所有阻塞的线程将会被唤醒。
    有区别的是CyclicBarrier的计数器由自己控制,而CountDownLatch的计数器则由使用者来控制,在CyclicBarrier中线程调用await方法不仅会将自己阻塞还会将计数器减1,而在CountDownLatch中线程调用await方法只是将自己阻塞而不会减少计数器的值。另外,CountDownLatch只能拦截一轮,而CyclicBarrier可以实现循环拦截。一般来说用CyclicBarrier可以实现CountDownLatch的功能,而反之则不能。[^7]

    参考文章

    [1] https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html
    [2] https://gee.cs.oswego.edu/dl/papers/aqs.pdf
    [3] https://www.cnblogs.com/dennyzhangdd/p/7218510.html
    [4] https://www.cnblogs.com/throwable/p/13369717.html
    [5] https://javadoop.com/post/AbstractQueuedSynchronizer-2
    [6]https://blog.csdn.net/m0_51958329/article/details/124323234
    [7] https://blog.csdn.net/qq_39241239/article/details/87030142

    相关文章

      网友评论

        本文标题:知识回顾|并发|AQS

        本文链接:https://www.haomeiwen.com/subject/zbmphdtx.html