美文网首页Java 杂谈
JUC同步器框架AbstractQueuedSynchroniz

JUC同步器框架AbstractQueuedSynchroniz

作者: zhrowable | 来源:发表于2019-04-07 13:18 被阅读4次

    JUC同步器框架AbstractQueuedSynchronizer源码图文分析

    前提

    Doug Lea大神在编写JUC(java.util.concurrent)包的时候引入了java.util.concurrent.locks.AbstractQueuedSynchronizer,Abstract Queued Synchronizer,也就是"基于队列实现的抽象同步器",一般我们称之为AQS。其实Doug Lea大神编写AQS是有严谨的理论基础的,他的个人博客上有一篇论文《The java.util.concurrent Synchronizer Framework》,文章在http://ifeve.com上可以找到相关的译文(《JUC同步器框架》),如果想要深入研究AQS必须要理解一下该论文的内容,然后详细分析一下AQS的源码实现。本文在阅读AQS源码的时候选用的JDK版本是JDK11。

    AQS的主要功能

    AQS是JUC包中用于构建锁或者其他同步组件(信号量、事件等)的基础框架类。AQS从它的实现上看主要提供了下面的功能:

    • 同步状态的原子性管理。
    • 线程的阻塞和解除阻塞。
    • 提供阻塞线程的存储队列。

    基于这三大功能,衍生出下面的附加功能:

    • 通过中断实现的任务取消,基于线程中断实现。
    • 可选的超时设置,也就是调用者可以选择放弃等待。
    • 定义了Condition接口,用于支持管程形式的await/signal/signalAll操作,代替了Object类基于JNI提供的wait/notify/notifyAll。

    AQS还根据同步状态的不同管理方式区分为两种不同的实现:独占状态的同步器共享状态的同步器

    JUC同步器框架原理

    The java.util.concurrent Synchronizer Framework》一文中其实有提及到同步器框架的伪代码:

    // acquire操作如下:
    while (synchronization state does not allow acquire) {
        enqueue current thread if not already queued;
        possibly block current thread;
    }
    dequeue current thread if it was queued;
    
    //release操作如下:
    update synchronization state;
    if (state may permit a blocked thread to acquire){
        unblock one or more queued threads;
    }
    

    翻译一下:

    // acquire操作如下:
    while(同步状态申请获取失败){
        if(当前线程未进入等待队列){
            当前线程放入等待队列;
        }
        尝试阻塞当前线程;
    }
    当前线程移出等待队列
    
    //release操作如下:
    更新同步状态
    if(同步状态足够允许一个阻塞的线程申请获取){
        解除一个或者多个等待队列中的线程的阻塞状态;
    }
    

    为了实现上述操作,需要下面三个基本组件的相互协作:

    • 同步状态的原子性管理。
    • 等待队列的管理。
    • 线程的阻塞与解除阻塞。

    其实基本原理很简单,但是为了应对复杂的并发场景和并发场景下程序执行的正确性,同步器框架在上面的acquire操作和release操作中使用了死循环和CAS等操作,很多时候会让人感觉逻辑过于复杂。

    同步状态管理

    AQS内部内部定义了一个32位整型的state变量用于保存同步状态:

    /**
     * The synchronization state.
     */
    private volatile int state;
    
    // 获取state
    protected final int getState() {
        return state;
    }
    
    // 直接覆盖设置state
    protected final void setState(int newState) {
        state = newState;
    }
    
    // CAS设置state
    protected final boolean compareAndSetState(int expect, int update) {
        return STATE.compareAndSet(this, expect, update);
    }
    

    同步状态state在不同的实现中可以有不同的作用或者表示意义,它可以代表资源数、锁状态等等,遇到具体的场景我们再分析它表示的意义。

    CLH队列变体

    CLH锁即Craig, Landin, and Hagersten (CLH) locks,因为它底层是基于队列实现,一般也称为CLH队列锁。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。从实现上看,CLH锁是一种自旋锁,能确保无饥饿性,提供先来先服务的公平性。先看简单的CLH锁的一个简单实现:

    public class CLHLock implements Lock {
    
        AtomicReference<QueueNode> tail = new AtomicReference<>(new QueueNode());
    
        ThreadLocal<QueueNode> pred;
        ThreadLocal<QueueNode> current;
    
        public CLHLock() {
            current = ThreadLocal.withInitial(QueueNode::new);
            pred = ThreadLocal.withInitial(() -> null);
        }
    
        @Override
        public void lock() {
            QueueNode node = current.get();
            node.locked = true;
            QueueNode pred = tail.getAndSet(node);
            this.pred.set(pred);
            while (pred.locked) {
            }
        }
    
        @Override
        public void unlock() {
            QueueNode node = current.get();
            node.locked = false;
            current.set(this.pred.get());
        }
    
        static class QueueNode {
    
            boolean locked;
        }
    
        // 忽略其他接口方法的实现
    }   
    

    上面是一个简单的CLH队列锁的实现,内部类QueueNode只使用了一个简单的布尔值locked属性记录了每个线程的状态,如果该属性为true,则相应的线程要么已经获取到锁,要么正在等待锁,如果该属性为false,则相应的线程已经释放了锁。新来的想要获取锁的线程必须对tail属性调用getAndSet()方法,使得自身成为队列的尾部,同时得到一个指向前驱节点的引用pred,最后线程所在节点在其前驱节点的locked属性上自旋,值得前驱节点释放锁。上面的实现是无法运行的,因为一旦自旋就会进入死循环导致CPU飙升,可以尝试使用下面将要提到的LockSupport进行改造。

    CLH队列锁本质是使用队列(实际上是单向链表)存放等待获取锁的线程,等待的线程总是在其所在节点的前驱节点的状态上自旋,直到前驱节点释放资源。从实际来看,过度自旋带来的CPU性能损耗比较大,并不是理想的线程等待队列实现

    基于原始的CLH队列锁中提供的等待队列的基本原理,AQS实现一种了CLH锁队列的变体(variant)AQS类的protected修饰的构造函数里面有一大段注释用于说明AQS实现的等待队列的细节事项,这里列举几点重要的:

    • AQS实现的等待队列没有直接使用CLH锁队列,但是参考了其设计思路,等待节点会保存前驱节点中线程的信息,内部也会维护一个控制线程阻塞的状态值。
    • 每个节点都设计为一个持有单独的等待线程并且"带有具体的通知方式"的监视器,这里所谓通知方式就是自定义唤醒阻塞线程的方式而已。
    • 一个线程是等待队列中的第一个等待节点的持有线程会尝试获取锁,但是并不意味着它一定能够获取锁成功(这里的意思是存在公平和非公平的实现),获取失败就要重新等待。
    • 等待队列中的节点通过prev属性连接前驱节点,通过next属性连接后继节点,简单来说,就是双向链表的设计。
    • CLH队列本应该需要一个虚拟的头节点,但是在AQS中没有直接提供虚拟的头节点,而是延迟到第一次竞争出现的时候懒创建虚拟的头节点(其实也会创建尾节点,初始化时头尾节点是同一个节点)。
    • Condition(条件)等待队列中的阻塞线程使用的是相同的Node结构,但是提供了另一个链表用来存放,Condition等待队列的实现比非Condition等待队列复杂。

    线程阻塞与唤醒

    线程的阻塞和唤醒在JDK1.5之前,一般只能依赖于Object类提供的wait()notify()notifyAll()方法,它们都是JNI方法,由JVM提供实现,并且它们必须运行在获取监视器锁的代码块内(synchronized代码块中),这个局限性先不谈性能上的问题,代码的简洁性和灵活性是比较低的。JDK1.5引入了LockSupport类,底层是基于Unsafe类的park()unpark()方法,提供了线程阻塞和唤醒的功能,它的机制有点像只有一个允许使用资源的信号量java.util.concurrent.Semaphore,也就是一个线程只能通过park()方法阻塞一次,只能调用unpark()方法解除调用阻塞一次,线程就会唤醒(多次调用unpark()方法也只会唤醒一次),可以想象是内部维护了一个0-1的计数器。

    LockSupport类如果使用得好,可以提供更灵活的编码方式,这里举个简单的使用例子:

    public class LockSupportMain implements Runnable {
    
        private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    
        private Thread thread;
    
        private void setThread(Thread thread) {
            this.thread = thread;
        }
    
        public static void main(String[] args) throws Exception {
            LockSupportMain main = new LockSupportMain();
            Thread thread = new Thread(main, "LockSupportMain");
            main.setThread(thread);
            thread.start();
            Thread.sleep(2000);
            main.unpark();
            Thread.sleep(2000);
        }
    
        @Override
        public void run() {
            System.out.println(String.format("%s-步入run方法,线程名称:%s", FORMATTER.format(LocalDateTime.now()),
                    Thread.currentThread().getName()));
            LockSupport.park();
            System.out.println(String.format("%s-解除阻塞,线程继续执行,线程名称:%s", FORMATTER.format(LocalDateTime.now()),
                    Thread.currentThread().getName()));
        }
    
        private void unpark() {
            LockSupport.unpark(thread);
        }
    }
    // 某个时刻的执行结果如下:
    2019-02-25 00:39:57.780-步入run方法,线程名称:LockSupportMain
    2019-02-25 00:39:59.767-解除阻塞,线程继续执行,线程名称:LockSupportMain
    

    LockSupportpark()方法也有带超时的变体版本方法,有些适合使用阻塞超时的场景不妨可以使用。

    独占线程的保存

    AbstractOwnableSynchronizerAQS的父类,一个同步器框架有可能在一个时刻被某一个线程独占,AbstractOwnableSynchronizer就是为所有的同步器实现和锁相关实现提供了基础的保存、获取和设置独占线程的功能,这个类的源码很简单:

    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        private static final long serialVersionUID = 3737899427754241961L;
    
        protected AbstractOwnableSynchronizer() { }
    
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    它就提供了一个保存独占线程的变量对应的Setter和Getter方法,方法都是final修饰的,子类只能使用不能覆盖。

    CLH队列变体的实现

    这里先重点分析一下AQS中等待队列的节点AQS$Node的源码:

    static final class Node {
       // 标记一个节点处于共享模式下的等待
       static final Node SHARED = new Node();
       // 标记一个节点处于独占模式下的等待
       static final Node EXCLUSIVE = null;
       // 取消状态
       static final int CANCELLED =  1;
       // 唤醒状态
       static final int SIGNAL    = -1;
       // 条件等待状态
       static final int CONDITION = -2;
       // 传播状态
       static final int PROPAGATE = -3;
       // 等待状态,初始值为0,其他可选值是上面的4个值
       volatile int waitStatus;
       // 当前节点前驱节点的引用
       volatile Node prev;
       // 当前节点后继节点的引用
       volatile Node next;
       // 当前节点持有的线程,可能是阻塞中等待唤醒的线程
       volatile Thread thread;
       // 下一个等待节点
       Node nextWaiter;
       // 当前操作的节点是否处于共享模式
       final boolean isShared() {
          return nextWaiter == SHARED;
       }
       // 获取当前节点的前驱节点,确保前驱节点必须存在,否则抛出NPE  
       final Node predecessor() {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        
        // 空节点,主要是首次创建队列的时候创建的头和尾节点使用
        Node() {}
    
        // 设置下一个等待节点,设置持有线程为当前线程
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }
    
        // 设置waitStatus,设置持有线程为当前线程
        Node(int waitStatus) {
            WAITSTATUS.set(this, waitStatus);
            THREAD.set(this, Thread.currentThread());
        }
    
        // CAS更新waitStatus  
        final boolean compareAndSetWaitStatus(int expect, int update) {
            return WAITSTATUS.compareAndSet(this, expect, update);
        }
        // CAS设置后继节点
        final boolean compareAndSetNext(Node expect, Node update) {
            return NEXT.compareAndSet(this, expect, update);
        }
        // 设置前驱节点
        final void setPrevRelaxed(Node p) {
            PREV.set(this, p);
        }
    
        // 下面是变量句柄的实现,在VarHandle出现之前使用的是Unsafe,其实底层还是照样使用Unsafe
        private static final VarHandle NEXT;
        private static final VarHandle PREV;
        private static final VarHandle THREAD;
        private static final VarHandle WAITSTATUS;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                NEXT = l.findVarHandle(Node.class, "next", Node.class);
                PREV = l.findVarHandle(Node.class, "prev", Node.class);
                THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
                WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }     
    }   
    

    其中,变量句柄(VarHandle)是JDK9引用的新特性,其实底层依赖的还是Unsafe的方法,总体和JDK8的实现是基本一致。这里需要关注一下Node里面的几个属性:

    • waitStatus:当前Node实例的等待状态,可选值有5个。
      1. 初始值整数0:当前节点如果不指定初始化状态值,默认值就是0,侧面说明节点正在等待队列中处于等待状态。
      2. Node#CANCELLED整数值1:表示当前节点实例因为超时或者线程中断而被取消,等待中的节点永远不会处于此状态,被取消的节点中的线程实例不会阻塞。
      3. Node#SIGNAL整数值-1:表示当前节点的后继节点是(或即将是)阻塞的(通过park),当它释放或取消时,当前节点必须unpark它的后继节点。
      4. Node#CONDITION整数值-2:表示当前节点是条件队列中的一个节点,当它转换为同步队列中的节点的时候,状态会被重新设置为0。
      5. Node#PROPAGATE整数值-3:此状态值通常只设置到调用了doReleaseShared()方法的头节点,确保releaseShared()方法的调用可以传播到其他的所有节点,简单理解就是共享模式下节点释放的传递标记。
    • prev、next:当前Node实例的前驱节点引用和后继节点引用。
    • thread:当前Node实例持有的线程实例引用。
    • nextWaiter:这个值是一个比较容易令人生疑的值,虽然表面上它称为"下一个等待的节点",但是实际上它有三种取值的情况。
      1. 值为静态实例Node.EXCLUSIVE(也就是null),代表当前的Node实例是独占模式。
      2. 值为静态实例Node.SHARED,代表当前的Node实例是共享模式。
      3. 值为非Node.EXCLUSIVENode.SHARED的其他节点实例,代表Condition等待队列中当前节点的下一个等待节点

    Node类的等待状态waitStatus理解起来是十分费劲的,下面分析其他源码的时候会标识此状态变化的时机。

    其实上面的Node类可以直接拷贝出来当成一个新建的类,然后尝试构建一个双向链表自行调试,这样子就能深刻它的数据结构。例如:

    public class AqsNode {
    
        static final AqsNode SHARED = new AqsNode();
        static final AqsNode EXCLUSIVE = null;
    
        static final int CANCELLED = 1;
        static final int SIGNAL = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    
        volatile int waitStatus;
    
        volatile AqsNode prev;
    
        volatile AqsNode next;
    
        volatile Thread thread;
    
        AqsNode nextWaiter;
    
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        final AqsNode predecessor() {
            AqsNode p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        AqsNode() {
        }
    
        AqsNode(AqsNode nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }
    
        AqsNode(int waitStatus) {
            WAITSTATUS.set(this, waitStatus);
            THREAD.set(this, Thread.currentThread());
        }
    
        final boolean compareAndSetWaitStatus(int expect, int update) {
            return WAITSTATUS.compareAndSet(this, expect, update);
        }
    
        final boolean compareAndSetNext(AqsNode expect, AqsNode update) {
            return NEXT.compareAndSet(this, expect, update);
        }
    
        final void setPrevRelaxed(AqsNode p) {
            PREV.set(this, p);
        }
    
        private static final VarHandle NEXT;
        private static final VarHandle PREV;
        private static final VarHandle THREAD;
        private static final VarHandle WAITSTATUS;
    
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                NEXT = l.findVarHandle(AqsNode.class, "next", AqsNode.class);
                PREV = l.findVarHandle(AqsNode.class, "prev", AqsNode.class);
                THREAD = l.findVarHandle(AqsNode.class, "thread", Thread.class);
                WAITSTATUS = l.findVarHandle(AqsNode.class, "waitStatus", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    
        public static void main(String[] args) throws Exception {
            AqsNode head = new AqsNode();
            AqsNode next = new AqsNode(AqsNode.EXCLUSIVE);
            head.next = next;
            next.prev = head;
            AqsNode tail = new AqsNode(AqsNode.EXCLUSIVE);
            next.next = tail;
            tail.prev = next;
            List<Thread> threads = new ArrayList<>();
            for (AqsNode node = head; node != null; node = node.next) {
                threads.add(node.thread);
            }
            System.out.println(threads);
        }
    }
    // 某次执行的输出:
    [null, Thread[main,5,main], Thread[main,5,main]]
    

    实际上,AQS中一共存在两种等待队列,其中一种是普通的同步等待队列,这里命名为Sync-Queue,另一种是基于Sync-Queue实现的条件等待队列,这里命名为Condition-Queue。

    Sync-Queue

    前面已经介绍完AQS的同步等待队列节点类,下面重点分析一下同步等待队列的相关源码,下文的Sync队列、同步队列和同步等待队列是同一个东西。首先,我们通过分析Node节点得知Sync队列一定是双向链表,AQS中有两个瞬时成员变量用来存放头节点和尾节点:

    // 头节点引用
    private transient volatile Node head;
    // 尾节点引用
    private transient volatile Node tail;
    
    // 变量句柄相关,用于CAS操作头尾节点
    private static final VarHandle STATE;
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
    
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
            HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
            TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
        } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
        }
        // 确保LockSupport类已经初始化 - 这里应该是为了修复之前一个因为LockSupport未初始化导致的BUG
        Class<?> ensureLoaded = LockSupport.class;
    }
    
    // 初始化同步队列,注意初始化同步队列的时候,头尾节点都是指向同一个新的Node实例
    private final void initializeSyncQueue() {
        Node h;
        if (HEAD.compareAndSet(this, null, (h = new Node())))
            tail = h;
    }
    
    // CAS设置同步队列的尾节点
    private final boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }
    
    // 设置头节点,重点注意这里:传入的节点设置成头节点之后,前驱节点和持有的线程会置为null,这是因为:
    // 1.头节点一定没有前驱节点。
    // 2.当节点被设置为头节点,它所在的线程一定是已经解除了阻塞。
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    

    当前线程加入同步等待队列和同步等待队列的初始化是同一个方法,前文提到过:同步等待队列的初始化会延迟到第一次可能出现竞争的情况,这是为了避免无谓的资源浪费,具体方法是addWaiter(Node mode)

    // 添加等待节点到同步等待队列,实际上初始化队列也是这个方法完成的
    private Node addWaiter(Node mode) {
        // 基于当前线程创建一个新节点,节点的模式由调用者决定
        Node node = new Node(mode);
        for (;;) {
            Node oldTail = tail;
           // 尾节点不为空说明队列已经初始化过,则把新节点加入到链表中,作为新的尾节点,建立和前驱节点的关联关系
            if (oldTail != null) {
                node.setPrevRelaxed(oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
            // 尾节点为空说明队列尚未初始化过,进行一次初始化操作
                initializeSyncQueue();
            }
        }
    }
    

    在首次调用addWaiter()方法,死循环至少执行两轮再跳出,因为同步队列必须初始化完成后(第一轮循环),然后再把当前线程所在的新节点实例添加到等待队列中再返回(第二轮循环)当前的节点,这里需要注意的是新加入同步等待队列的节点一定是添加到队列的尾部并且会更新AQS中的tail属性为最新入队的节点实例

    假设我们使用Node.EXCLUSIVE模式入队列,手上有三个线程分别是thread-1、thread-2和thread-3,线程入队的时候都处于阻塞状态,模拟一下依次调用上面的入队方法的同步队列的整个链表的状态。

    先是线程thread-1加入等待队列:

    [图片上传失败...(image-46298-1554614211606)]

    接着是线程thread-2加入等待队列:

    [图片上传失败...(image-114022-1554614211606)]

    最后是线程thread-3加入等待队列:

    [图片上传失败...(image-303488-1554614211606)]

    如果仔细研究会发现,如果所有的入队线程都处于阻塞状态的话,新入队的线程总是添加到队列的tail节点,阻塞的线程总是"争抢"着成为head节点,这一点和CLH队列锁的阻塞线程总是基于前驱节点自旋以获取锁的思路是一致的。下面将会分析的独占模式与共享模式,线程加入等待队列都是通过addWaiter()方法

    Condition-Queue

    前面已经相对详细地介绍过同步等待队列,在AQS中还存在另外一种相对特殊和复杂的等待队列-条件等待队列。介绍条件等待队列之前,要先介绍java.util.concurrent.locks.Condition接口。

    public interface Condition {
        
        // 当前线程进入等待状态直到被唤醒或者中断
        void await() throws InterruptedException;
        // 当前线程进入等待状态,不响应中断,阻塞直到被唤醒
        void awaitUninterruptibly();
        // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制
        long awaitNanos(long nanosTimeout) throws InterruptedException;
        // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        // 当前线程进入等待状态直到被唤醒或者中断,阻塞带时间限制
        boolean awaitUntil(Date deadline) throws InterruptedException;
        // 唤醒单个阻塞线程
        void signal();
        // 唤醒所有阻塞线程
        void signalAll();
    }    
    

    Condition可以理解为Object中的wait()notify()notifyAll()的替代品,因为Object中的相应方法是JNI(Native)方法,由JVM实现,对使用者而言并不是十分友好(可能需要感知JVM的源码实现),而Condition是基于数据结构和相应算法实现对应的功能,我们可以从源码上分析其实现。

    Condition的实现类是AQS的公有内部类ConditionObjectConditionObject提供的入队列方法如下:

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */ - 条件队列的第一个节点
        private transient Node firstWaiter;
        /** Last node of condition queue. */ - 条件队列的最后一个节点
        private transient Node lastWaiter;
        // 公有构造函数
        public ConditionObject() { }
        // 添加条件等待节点
        private Node addConditionWaiter() {
            // 这里做一次判断,当前线程必须步入此同步器实例
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 临时节点t赋值为lastWaiter引用
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            // 最后一个节点不为条件等待状态,则是取消状态
            if (t != null && t.waitStatus != Node.CONDITION) {
                // 解除所有取消等待的节点的连接
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            // 基于当前线程新建立一个条件等待类型的节点
            Node node = new Node(Node.CONDITION);
            // 首次创建Condition的时候,最后一个节点临时引用t为null,则把第一个节点置为新建的节点
            if (t == null)
                firstWaiter = node;
            else
                // 已经存在第一个节点,则通过nextWaiter连接新的节点
                t.nextWaiter = node;
            // 最后一个节点的引用更新为新节点的引用    
            lastWaiter = node;
            return node;
        } 
        // 从条件等待队列解除所有取消等待的节点的连接,其实就是所有取消节点移除的操作,涉及到双向链表的断链操作、第一个和最后一个节点的引用更新
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                // 注意这里等待状态的判断
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        } 
        // 当前同步器实例持有的线程是否当前线程(currentThread())
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        } 
    
    // 暂时不分析其他方法             
    }        
    

    实际上,Condition的所有await()方法变体都调用addConditionWaiter()添加阻塞线程到条件队列中。我们按照分析同步等待队列的情况,分析一下条件等待队列。正常情况下,假设有2个线程thread-1和thread-2进入条件等待队列,都处于阻塞状态。

    先是thread-1进入条件队列:

    [图片上传失败...(image-f37b4b-1554614211606)]

    然后是thread-2进入条件队列:

    [图片上传失败...(image-5c7cdf-1554614211606)]

    条件等待队列看起来也并不复杂,但是它并不是单独存在和使用的,一般依赖于同步等待队列,下面的一节分析Condition的实现的时候再详细分析。

    独占模式与共享模式

    前文提及到,同步器涉及到独占模型和共享模式。下面就针对这两种模式详细分析一下AQS的具体实现源码。

    独占模式

    AQS同步器如果使用独占(EXCLUSIVE)模式,那么意味着同一个时刻,只有节点所在一个线程获取(acuqire)原子状态status成功,此时该线程可以从阻塞状态解除继续运行,而同步等待队列中的其他节点持有的线程依然处于阻塞状态。独占模式同步器的功能主要由下面的四个方法提供:

    • acquire(int arg);申请获取arg个原子状态status(申请成功可以简单理解为status = status - arg)。
    • acquireInterruptibly(int arg):申请获取arg个原子状态status,响应线程中断。
    • tryAcquireNanos(int arg, long nanosTimeout):申请获取arg个原子状态status,带超时的版本。
    • release(int arg):释放arg个原子状态status(释放成功可以简单理解为status = status + arg)。

    独占模式下,AQS同步器实例初始化时候传入的status值,可以简单理解为"允许申请的资源数量的上限值",下面的acquire类型的方法暂时称为"获取资源",而release方法暂时称为"释放资源"。接着我们分析前面提到的四个方法的源码,先看acquire(int arg)

    public final void acquire(int arg) {
        // 获取资源成功或者新增一个独占类型节点到同步等待队列成功则直接返回,否则中断当前线程
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // 此方法必须又子类覆盖,用于决定是否获取资源成功
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    // 中断当前线程
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    // 不可中断的独占模式下,同步等待队列中的线程获取资源的方法
    final boolean acquireQueued(final Node node, int arg) {
        boolean interrupted = false;
        try {
            for (;;) {
                // 获取新入队节点的前驱节点
                final Node p = node.predecessor();
                // 前驱节点为头节点并且尝试获取资源成功,也就是每一轮循环都会调用tryAcquire尝试获取资源,除非阻塞或者跳出循环
                if (p == head && tryAcquire(arg)) {
                    // 设置新入队节点为头节点,原来的节点会从队列中断开
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;   // <== 注意,这个位置是跳出死循环的唯一位置
                }
                // 判断是否需要阻塞当前获取资源失败的节点中持有的线程
                if (shouldParkAfterFailedAcquire(p, node))
                    // 阻塞当前线程,如果被唤醒则返回并清空线程的中断标记
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            if (interrupted)
                selfInterrupt();
            throw t;
        }
    }
    
    /**
     * 检查并且更新获取资源失败的节点的状态,返回值决定线程是否需要被阻塞。
     * 这个方法是所有循环获取资源方法中信号控制的主要方法
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 这里记住ws是当前处理节点的前驱节点的等待状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            // 前驱节点状态设置成Node.SIGNAL成功,等待被release调用释放,后继节点可以安全地进入阻塞状态
            return true;
        if (ws > 0) {
            // ws大于0只有一种情况Node.CANCELLED,说明前驱节点已经取消获取资源,
            // 这个时候会把所有这类型取消的前驱节点移除,找到一个非取消的节点重新通过next引用连接当前节点
            do {
               node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 其他等待状态直接修改前驱节点等待状态为Node.SIGNAL
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }
    
    // 阻塞当前线程,获取并且重置线程的中断标记位
    private final boolean parkAndCheckInterrupt() {
        // 这个就是阻塞线程的实现,依赖Unsafe的API
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    上面的代码虽然看起来能基本理解,但是最好用图推敲一下"空间上的变化":

    [图片上传失败...(image-4b9a97-1554614211606)]

    [图片上传失败...(image-f5243d-1554614211606)]

    接着分析一下release(int arg)的实现:

    // 释放资源
    public final boolean release(int arg) {
        // 尝试释放资源
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 尝试释放资源,独占模式下,尝试通过重新设置status的值从而实现释放资源的功能
    // 这个方法必须由子类实现
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    // 解除传入节点(一般是头节点)的第一个后继节点的阻塞状态,当前处理节点的等待状态会被CAS更新为0
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 当前处理的节点(一般是头节点)状态小于0则直接CAS更新为0
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 如果节点的第一个后继节点为null或者等待状态大于0(取消),则从等待队列的尾节点向前遍历,
            // 找到最后一个不为null,并且等待状态小于等于0的节点
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        // 解除上面的搜索到的节点的阻塞状态
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    接着用上面的图:

    [图片上传失败...(image-e4c83-1554614211606)]

    上面图中thread-2晋升为头节点的第一个后继节点,等待下一个release()释放资源唤醒之就能晋升为头节点,一旦晋升为头节点也就是意味着可以解除阻塞继续运行。接着我们可以看acquire()的响应中断版本和带超时的版本。先看acquireInterruptibly(int arg)

    public final void acquireInterruptibly(int arg)
                throws InterruptedException {
        // 获取并且清空线程中断标记位,如果是中断状态则直接抛InterruptedException异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果获取资源失败
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    
    // 独占模式下响应中断的获取资源方法
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        // 基于当前线程新增一个独占的Node节点进入同步等待队列中
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                // 获取资源失败进入阻塞状态
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                        // 解除阻塞后直接抛出InterruptedException异常
                        throw new InterruptedException();
                }
             } catch (Throwable t) {
                cancelAcquire(node);
                throw t;
        }
    }
    

    doAcquireInterruptibly(int arg)方法和acquire(int arg)类似,最大的不同点在于阻塞线程解除阻塞后并不是正常继续运行,而是直接抛出InterruptedException异常。最后看tryAcquireNanos(int arg, long nanosTimeout)的实现:

    // 独占模式下尝试在指定超时时间内获取资源,响应线程中断
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
    }
    
    // 独占模式下带超时时间限制的获取资源方法
    private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        // 超时期限小于0纳秒,快速失败
        if (nanosTimeout <= 0L)
            return false;
        // 超时的最终期限是当前系统时钟纳秒+外部指定的nanosTimeout增量
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return true;
                }
                // 计算出剩余的超时时间
                nanosTimeout = deadline - System.nanoTime();
                // 剩余超时时间小于0说明已经超时则取消获取
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                // 这里会判断剩余超时时间大于1000纳秒的时候才会进行带超时期限的线程阻塞,否则会进入下一轮获取尝试
                if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                        LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
                }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
    

    tryAcquireNanos(int arg, long nanosTimeout)其实和doAcquireInterruptibly(int arg)类似,它们都响应线程中断,不过tryAcquireNanos()在获取资源的每一轮循环尝试都会计算剩余可用的超时时间,只有同时满足获取失败需要阻塞并且剩余超时时间大于SPIN_FOR_TIMEOUT_THRESHOLD(1000纳秒)的情况下才会进行阻塞。

    独占模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,然后头结点的第一个有效的后继节点继续开始竞争资源。

    [图片上传失败...(image-58d39f-1554614211606)]

    使用独占模式同步器的主要类库有:

    • 可重入锁ReentrantLock
    • 读写锁ReentrantReadWriteLock中的写锁WriteLock

    共享模式

    共享(SHARED)模式中的"共享"的含义是:同一个时刻,如果有一个节点所在线程获取(acuqire)原子状态status成功,那么它会解除阻塞被唤醒,并且会把唤醒状态传播到所有的后继节点(换言之就是唤醒整个同步等待队列中的所有节点)。共享模式同步器的功能主要由下面的四个方法提供:

    • acquireShared(int arg);申请获取arg个原子状态status(申请成功可以简单理解为status = status - arg)。
    • acquireSharedInterruptibly(int arg):申请获取arg个原子状态status,响应线程中断。
    • tryAcquireSharedNanos(int arg, long nanosTimeout):申请获取arg个原子状态status,带超时的版本。
    • releaseShared(int arg):释放arg个原子状态status(释放成功可以简单理解为status = status + arg)。

    先看acquireShared(int arg)的源码:

    // 共享模式下获取资源
    public final void acquireShared(int arg) {
        // 注意tryAcquireShared方法值为整型,只有小于0的时候才会加入同步等待队列
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    // 共享模式下尝试获取资源,此方法需要由子类覆盖
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    // 共享模式下获取资源和处理同步等待队列的方法
    private void doAcquireShared(int arg) {
        // 基于当前线程新建一个标记为共享的新节点
        final Node node = addWaiter(Node.SHARED);
        boolean interrupted = false;
        try {
            for (;;) {
                final Node p = node.predecessor();
                // 如果当前节点的前驱节点是头节点
                if (p == head) {
                    // 每一轮循环都会调用tryAcquireShared尝试获取资源,除非阻塞或者跳出循环
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {  // <= tryAcquireShared方法>=0说明直资源获取成功
                        // 设置头结点,并且传播获取资源成功的状态,这个方法的作用是确保唤醒状态传播到所有的后继节点
                        // 然后任意一个节点晋升为头节点都会唤醒其第一个有效的后继节点,起到一个链式释放和解除阻塞的动作
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 判断获取资源失败是否需要阻塞,这里会把前驱节点的等待状态CAS更新为Node.SIGNAL
                if (shouldParkAfterFailedAcquire(p, node))
                    interrupted |= parkAndCheckInterrupt();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        } finally {
            if (interrupted)
                selfInterrupt();
        }
    }
    
    // 设置同步等待队列的头节点,判断当前处理的节点的后继节点是否共享模式的节点,如果共享模式的节点,
    // propagate大于0或者节点的waitStatus为PROPAGATE则进行共享模式下的释放资源
    private void setHeadAndPropagate(Node node, int propagate) {
        // h为头节点的中间变量
        Node h = head;
        // 设置当前处理节点为头节点
        setHead(node);
        // 这个判断条件比较复杂:入参propagate大于0 || 头节点为null || 头节点的状态为非取消 || 再次获取头节点为null || 再次获取头节点不为取消
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 当前节点(其实已经成为头节点)的第一个后继节点为null或者是共享模式的节点
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    
    // Release action for shared mode:共享模式下的释放资源动作
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 头节点不为null并且不为尾节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 如果头节点等待状态为SIGNAL(-1)则CAS更新它为0,更新成功后唤醒和解除其后继节点的阻塞
                if (ws == Node.SIGNAL) {
                    if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                        continue;
                    // 唤醒头节点的后继节点
                    unparkSuccessor(h);
                }
                // 如果头节点的等待状态为0,则CAS更新它为PROPAGATE(-3)
                else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                    continue;
                }
            // 头节点没有变更,则跳出循环
            if (h == head)
                break;
        }
    }
    

    其实代码的实现和独占模式有很多类似的地方,一个很大的不同点是:共享模式同步器当节点获取资源成功晋升为头节点之后,它会把自身的等待状态通过CAS更新为Node.PROPAGATE,下一个加入等待队列的新节点会把头节点的等待状态值更新回Node.SIGNAL,标记后继节点处于可以被唤醒的状态,如果遇上资源释放,那么这个阻塞的节点就能被唤醒解除阻塞。我们还是画图理解一下,先假设tryAcquireShared(int arg)总是返回小于0的值,入队两个阻塞的线程thread-1和thread-2,然后进行资源释放确保tryAcquireShared(int arg)总是返回大于0的值:

    [图片上传失败...(image-d711f1-1554614211606)]

    看起来和独占模式下的同步等待队列差不多,实际上真正不同的地方在于有节点解除阻塞和晋升为头节点的过程。因此我们可以先看releaseShared(int arg)的源码:

    // 共享模式下释放资源
    public final boolean releaseShared(int arg) {
        // 尝试释放资源成功则调用前面分析过的doReleaseShared以传播唤醒状态和unpark头节点的后继节点
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    
    // 共享模式下尝试释放资源,必须由子类覆盖
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

    releaseShared(int arg)就是在tryReleaseShared(int arg)调用返回true的情况下主动调用一次doReleaseShared()从而基于头节点传播唤醒状态和unpark头节点的后继节点。接着之前的图:

    [图片上传失败...(image-54bc69-1554614211606)]

    [图片上传失败...(image-4ab50f-1554614211606)]

    接着看acquireSharedInterruptibly(int arg)的源码实现:

    // 共享模式下获取资源的方法,响应线程中断
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
    
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                // 和非响应线程中断的acquireShared方法类似,不过这里解除阻塞之后直接抛出异常InterruptedException
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
    

    最后看tryAcquireSharedNanos(int arg, long nanosTimeout)的源码实现:

    // 共享模式下获取资源的方法,带超时时间版本
    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 注意这里只要tryAcquireShared >= 0或者doAcquireSharedNanos返回true都认为获取资源成功
            return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 计算超时的最终期限    
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return true;
                    }
                }
                //重新计算剩余的超时时间 
                nanosTimeout = deadline - System.nanoTime();
                // 超时的情况下直接取消获取
                if (nanosTimeout <= 0L) {
                    cancelAcquire(node);
                    return false;
                }
                // 满足阻塞状态并且剩余的超时时间大于阀值1000纳秒则通过LockSupport.parkNanos()阻塞线程
                if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 解除阻塞后判断线程的中断标记并且清空标记位,如果是处于中断状态则抛出InterruptedException 
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
    

    共享模式的同步器的一个显著特点就是:头节点的第一个有效(非取消)的后继节点,总是尝试获取资源,一旦获取资源成功就会解除阻塞并且晋升为头节点,原来所在节点会移除出同步等待队列,原来的队列长度就会减少1,重新设置头节点的过程会传播唤醒的状态,简单来说就是唤醒一个有效的后继节点,只要一个节点可以晋升为头节点,它的后继节点就能被唤醒。节点的唤醒顺序遵循类似于FIFO的原则,通俗说就是先阻塞或者阻塞时间最长则先被唤醒

    相关文章

      网友评论

        本文标题:JUC同步器框架AbstractQueuedSynchroniz

        本文链接:https://www.haomeiwen.com/subject/djxsiqtx.html