AQS

作者: 代代代个码 | 来源:发表于2019-03-17 23:02 被阅读0次

      队列同步器(AbstractQueuedSynchronizer. AQS)在Java并发机制中占据非常基础且重要的地位,是用来构建各种锁和同步组件的基础框架。其内部通过维护一个线程共享的int成员变量来表示同步状态,通过内置的FIFO双向队列来调度和阻塞请求资源的线程,当一个线程尝试获取锁时,如果已经被占用,那么当前线程就会被构造成一个Node节点加入到同步队列的尾部,队列的头节点是成功获取锁的节点,当头节点线程释放锁时,会唤醒后面的节点并释放当前头节点的引用。
      AQS基于模板方法设计而成,它本身没有实现任何同步接口,仅仅定义出了一些获取和释放同步状态的方法供自定义同步组件使用,因此我们在自定义同步组件时需要继承队列同步器并且重写其指定的方法,然后将同步器组合在我们自己的组件中,并调用同步器所提供的模板方法,而这些模板方法内部会调用我们刚刚重写过的方法。
      AQS支持 独占式获取同步状态共享式获取同步状态 两种方式,基于这样的设计,我们就可以很方便地实现不同类型的同步组件来满足各种各样的需求,我们常用的锁和并发工具类都是基于队列同步器实现的,比如ReentrantLock、ReentrantReadWriteLock等锁,还有CountDownLatch、CyclicBarrier、Semaphore等工具类。


      那么队列同步器提供的方法具体有哪些呢?先来看看它对外提供的三个基本的原子方法:

    接口方法 描述
    getState() 用于获取当前同步状态
    setState(int newState) 用于设置新的同步状态
    compareAndSetState(int expect,
    int update)
    利用CAS设置新的同步状态,保证原子性

      上面的三个方法保证以原子操作的方式获取和修改同步状态,我们自定义的同步组件可以直接拿来使用。除此之外,同步器还给我们提供了几个抽象方法供我们实现,目的是实现刚才提到的独占式和共享式操作同步状态,对应到同步器的模板方法也同样分两种,以下是可重写的抽象方法:

    抽象方法 方法描述
    protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前同步状态并判断同步状态是否符合预期,然后CAS设置同步状态
    protected boolean tryRelease(int arg) 独占式释放同步状态,此时正在等待获取同步状态的线程将有机会获取同步状态
    protected int tryAcquireShard(int arg) 共享式获取同步状态,返回大于0的值表示获取成功,反之则获取失败
    protected boolean tryReleaseShard(int arg) 共享式释放同步状态
    protected boolean isHeldExclusively() 判断当前同步器是否在独占模式下被线程占有,一般该方法表示是否被当前线程独占

      在实现自定义同步组件时,我们可以调用同步器提供的模板方法,其中一些模板方法会调用我们刚才重写过的抽象方法:

    模板方法 方法描述
    void acquire(int arg) 独占式获取同步状态,如果获取成功,
    则由该方法返回;否则将会进入同步队列等待;
    该方法会调用我们重写过的tryAcquire(int arg)方法
    void acquireInterruptibly(int arg) 与acquire(int arg)相同,但是该方法会响应中断,
    即当前线程未获取到同步状态而进入等待队列,
    若当前线程中断,则该方法会抛出InterruptException
    boolean tryAcquireNanos
    (int arg, long nanos)
    在acquireInterruptibly(int arg)的基础上增加超时
    限制,线程在规定时间内获取到同步状态,返回
    true,否则返回false
    void acquireShared(int arg) 共享式的获取同步状态,和acquire(int arg)
    用法一致,不同点是允许多个线程同时获取同步状态
    void acquireSharedInterruptibly
    (int arg)
    与acquireShared(int arg)相同,但响应中断
    boolean tryAcquireSharedNanos
    (int arg, long nanos)
    在acquireSharedInterruptibly(int arg)的基础上
    增加超时限制
    boolean release(int arg) 独占式的释放同步状态,释放后会将同步队列
    中的首节点线程唤醒
    boolean releaseShared(int arg) 共享式的释放同步状态
    Collection<Thread> getQueuedThreads 获取等待队列中的线程集合

      从上面的模板方法可以看出,AQS提供给我们 独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步队列中的等待线程集合 这3类方法。了解到AQS的大体框架后,下面进入真正的重点环节,我们结合源码来分析一下AQS的实现细节:

    1. 同步队列的实现
        我们上面已经提到AQS内部维护一个同步队列来实现对线程的阻塞和调度,现在我们来看一下同步队列的具体实现。首先队列是一个遵循FIFO的双向队列,若当前线程获取同步状态失败,同步器会将当前线程以及等待状态等信息构造成一个Node节点,然后将该节点放入同步队列中并阻塞当前线程;当首节点同步状态释放后,会把后继节点所代表的线程唤醒,让该线程重新再次去获取同步状态。
        Node节点中保存的信息有:当前线程的引用、等待状态、前驱结点引用、后继节点引用。具体属性名称和描述见下表:
    属性名称 描述
    volatile int waitStatus 等待状态:
    1.CANCELLED,值为1,由于在同步队列中等待的线程等待超时或者
    被中断,需要从同步队列中取消等待,该状态为终态,不会再变化
    2.SIGNAL,值为-1,指的是成功获取同步状态的节点,后继节点的线
    程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,
    将会通知后继节点,使后继节点得以运行
    3.CONDITION,值为-2,节点在等待队列中,节点线程等待在Condition
    上,当其他线程对Condition调用了signal()方法后,该节点将会由等待队列
    转移到同步队列,加入到同步状态的获取中
    4.PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件
    地传播下去
    5.INITIAL,值为0,初始状态
    volatile Node prev 前驱结点,当前节点被加入同步队列时被设置(即尾部添加)
    volatile Node next 后继节点
    Node nextWaiter 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,即表示节点类型和等待队列中的后继节点共用同一个字段
    volatile Thread thread 当前需要获取同步状态的线程

      我们可以从图中直观感受一下同步队列的结构:


    同步队列数据结构

      如果线程获取同步状态失败,需要把线程新加入同步队列中,这个过程必须要保证线程安全,因此同步器提供了一个CAS设置尾节点的方法,需要指出当前线程锁认为的尾节点和当前节点:


    加入新的尾节点
      再来看看设置首节点的过程,首节点时获取同步状态成功的节点,首节点的线程在释放同步状态后会通知后继节点,后积极点将会在获取同步状态成功后将自己设置为新的首节点。由于每次只会由一个线程获取同步状态,所以这个过程不存在线程安全问题,不需要CAS算法:
    设置新的头节点
    1. 独占式同步状态的获取与释放
        独占式同步状态的获取使通过调用同步器的acquire(int arg)方法,该方法对中断不敏感,注意与acquireInterruptibly(int arg)的区别,即当前线程因获取同步状态失败而进入同步队列后,若对当前线程执行中断,该线程不会从队列中移出。继续来看acquire(int arg)方法的实现:
      acquire(int arg)

      可以看到首先会调用tryAcquire(int arg)方法尝试获取同步状态,还记得这个方法的用法吗?这是同步器接口留给我们自己实现的方法,该方法必须保证能够线程安全的去尝试获取同步状态;若获取同步状态失败,则调用addWaiter(...)方法构造出一个独占式(EXCLUSIVE)同步节点并加入到同步队列的尾部,最后调用 acquireQueued(...)方法执行队列中的逻辑,先看一下addWaiter(...)方法的实现:

    addWaiter(Node mode)
      上面的代码首先会用CAS的方式设置当前节点为尾节点,若尾节点为空,说明当前队列中没有节点,则执行enq(final Node node)方法,一起来看一下:
    enq(final Node node)
      可以看到如果判断到尾节点为空,则先构建新的头节点,这时尾节点即头节点;若尾节点存在则利用CAS设置尾节点。

      再来看一下acquireQueued(...)方法方法的代码: acquireQueued(final Node node, int arg)

      该方法是在addWaiter(...)之后执行,也就是说当前节点已经被加入到同步队列中,所以在这里可以看出该方法会使当前节点以轮询的方式不断尝试获取同步状态,获取的过程分两步,首先当前节点的前驱结点必须使首节点,然后再次调用tryAcquire(int arg)方法尝试获取同步状态,如果成功,则把当前节点设置为新的头节点并跳出轮询。

    1. 共享式同步状态的获取与释放
      。。。

      看到这里,我们可以说对AQS有了比较深入的了解,对AQS的介绍也可以告一段落,接下来我们可以深入地了解Java中的锁的实现和并发工具类的实现。

    相关文章

      网友评论

          本文标题:AQS

          本文链接:https://www.haomeiwen.com/subject/kkdpmqtx.html