AbstractQueuedSynchronizer(AQS)深

作者: SunnyMore | 来源:发表于2018-08-06 20:50 被阅读167次

    上一篇文章中我们对LockAbstractQueuedSynchronizer(AQS)有了初步的认识。在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义,AQS则实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等等一些底层的实现处理。AQS的核心也包括了这些方面:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取这些特性的实现,而这些实际上则是AQS提供出来的模板方法,归纳整理如下:

    独占式锁:

    void acquire(int arg):独占式获取同步状态,如果获取失败则插入同步队列进行等待;
    void acquireInterruptibly(int arg):acquire方法相同,但在同步队列中进行等待的时候可以检测中断;
    boolean tryAcquireNanos(int arg, long nanosTimeout):acquireInterruptibly基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false;
    boolean release(int arg):释放同步状态,该方法会唤醒在同步队列中的下一个节点

    共享式锁:

    void acquireShared(int arg):共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
    void acquireSharedInterruptibly(int arg):在acquireShared方法基础上增加了能响应中断的功能;
    boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly基础上增加了超时等待的功能;
    boolean releaseShared(int arg):共享式释放同步状态

    要想掌握AQS的底层实现,其实也就是对这些模板方法的逻辑进行学习。在学习这些模板方法之前,我们得首先了解下AQS中的同步队列是一种什么样的数据结构,因为同步队列是AQS对同步状态的管理的基石。

    2. 同步队列

    AQS提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。然而多线程环境中对状态的操纵必须确保原子性,因此子类对于状态的把握,需要使用这个同步器提供的以下三个方法对状态进行操作:

    • java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
    • java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
    • java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

    子类推荐被定义为自定义同步装置的内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干acquire之类的方法来供使用。该同步器即可以作为排他模式也可以作为共享模式,当它被定义为一个排他模式时,其他线程对其的获取就被阻止,而共享模式对于多个线程获取都可以成功。

    同步器是实现锁的关键,利用同步器将锁的语义实现,然后在锁的实现中聚合同步器。可以这样理解:锁的API是面向使用者的,它定义了与锁交互的公共行为,而每个锁需要完成特定的操作也是透过这些行为来完成的(比如:可以允许两个线程进行加锁,排除两个以上的线程),但是实现是依托给同步器来完成;同步器面向的是线程访问和资源控制,它定义了线程对资源是否能够获取以及线程的排队等操作。锁和同步器很好的隔离了二者所需要关注的领域,严格意义上讲,同步器可以适用于除了锁以外的其他同步设施上(包括锁)。
    同步器的开始提到了其实现依赖于一个FIFO队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。Node的主要包含以下成员变量:

    Node {
        int waitStatus;
        Node prev;
        Node next;
        Node nextWaiter;
        Thread thread;
    }
    

    以上五个成员变量主要负责保存该节点的线程引用,同步等待队列(以下简称sync队列)的前驱和后继节点,同时也包括了同步状态。

    属性名称 描述
    int waitStatus 表示节点的状态。其中包含的状态有:1. CANCELLED,值为1,表示当前的线程被取消; 2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark; 3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中; 4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行; 5. 值为0,表示当前节点在sync队列中,等待着获取锁。
    Node prev 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
    Node next 后继节点。
    Node nextWaiter 存储condition队列中的后继节点。
    Thread thread 入队列时的当前线程。

    节点成为sync队列和condition队列构建的基础,在同步器中就包含了sync队列。同步器拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。对于同步器维护的状态state,多个线程对其的获取将会产生一个链式的结构。


    image

    现在我们知道了节点的数据结构类型,并且每个节点拥有其前驱和后继节点,很显然这是一个双向队列。同样的我们可以用一段demo看一下。

    public class LockDemo {
        private static ReentrantLock lock = new ReentrantLock();
    
        public static void main(String[] args) {
            for (int i = 0; i < 5; i++) {
                Thread thread = new Thread(() -> {
                    lock.lock();
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                });
                thread.start();
            }
        }
    }
    
    

    实例代码中开启了5个线程,先获取锁之后再睡眠10S中,实际上这里让线程睡眠是想模拟出当线程无法获取锁时进入同步队列的情况。通过debug,当Thread-4(在本例中最后一个线程)获取锁失败后进入同步时,AQS时现在的同步队列如图所示:

    LockDemo debug png

    Thread-0先获得锁后进行睡眠,其他线程(Thread-1,Thread-2,Thread-3,Thread-4)获取锁失败进入同步队列,同时也可以很清楚的看出来每个节点有两个域:prev(前驱)和next(后继),并且每个节点用来保存获取同步状态失败的线程引用以及等待状态等信息。另外AQS中有两个重要的成员变量:

    private transient volatile Node head;
    private transient volatile Node tail;
    
    

    也就是说AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法。其示意图如下:

    png

    通过对源码的理解以及做实验的方式,现在我们可以清楚的知道这样几点:

    1. 节点的数据结构,即AQS的静态内部类Node,节点的等待状态等信息
    2. 同步队列是一个双向队列,AQS通过持有头尾指针管理同步队列

    那么,节点如何进行入队和出队是怎样做的了?实际上这对应着锁的获取和释放两个操作:获取锁失败进行入队操作,获取锁成功进行出队操作。

    3. 独占锁

    3.1 独占锁的获取(acquire方法)

    我们继续通过看源码和debug的方式来看,还是以上面的demo为例,调用lock()方法是获取独占式锁,获取失败就将当前线程加入同步队列,成功则线程执行。而lock()方法实际上会调用AQSacquire()方法,源码如下

    public final void acquire(int arg) {
            //先看同步状态是否获取成功,如果成功则方法结束返回
            //若失败则先调用addWaiter()方法再调用acquireQueued()方法
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    
    

    上述逻辑主要包括:

    1. 尝试获取(调用tryAcquire更改状态,需要保证原子性);
      在tryAcquire方法中使用了同步器提供的对state操作的方法,利用compareAndSet保证只有一个线程能够对状态进行成功修改,而没有成功修改的线程将进入sync队列排队。
    2. 如果获取不到,将当前线程构造成节点Node并加入sync队列;
      进入队列的每个线程都是一个节点Node,从而形成了一个双向队列,类似CLH队列,这样做的目的是线程间的通信会被限制在较小规模(也就是两个节点左右)。
    3. 再次尝试获取,如果没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

    总结: 使用LockSupport将当前线程unpark,关于LockSupport后续会详细介绍。关键信息请看注释,acquire根据当前获得同步状态成功与否做了两件事情:1. 成功,则方法结束返回,2. 失败,则先调用addWaiter()然后在调用acquireQueued()方法。

    获取同步状态失败,入队操作

    当线程获取独占式锁失败后就会将当前线程加入同步队列,那么加入队列的方式是怎样的了?我们接下来就应该去研究一下addWaiter()acquireQueued()addWaiter()源码如下:

    private Node addWaiter(Node mode) {
            // 1\. 将当前线程构建成Node类型
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            // 2\. 当前尾节点是否为null?
            Node pred = tail;
            if (pred != null) {
                // 2.2 将当前节点尾插入的方式插入同步队列中
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 2.1\. 当前同步队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程
            enq(node);
            return node;
    }
    
    

    上述逻辑主要包括:

    1. 使用当前线程构造Node;
      对于一个节点需要做的是将当节点前驱节点指向尾节点(current.prev = tail),尾节点指向它(tail = current),原有的尾节点的后继节点指向它(t.next = current)而这些操作要求是原子的。上面的操作是利用尾节点的设置来保证的,也就是compareAndSetTail来完成的。
    2. 先行尝试在队尾添加;
      如果尾节点已经有了,然后做如下操作:
      (1)分配引用T指向尾节点;
      (2)将节点的前驱节点更新为尾节点(current.prev = tail);
      (3)如果尾节点是T,那么将当尾节点设置为该节点(tail = current,原子更新);
      (4)T的后继节点指向当前节点(T.next = current)。
      注意第3点是要求原子的。
      这样可以以最短路径O(1)的效果来完成线程入队,是最大化减少开销的一种方式。
    3. 如果队尾添加失败或者是第一个入队的节点。
      如果是第1个节点,也就是sync队列没有初始化,那么会进入到enq这个方法,进入的线程可能有多个,或者说在addWaiter中没有成功入队的线程都将进入enq这个方法。
      可以看到enq的逻辑是确保进入的Node都会有机会顺序的添加到sync队列中,而加入的步骤如下:
      (1)如果尾节点为空,那么原子化的分配一个头节点,并将尾节点指向头节点,这一步是初始化;
      (2)然后是重复在addWaiter中做的工作,但是在一个while(true)的循环中,直到当前节点入队为止。
      进入sync队列之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。而每个线程都是一个独立的个体,它们自省的观察,当条件满足的时候(自己的前驱是头结点并且原子性的获取了状态),那么这个线程能够继续运行。
    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    //1\. 构造头结点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 2\. 尾插入,CAS操作失败自旋尝试
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
    }
    
    

    在上面的分析中我们可以看出在第1步中会先创建头结点,说明同步队列是带头结点的链式存储结构。带头结点与不带头结点相比,会在入队和出队的操作中获得更大的便捷性,因此同步队列选择了带头结点的链式存储结构。那么带头节点的队列初始化时机是什么?自然而然是在tail为null时,即当前线程是第一次插入同步队列compareAndSetTail(t, node)方法会利用CAS操作设置尾节点,如果CAS操作失败会在for (;;)for死循环中不断尝试,直至成功return返回为止。因此,对enq()方法可以做这样的总结:

    1. 在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列的头结点的初始化
    2. 自旋不断尝试CAS尾插入节点直至成功为止

    现在我们已经很清楚获取独占式锁失败的线程包装成Node然后插入同步队列的过程了?那么紧接着会有下一个问题?在同步队列中的节点(线程)会做什么事情了来保证自己能够有机会获得独占式锁了?带着这样的问题我们就来看看acquireQueued()方法,从方法名就可以很清楚,这个方法的作用就是排队获取锁的过程,源码如下:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 1\. 获得当前节点的先驱节点
                    final Node p = node.predecessor();
                    // 2\. 当前节点能否获取独占式锁                 
                    // 2.1 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁
                    if (p == head && tryAcquire(arg)) {
                        //队列头指针用指向当前节点
                        setHead(node);
                        //释放前驱节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 2.2 获取锁失败,线程进入等待状态等待获取独占式锁
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
    }
    
    

    上述逻辑主要包括:

    1. 获取当前节点的前驱节点;
      需要获取当前节点的前驱节点,而头结点所对应的含义是当前站有锁且正在运行。
    2. 当前驱节点是头结点并且能够获取状态,代表该当前节点占有锁;
      如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
    3. 否则进入等待状态。
      如果没有轮到当前节点运行,那么将当前线程从线程调度器上摘下,也就是进入等待状态。

    这里针对acquire做一下总结:

    1. 状态的维护;
      需要在锁定时,需要维护一个状态(int类型),而对状态的操作是原子和非阻塞的,通过同步器提供的对状态访问的方法对状态进行操纵,并且利用compareAndSet来确保原子性的修改。
    2. 状态的获取;
      一旦成功的修改了状态,当前线程或者说节点,就被设置为头节点。
    3. sync队列的维护。
      在获取资源未果的过程中条件不符合的情况下(不该自己,前驱节点不是头节点或者没有获取到资源)进入睡眠状态,停止线程调度器对当前节点线程的调度。
      这时引入的一个释放的问题,也就是说使睡眠中的Node或者说线程获得通知的关键,就是前驱节点的通知,而这一个过程就是释放,释放会通知它的后继节点从睡眠中返回准备运行。整体示意图为下图:
    png

    获取锁成功,出队操作

    获取锁的节点出队的逻辑是:

    //队列头结点引用指向当前节点
    setHead(node);
    //释放前驱节点
    p.next = null; // help GC
    failed = false;
    return interrupted;
    
    

    setHead()方法为:

    private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
    }
    
    

    将当前节点通过setHead()方法设置为队列的头结点,然后将之前的头结点的next域设置为null并且pre域也为null,即与队列断开,无任何引用方便GC时能够将内存进行回收。示意图如下:

    png

    那么当获取锁失败的时候会调用shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法,看看他们做了什么事情。shouldParkAfterFailedAcquire()方法源码为:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    

    shouldParkAfterFailedAcquire()方法主要逻辑是使用compareAndSetWaitStatus(pred, ws, Node.SIGNAL)使用CAS将节点状态由INITIAL设置成SIGNAL,表示当前线程阻塞。当compareAndSetWaitStatus设置失败则说明shouldParkAfterFailedAcquire方法返回false,然后会在acquireQueued()方法中for (;;)死循环中会继续重试,直至compareAndSetWaitStatus设置节点状态位为SIGNALshouldParkAfterFailedAcquire返回true时才会执行方法parkAndCheckInterrupt()方法,该方法的源码为:

    private final boolean parkAndCheckInterrupt() {
            //使得该线程阻塞
            LockSupport.park(this);
            return Thread.interrupted();
    }
    
    

    该方法的关键是会调用LookSupport.park()方法(关于LookSupport会在以后的文章进行讨论),该方法是用来阻塞当前线程的。因此到这里就应该清楚了,acquireQueued()在自旋过程中主要完成了两件事情:

    1. 如果当前节点的前驱节点是头节点,并且能够获得同步状态的话,当前线程能够获得锁该方法执行结束退出
    2. 获取锁失败的话,先将节点状态设置成SIGNAL,然后调用LookSupport.park方法使得当前线程阻塞

    经过上面的分析,独占式锁的获取过程也就是acquire()方法的执行流程如下图所示:

    acquirepng

    3.2 独占锁的释放(release()方法)

    独占锁的释放就相对来说比较容易理解了,废话不多说先来看下源码:

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
    }
    
    

    这段代码逻辑就比较容易理解了,如果同步状态释放成功(tryRelease返回true)则会执行if块中的代码,当head指向的头结点不为null,并且该节点的状态值不为0的话才会执行unparkSuccessor()方法。unparkSuccessor方法源码:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    
        //头节点的后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //后继节点不为null时唤醒该线程
            LockSupport.unpark(s.thread);
    }
    
    

    源码的关键信息请看注释,首先获取头节点的后继节点,当后继节点的时候会调用LookSupport.unpark()方法,该方法会唤醒该节点的后继节点所包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所引用的线程,从而进一步可以佐证获得锁的过程是一个FIFO(先进先出)的过程。

    到现在我们终于啃下了一块硬骨头了,通过学习源码的方式非常深刻的学习到了独占式锁的获取和释放的过程以及同步队列。可以做一下总结:

    1. 线程获取锁失败,线程被封装成Node进行入队操作,核心方法在于addWaiter()和enq(),同时enq()完成对同步队列的头结点初始化工作以及CAS操作失败的重试;
    2. 线程获取锁是一个自旋的过程,当且仅当 当前节点的前驱节点是头结点并且成功获得同步状态时,节点出队即该节点引用的线程获得锁,否则,当不满足条件时就会调用LookSupport.park()方法使得线程阻塞
    3. 释放锁的时候会唤醒后继节点;

    总体来说:在获取同步状态时,AQS维护一个同步队列,获取同步状态失败的线程会加入到队列中进行自旋;移除队列(或停止自旋)的条件是前驱节点是头结点并且成功获得了同步状态。在释放同步状态时,同步器会调用unparkSuccessor()方法唤醒后继节点。

    独占锁特性学习

    3.3 可中断式获取锁(acquireInterruptibly方法)

    我们知道lock相较于synchronized有一些更方便的特性,比如能响应中断以及超时等待等特性,现在我们依旧采用通过学习源码的方式来看看能够响应中断是怎么实现的。可响应中断式锁可调用方法lock.lockInterruptibly();而该方法其底层会调用AQSacquireInterruptibly方法,源码为:

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            //线程获取锁失败
            doAcquireInterruptibly(arg);
    }
    
    

    在获取同步状态失败后就会调用doAcquireInterruptibly方法:

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        //将节点插入到同步队列中
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //获取锁出队
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //线程中断抛异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    

    关键信息请看注释,现在看这段代码就很轻松了吧:),与acquire方法逻辑几乎一致,唯一的区别是当parkAndCheckInterrupt返回true时即线程阻塞时该线程被中断,代码抛出被中断异常。

    3.4 超时等待式获取锁(tryAcquireNanos()方法)

    通过调用lock.tryLock(timeout,TimeUnit)方式达到超时等待获取锁的效果,该方法会在三种情况下才会返回:

    1. 在超时时间内,当前线程成功获取了锁;
    2. 当前线程在超时时间内被中断;
    3. 超时时间结束,仍未获得锁返回false。

    我们仍然通过采取阅读源码的方式来学习底层具体是怎么实现的,该方法会调用AQS的方法tryAcquireNanos(),源码为:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            //实现超时等待的效果
            doAcquireNanos(arg, nanosTimeout);
    }
    
    

    很显然这段源码最终是靠doAcquireNanos方法实现超时等待的效果,该方法源码如下:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //1\. 根据超时时间和当前时间计算出截止时间
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //2\. 当前线程获得锁出队列
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 3.1 重新计算超时时间
                nanosTimeout = deadline - System.nanoTime();
                // 3.2 已经超时返回false
                if (nanosTimeout <= 0L)
                    return false;
                // 3.3 线程阻塞等待 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 3.4 线程被中断抛出被中断异常
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    

    程序逻辑如图所示:

    doAcquireNanos

    程序逻辑同独占锁可响应中断式获取基本一致,唯一的不同在于获取锁失败后,对超时时间的处理上,在第1步会先计算出按照现在时间和超时时间计算出理论上的截止时间,比如当前时间是8h10min,超时时间是10min,那么根据deadline = System.nanoTime() + nanosTimeout计算出刚好达到超时时间时的系统时间就是8h 10min+10min = 8h 20min。然后根据deadline - System.nanoTime()就可以判断是否已经超时了,比如,当前系统时间是8h 30min很明显已经超过了理论上的系统时间8h 20min,deadline - System.nanoTime()计算出来就是一个负数,自然而然会在3.2步中的If判断之间返回false。如果还没有超时即3.2步中的if判断为true时就会继续执行3.3步通过LockSupport.parkNanos使得当前线程阻塞,同时在3.4步增加了对中断的检测,若检测出被中断直接抛出被中断异常。

    4. 共享锁

    4.1 执行过程概述

    获取锁的过程:

    1. 当线程调用acquireShared()申请获取锁资源时,如果成功,则进入临界区。
    2. 当获取锁失败时,则创建一个共享类型的节点并进入一个FIFO等待队列,然后被挂起等待唤醒。
    3. 当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

    释放锁过程:

    1. 当线程调用releaseShared()进行锁资源释放时,如果释放成功,则唤醒队列中等待的节点,如果有的话。

    4.2 源码深入分析

    基于上面所说的共享锁执行流程,我们接下来看下源码实现逻辑:
    首先来看下获取锁的方法acquireShared(),如下

       public final void acquireShared(int arg) {
            //尝试获取共享锁,返回值小于0表示获取失败
            if (tryAcquireShared(arg) < 0)
                //执行获取锁失败以后的方法
                doAcquireShared(arg);
        }
    

    这里tryAcquireShared()方法是留给用户去实现具体的获取锁逻辑的。关于该方法的实现有两点需要特别说明:

    1. 该方法必须自己检查当前上下文是否支持获取共享锁,如果支持再进行获取。

    2. 该方法返回值是个重点。其一、由上面的源码片段可以看出返回值小于0表示获取锁失败,需要进入等待队列。其二、如果返回值等于0表示当前线程获取共享锁成功,但它后续的线程是无法继续获取的,也就是不需要把它后面等待的节点唤醒。最后、如果返回值大于0,表示当前线程获取共享锁成功且它后续等待的节点也有可能继续获取共享锁成功,也就是说此时需要把后续节点唤醒让它们去尝试获取共享锁。

    有了上面的约定,我们再来看下doAcquireShared方法的实现:

    //参数不多说,就是传给acquireShared()的参数
    private void doAcquireShared(int arg) {
        //添加等待节点的方法跟独占锁一样,唯一区别就是节点类型变为了共享型,不再赘述
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //表示前面的节点已经获取到锁,自己会尝试获取锁
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
                    if (r >= 0) {
                        //这里是重点,获取到锁以后的唤醒操作,后面详细说
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        //如果是因为中断醒来则设置中断标记位
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //挂起逻辑跟独占锁一样,不再赘述
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //获取失败的取消逻辑跟独占锁一样,不再赘述
            if (failed)
                cancelAcquire(node);
        }
    }
    

    独占锁模式获取成功以后设置头结点然后返回中断状态,结束流程。而共享锁模式获取成功以后,调用了setHeadAndPropagate方法,从方法名就可以看出除了设置新的头结点以外还有一个传递动作,一起看下代码:

        //两个入参,一个是当前成功获取共享锁的节点,一个就是tryAcquireShared方法的返回值,注意上面说的,它可能大于0也可能等于0
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; //记录当前头节点
            //设置新的头节点,即把当前获取到锁的节点设置为头节点
            //注:这里是获取到锁之后的操作,不需要并发控制
            setHead(node);
            //这里意思有两种情况是需要执行唤醒操作
            //1.propagate > 0 表示调用方指明了后继节点需要被唤醒
            //2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                //如果当前节点的后继节点是共享类型或者没有后继节点,则进行唤醒
                //这里可以理解为除非明确指明不需要唤醒(后继等待节点是独占类型),否则都要唤醒
                if (s == null || s.isShared())
                    //后面详细说
                    doReleaseShared();
            }
        }
    
        private void setHead(Node node) {
            head = node;
            node.thread = null;
            node.prev = null;
        }
    

    最终的唤醒操作也很复杂,专门拿出来分析一下:
    注:这个唤醒操作在releaseShare()方法里也会调用。

    private void doReleaseShared() {
            for (;;) {
                //唤醒操作由头结点开始,注意这里的头节点已经是上面新设置的头结点了
                //其实就是唤醒上面新获取到共享锁的节点的后继节点
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    //表示后继节点需要被唤醒
                    if (ws == Node.SIGNAL) {
                        //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;      
                        //执行唤醒操作      
                        unparkSuccessor(h);
                    }
                    //如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                
                }
                //如果头结点没有发生变化,表示设置完成,退出循环
                //如果头结点发生变化,比如说其他线程获取到了锁,为了使自己的唤醒动作可以传递,必须进行重试
                if (h == head)                   
                    break;
            }
        }
    

    接下来看下释放共享锁的过程:

    public final boolean releaseShared(int arg) {
            //尝试释放共享锁
            if (tryReleaseShared(arg)) {
                //唤醒过程,详情见上面分析
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    注:上面的setHeadAndPropagate()方法表示等待队列中的线程成功获取到共享锁,这时候它需要唤醒它后面的共享节点(如果有),但是当通过releaseShared()方法去释放一个共享锁的时候,接下来等待独占锁跟共享锁的线程都可以被唤醒进行尝试获取。

    4.3 总结

    跟独占锁相比,共享锁的主要特征在于当一个在等待队列中的共享节点成功获取到锁以后(它获取到的是共享锁),既然是共享,那它必须要依次唤醒后面所有可以跟它一起共享当前锁资源的节点,毫无疑问,这些节点必须也是在等待共享锁(这是大前提,如果等待的是独占锁,那前面已经有一个共享节点获取锁了,它肯定是获取不到的)。当共享锁被释放的时候,可以用读写锁为例进行思考,当一个读锁被释放,此时不论是读锁还是写锁都是可以竞争资源的。

    5. 总结:

    如果获取共享锁失败后,将请求共享锁的线程封装成Node对象放入AQS的队列中,并挂起Node对象对应的线程,实现请求锁线程的等待操作。待共享锁可以被获取后,从头节点开始,依次唤醒头节点及其以后的所有共享类型的节点。实现共享状态的传播。这里有几点值得注意:
    1. 与AQS的独占功能一样,共享锁是否可以被获取的判断为空方法,交由子类去实现。
    2. 与AQS的独占功能不同,当锁被头节点获取后,独占功能是只有头节点获取锁,其余节点的线程继续沉睡,等待锁被释放后,才会唤醒下一个节点的线程,而共享功能是只要头节点获取锁成功,就在唤醒自身节点对应的线程的同时,继续唤醒AQS队列中的下一个节点的线程,每个节点在唤醒自身的同时还会唤醒下一个节点对应的线程,以实现共享状态的“向后传播”,从而实现共享功能。

    以上的分析都是从AQS子类的角度去看待AQS的部分功能的,而如果直接看待AQS,或许可以这么去解读:
    首先,AQS并不关心“是什么锁”,对于AQS来说它只是实现了一系列的用于判断“资源”是否可以访问的API,并且封装了在“访问资源”受限时将请求访问的线程的加入队列、挂起、唤醒等操作,AQS只关心“资源不可以访问时,怎么处理?”、“资源是可以被同时访问,还是在同一时间只能被一个线程访问?”、“如果有线程等不及资源了,怎么从AQS的队列中退出?”等一系列围绕资源访问的问题,而至于“资源是否可以被访问?”这个问题则交给AQS的子类去实现。
    AQS的子类是实现独占功能时,例如ReentrantLock,“资源是否可以被访问”被定义为只要AQS的state变量不为0,并且持有锁的线程不是当前线程,则代表资源不能访问。
    AQS的子类是实现共享功能时,例如:CountDownLatch,“资源是否可以被访问”被定义为只要AQSstate变量不为0,说明资源不能访问。这是典型的将规则和操作分开的设计思路:规则子类定义,操作逻辑因为具有公用性,放在父类中去封装。当然,正式因为AQS只是关心“资源在什么条件下可被访问”,所以子类还可以同时使用AQS的共享功能和独占功能的API以实现更为复杂的功能。
    比如:ReentrantReadWriteLock,我们知道ReentrantReadWriteLock的中也有一个叫Sync的内部类继承了AQS,而AQS的队列可以同时存放共享锁和独占锁,对于ReentrantReadWriteLock来说分别代表读锁和写锁,当队列中的头节点为读锁时,代表读操作可以执行,而写操作不能执行,因此请求写操作的线程会被挂起,当读操作依次推出后,写锁成为头节点,请求写操作的线程被唤醒,可以执行写操作,而此时的读请求将被封装成Node放入AQS的队列中。如此往复,实现读写锁的读写交替进行。

    参考文献

    《java并发编程的艺术》

    相关文章

      网友评论

        本文标题:AbstractQueuedSynchronizer(AQS)深

        本文链接:https://www.haomeiwen.com/subject/otygvftx.html