Java并发编程实战: AQS 源码 史上最详尽图解+逐行注释

作者: 光剑书架上的书 | 来源:发表于2019-07-13 16:33 被阅读166次

    Java并发编程实战: AQS 源码 史上最详尽图解+逐行注释

    引言: 学习一个java并发编程工具的时候,我们首先要抓住这三点:

    状态

    一般是一个state属性,它基本是整个工具的核心,通常整个工具都是在设置和修改状态,很多方法的操作都依赖于当前状态是什么。

    由于状态是全局共享的,一般会被设置成volatile类型,以保证其修改的可见性;

    队列

    队列通常是一个等待对象 Node 的集合,大多数以链表的形式实现。队列采用的是悲观锁的思想,表示当前所等待的资源,状态或者条件短时间内可能无法满足。因此,它会将当前线程包装成某种类型的数据结构 Node ,放入一个等待队列中,当一定条件满足后,再从等待队列中取出。

    CAS

    CAS操作是最轻量的并发处理,通常我们对于状态的修改都会用到CAS操作,因为状态可能被多个线程同时修改,CAS操作保证了同一个时刻,只有一个线程能修改成功,从而保证了线程安全,CAS操作基本是由Unsafe工具类的compareAndSwapXXX来实现的;CAS采用的是乐观锁的思想,因此常常伴随着自旋,如果发现当前无法成功地执行CAS,则不断重试,直到成功为止,自旋的的表现形式通常是一个死循环for(;;)。

    [ ref: https://segmentfault.com/a/1190000015739343 ]

    AbstractQueuedSynchronizer

    双向 CLH 链表

    节点模型

    节点状态

    简介

    • AbstractQueuedSynchronizer是JDK实现其他同步工具的基础。
      AQS内部封装了一个状态volatile int state用来表示资源,提供了独占以及共享两种操作:acquire(acquireShare)/release(releaseShare)。
      acquire的语义是:获取资源,如果当前资源满足条件,则直接返回,否则挂起当前线程
      release的语义是:释放资源,唤醒挂起线程

    特征

    • first-in-first-out (FIFO) wait queues

    • blocking locks and related synchronizers (semaphores, events, etc)

    • 乐观锁

      • 共享锁shared是一个乐观锁。可以允许多个线程阻塞点,可以多个线程同时获取到锁。它允许一个资源可以被多个读操作,或者被一个写操作访问,但是两个操作不能同时访问。
      • Java中的乐观锁基本都是通过CAS操作实现的,CAS是一种更新的原子操作,比较当前值跟传入值版本号是否一样,一样的更新,否则失败。
    • 悲观锁

      • 独占锁exclusive是一个悲观锁。保证只有一个线程经过一个阻塞点,只有一个线程可以获得锁。
      • Java中的悲观锁就是synchronized,AQS框架下的锁则是先尝试CAS乐观锁去获取,获取不到才会转为悲观锁,如ReentrantLock
    • 大量使用了CAS操作, 并且在冲突时,采用自旋方式重试,以实现轻量级和高效地获取锁。

    • AQS可以实现独占锁和共享锁

    • 通过一个CLH队列实现的(CLH锁即Craig, Landin, and Hagersten (CLH) locks,CLH锁是一个自旋锁,能确保无饥饿性,提供先来先服务的公平性。CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)

    核心业务逻辑

    • 1.AQS中用state属性表示锁同步状态,如果能成功将state属性通过CAS操作从0设置成1即获取了锁. 当state>0时表示已经获取了锁,当state = 0无锁。
    • 2.获取了锁的线程才能将exclusiveOwnerThread设置成自己
    • 3.addWaiter负责将当前等待锁的线程包装成Node,并成功地添加到队列的末尾,这一点是由它调用的enq方法保证的,enq方法同时还负责在队列为空时初始化队列。
    • 4.acquireQueued方法用于在Node成功入队后,继续尝试获取锁(取决于Node的前驱节点是不是head),或者将线程挂起
    • 5.shouldParkAfterFailedAcquire方法用于保证当前线程的前驱节点的waitStatus属性值为SIGNAL,从而保证了自己挂起后,前驱节点会负责在合适的时候唤醒自己。
    • 6.parkAndCheckInterrupt方法用于挂起当前线程,并检查中断状态。
    • 7.如果最终成功获取了锁,线程会从lock()方法返回,继续往下执行;否则,线程会阻塞等待。

    三板斧

    状态

    • volatile state属性

      • private volatile int state;
      • 该属性的值即表示了锁的状态,state为0表示锁没有被占用,state大于0表示当前已经有线程持有该锁,这里之所以说大于0而不说等于1是因为可能存在可重入的情况。你可以把state变量当做是当前持有该锁的线程数量。
      • CAS 操作用来改变状态
    • waitStatus 的状态值

      • static final int CANCELLED = 1;

        • 表示Node所代表的当前线程已经取消了排队,即放弃获取锁了。
      • static final int SIGNAL = -1;

        • 它不是表征当前节点的状态,而是当前节点的下一个节点的状态。
        • 当一个节点的waitStatus被置为SIGNAL,就说明它的下一个节点(即它的后继节点)已经被挂起了(或者马上就要被挂起了),因此在当前节点释放了锁或者放弃获取锁时,如果它的waitStatus属性为SIGNAL,它还要完成一个额外的操作——唤醒它的后继节点。
      • static final int CONDITION = -2;

      • static final int PROPAGATE = -3;

    • CAS操作

      • CAS操作主要针对5个属性

        • AQS的3个属性state,head和tail
        • Node对象的两个属性waitStatus,next
       private static final Unsafe unsafe = Unsafe.getUnsafe();
       private static final long stateOffset;
       private static final long headOffset;
       private static final long tailOffset;
       private static final long waitStatusOffset;
       private static final long nextOffset;
    
       static {
           try {
               stateOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
               headOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
               tailOffset = unsafe.objectFieldOffset
                   (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
               waitStatusOffset = unsafe.objectFieldOffset
                   (Node.class.getDeclaredField("waitStatus"));
               nextOffset = unsafe.objectFieldOffset
                   (Node.class.getDeclaredField("next"));
    
           } catch (Exception ex) { throw new Error(ex); }
       }
    
    

    CAS操作代码

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
    

    队列

    • AQS中,队列的实现是一个双向链表,被称为sync queue,它表示所有等待锁的线程的集合
    • AQS中的队列是一个CLH队列,它的head节点永远是一个哑结点(dummy node), 它不代表任何线程(某些情况下可以看做是代表了当前持有锁的线程),因此head所指向的Node的thread属性永远是null。只有从次头节点往后的所有节点才代表了所有等待锁的线程。也就是说,在当前线程没有抢到锁被包装成Node扔到队列中时,即使队列是空的,它也会排在第二个,我们会在它的前面新建一个dummy节点
    • 在并发编程中使用队列通常是将当前线程包装成某种类型的数据结构扔到等待队列中.
    • 队列中的节点数据结构
    static final class Node {
    
        // 共享
        static final Node SHARED = new Node();
        // 独占
        static final Node EXCLUSIVE = null;
    
        /**
         * 因为超时或者中断,节点会被设置为取消状态,被取消的节点时不会参与到竞争中的,他会一直保持取消状态不会转变为其他状态
         */
        static final int CANCELLED =  1;
        /**
         * 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
         * (说白了就是处于等待被唤醒的线程(或是节点)只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行)
         */
        static final int SIGNAL    = -1;
        /**
         * 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal()后,该节点将会从等待队列中转移到同步队列中,加入到同步状态的获取中
         */
        static final int CONDITION = -2;
        /**
         * 表示下一次共享式同步状态获取,将会无条件地传播下去
         */
        static final int PROPAGATE = -3;
    
        /** 等待状态 */
        volatile int waitStatus;
    
        /** 前驱节点,当节点添加到同步队列时被设置(尾部添加) */
        volatile Node prev;
    
        /** 后继节点 */
        volatile Node next;
    
        /** 等待队列中的后续节点。如果当前节点是共享的,那么字段将是一个 SHARED 常量,也就是说节点类型(独占和共享)和等待队列中的后续节点共用同一个字段 */
        Node nextWaiter;
        
        /** 获取同步状态的线程 */
        volatile Thread thread;
    
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() { // Used to establish initial head or SHARED marker
        }
    
        Node(Thread thread, Node mode) { // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
        
    }
    
    • 状态变量waitStatus
      表示当前Node所代表的线程的等待锁的状态,在独占锁模式下,我们只需要关注CANCELLED SIGNAL两种状态即可。

    • nextWaiter属性
      在独占锁模式下永远为null,仅仅起到一个标记作用,没有实际意义。

    AQS核心属性

    锁相关的属性有两个

    • private volatile int state; //锁的状态
    • private transient Thread exclusiveOwnerThread; // 当前持有锁的线程,注意这个属性是从AbstractOwnableSynchronizer继承而来

    sync queue相关的属性有两个

    • private transient volatile Node head; // 队头,为dummy node
    • private transient volatile Node tail; // 队尾,新入队的节点

    队列中的Node的属性

    // 节点所代表的线程
    volatile Thread thread;
     
    // 双向链表,每个节点需要保存自己的前驱节点和后继节点的引用
    volatile Node prev;
    volatile Node next;
     
    // 线程所处的等待锁的状态,初始化时,该值为0
    volatile int waitStatus;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    

    acquire分析

    • tryAcquire()尝试直接去获取资源,如果成功则直接返回;

    • addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;

    • acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。

    如果线程在等待过程中被中断过,先不响应的。在获取资源后才再进行自我中断selfInterrupt()

    tryAcquire(arg) : 获取锁的业务逻辑

    • 判断当前锁有没有被占用:

    1.如果锁没有被占用, 尝试以公平的方式获取锁
    2.如果锁已经被占用, 检查是不是锁重入
    获取锁成功返回true, 失败则返回false

    addWaiter(Node mode)

    当tryAcquire失败后,才会调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg),addWaiter方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点。

    使用了自旋保证插入队尾成功。

    在获取锁失败后调用, 将当前请求锁的线程包装成Node扔到sync queue中去,并返回这个Node。

    源代码

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
    // 如果队列不为空, 则用CAS方式将当前节点设为尾节点
            if (pred != null) {
                node.prev = pred;
     // 检查tail的状态,如果当前是pred
                if (compareAndSetTail(pred, node)) { // 将当前节点设为尾节点
                    pred.next = node; // 把tail的next节点指向当前Node
                    return node;
                }
            }
        
    
        // 代码会执行到这里, 只有两种情况:
        //    1. 队列为空
        //    2. CAS失败 
        // 注意, 这里是并发条件下, 所以什么都有可能发生, 尤其注意CAS失败后也会来到这里. 例如: 有可能其他线程已经成为了新的尾节点,导致尾节点不再是我们之前看到的那个pred了。
    
       // 如果当前node插入队尾失败,则通过自旋保证替换成功(自旋+CAS)
            enq(node);   
            return node;
        }
    
    • enq() 方法

      • 在该方法中, 我们使用了死循环, 即以自旋方式将节点插入队列,如果失败则不停的尝试, 直到成功为止, 另外, 该方法也负责在队列为空时, 初始化队列,这也说明,队列是延时初始化的(lazily initialized):
    private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           // 如果是空队列, 首先进行初始化
           // 这里也可以看出, 队列不是在构造的时候初始化的, 而是延迟到需要用的时候再初始化, 以提升性能
           if (t == null) { 
    // 注意,初始化时使用new Node()方法新建了一个dummy节点
    // 从这里可以看出, 在这个等待队列中,头结点是一个“哑节点”,它不代表任何等待的线程。 
    // head节点不代表任何线程,它就是一个空节点!
               if (compareAndSetHead(new Node()))
                   tail = head; // 这里仅仅是将尾节点指向dummy节点,并没有返回
           } else {
           // 到这里说明队列已经不是空的了, 这个时候再继续尝试将节点加到队尾
    
    // 1.设置node的前驱节点为当前的尾节点
               node.prev = t;
    
    // 2.修改tail属性,使它指向当前节点; 这里的CAS保证了同一时刻只有一个节点能成为尾节点,其他节点将失败,失败后将回到for循环中继续重试。
               if (compareAndSetTail(t, node)) {
    
    // 3.修改原来的尾节点,使它的next指向当前节点
                   t.next = node;
                   return t;
               }
           }
       }
    }
    

    将一个节点node添加到sync queue的末尾需要三步:

    1.设置node的前驱节点为当前的尾节点:node.prev = t
    2.修改tail属性,使它指向当前节点
    3.修改原来的尾节点,使它的next指向当前节点

    • 尾分叉
    // Step1
        node.prev = t;
    // Step2
        if (compareAndSetTail(t, node)) {
    // Step3        
            t.next = node;
    
            return t;
    }
    

    需要注意,这里的三步并不是一个原子操作,第一步很容易成功;而第二步由于是一个CAS操作,在并发条件下有可能失败,第三步只有在第二步成功的条件下才执行。这里的CAS保证了同一时刻只有一个节点能成为尾节点,其他节点将失败,失败后将回到for循环中继续重试。

    所以,当有大量的线程在同时入队的时候,同一时刻,只有一个线程能完整地完成这三步,而其他线程只能完成第一步,于是就出现了尾分叉.

    这里第三步是在第二步执行成功后才执行的,这就意味着,有可能即使我们已经完成了第二步,将新的节点设置成了尾节点,此时原来旧的尾节点的next值可能还是null(因为还没有来的及执行第三步),所以如果此时有线程恰巧从头节点开始向后遍历整个链表,则它是遍历不到新加进来的尾节点的,但是这显然是不合理的,因为现在的tail已经指向了新的尾节点。

    另一方面,当我们完成了第二步之后,第一步一定是完成了的,所以如果我们从尾节点开始向前遍历,已经可以遍历到所有的节点。

    这也就是为什么我们在AQS相关的源码中 (比如:unparkSuccessor(Node node) 中的:

    for (Node t = tail; t != null && t != node; t = t.prev))
    

    通常是从尾节点开始逆向遍历链表——因为一个节点要能入队,则它的prev属性一定是有值的,但是它的next属性可能暂时还没有值。

    至于那些“分叉”的入队失败的其他节点,在下一轮的循环中,它们的prev属性会重新指向新的尾节点,继续尝试新的CAS操作,最终,所有节点都会通过自旋不断的尝试入队,直到成功为止。

    acquireQueued(final Node node, int arg)

    addWaiter的将当前线程加入队列后,使用acquireQueued进行阻塞,直到获取到资源后返回。

    condition = lock.newCondition();
    lock.lock();
    try{
      while(!条件谓词成立){
        condition.await();
      }
    }
    finally{
      lock.unlock();
    }
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    // 当前节点的前驱是 head 节点时, 再次尝试获取锁
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //在获取锁失败后, 判断是否需要把当前线程挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    shouldParkAfterFailedAcquire(Node pred, Node node)

    • 这个函数只有在当前节点的前驱节点的waitStatus状态本身就是SIGNAL的时候才会返回true, 其他时候都会返回false:
    // Returns true if thread should block.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 获得前驱节点的ws
        if (ws == Node.SIGNAL)
            // 前驱节点的状态已经是SIGNAL了(This node has already set status asking a release),说明闹钟已经设了,可以直接高枕无忧地睡了(so it can safely park)
            return true;
        if (ws > 0) {
            // 当前节点的 ws > 0, 则为 Node.CANCELLED 说明前驱节点已经取消了等待锁(由于超时或者中断等原因)
            // 既然前驱节点不等了, 那就继续往前找, 直到找到一个还在等待锁的节点
            // 然后我们跨过这些不等待锁的节点, 直接排在等待锁的节点的后面 (是不是很开心!!!)
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 前驱节点的状态既不是SIGNAL,也不是CANCELLED
            // 用CAS设置前驱节点的ws为 Node.SIGNAL,给自己定一个闹钟
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    • 提示 waitStatus
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    

    parkAndCheckInterrupt()

    到这个函数已经是最后一步了, 就是将线程挂起, 等待被唤醒. Convenience method to park and then check if interrupted. return true if interrupted

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 线程被挂起,停在这里不再往下执行了
        return Thread.interrupted();
    }
    

    LockSupport.park()

    public class LockSupport extends Object
    用于创建锁和其他同步类的基本线程阻塞原语。

    • 源代码
       public static void park(Object blocker) {
           Thread t = Thread.currentThread();
           setBlocker(t, blocker);
           UNSAFE.park(false, 0L);
           setBlocker(t, null);
       }
    
       private static void setBlocker(Thread t, Object arg) {
           // Even though volatile, hotspot doesn't need a write barrier here.
           UNSAFE.putObject(t, parkBlockerOffset, arg);
       }
    
    • public static void park(Object blocker) :

    Disables the current thread for thread scheduling purposes unless the permit is available.
    If the permit is available then it is consumed and the call returns immediately; otherwise the current thread becomes disabled for thread scheduling purposes and lies dormant until one of three things happens:

    Some other thread invokes unpark with the current thread as the target; or
    Some other thread interrupts the current thread; or
    The call spuriously (that is, for no reason) returns.

    This method does not report which of these caused the method to return. Callers should re-check the conditions which caused the thread to park in the first place. Callers may also determine, for example, the interrupt status of the thread upon return.

    独占锁

    同一时刻,锁只能被一个线程所持有。

    通过state变量是否为0,我们可以分辨当前锁是否被占用,但光知道锁是不是被占用是不够的,我们并不知道占用锁的线程是哪一个。

    在AQS中,通过exclusiveOwnerThread (独占锁拥有者)属性来保存占用锁的线程是哪一个

    package java.util.concurrent.locks;
    
    /**
     * A synchronizer that may be exclusively owned by a thread.  This
     * class provides a basis for creating locks and related synchronizers
     * that may entail a notion of ownership.  The
     * {@code AbstractOwnableSynchronizer} class itself does not manage or
     * use this information. However, subclasses and tools may use
     * appropriately maintained values to help control and monitor access
     * and provide diagnostics.
     *
     * @since 1.6
     * @author Doug Lea
     */
    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        /** Use serial ID even though all fields transient. */
        private static final long serialVersionUID = 3737899427754241961L;
    
        /**
         * Empty constructor for use by subclasses.
         */
        protected AbstractOwnableSynchronizer() { }
    
        /**
         * The current owner of exclusive mode synchronization.
         *  当前持有锁的线程
         */
        private transient Thread exclusiveOwnerThread;
    
        /**
         * Sets the thread that currently owns exclusive access.
         * A {@code null} argument indicates that no thread owns access.
         * This method does not otherwise impose any synchronization or
         * {@code volatile} field accesses.
         * @param thread the owner thread
         */
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        /**
         * Returns the thread last set by {@code setExclusiveOwnerThread},
         * or {@code null} if never set.  This method does not otherwise
         * impose any synchronization or {@code volatile} field accesses.
         * @return the owner thread
         */
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    

    ReentrantLock 源码分析

    ReentrantLock有 公平锁 和 非公平锁 两种实现, 默认实现为非公平锁, 这体现在它的构造函数中:

    public class ReentrantLock implements Lock, java.io.Serializable {
       /** Synchronizer providing all implementation mechanics */
       private final Sync sync;
       
       /**
        * Base of synchronization control for this lock. Subclassed
        * into fair and nonfair versions below. Uses AQS state to
        * represent the number of holds on the lock.
        */
       abstract static class Sync extends AbstractQueuedSynchronizer {
           ...
       }
       
       /**
        * Sync object for non-fair locks
        */
       static final class NonfairSync extends Sync{
           ...
       }
       
       /**
        * Sync object for fair locks
        */
       static final class FairSync extends Sync {
           ...
       }
       
       /**
        * Creates an instance of {@code ReentrantLock}.
        * This is equivalent to using {@code ReentrantLock(false)}.
        */
       public ReentrantLock() {
           // 默认构造函数, 非公平锁
           sync = new NonfairSync();
       }
    
       /**
        * Creates an instance of {@code ReentrantLock} with the
        * given fairness policy.
        *
        * @param fair {@code true} if this lock should use a fair ordering policy
        */
       public ReentrantLock(boolean fair) {
           sync = fair ? new FairSync() : new NonfairSync();
       }
       
       // 获取锁
       public void lock() {
           sync.lock();
       }
       
       ...
    }
    

    FairLock

    • 源代码
        /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() { // final的lock()方法
                // 获取锁
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
    
        // 首先获取当前锁的状态
                int c = getState();
    
        // c=0 说明当前锁是avaiable的, 没有被任何线程占用, 可以尝试获取
        // 因为是实现公平锁, 所以在抢占之前首先看看队列中有没有排在自己前面的Node
        // 如果没有人在排队, 则通过CAS方式获取锁, 就可以直接退出了
                if (c == 0) {
    //检测自己是不是head节点的后继节点,即处在阻塞队列第一位的节点
                    if (!hasQueuedPredecessors() &&
    // 当前线程还没有获得锁,所以可能存在多线程同时在竞争锁的情况, 所以这里使用CAS操作设置 state
                        compareAndSetState(0, acquires)) {
    
    // 将当前线程设置为占用锁的线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
    
    
        // 如果 c>0 说明锁已经被占用了
        // 对于可重入锁, 这个时候检查占用锁的线程是不是就是当前线程,是的话,说明已经拿到了锁, 直接重入就行
                else if (current == getExclusiveOwnerThread()) {
    // 重入锁, state 加1 (acquires)
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
    // 调用setState方法时,是在当前线程已经是持有锁的情况下,因此对state的修改是安全的,只需要普通的方法就可以了。
                    setState(nextc);
                    return true;
                }
    
        // 到这里说明有人占用了锁, 并且占用锁的不是当前线程, 则获取锁失败
                return false;
            }
        }
    

    获取锁的业务逻辑小结

    1.获取锁其实主要就是干一件事:

    将state的状态通过CAS操作由0改写成1.

    由于是CAS操作,必然是只有一个线程能执行成功。则执行成功的线程即获取了锁,在这之后,才有权利将exclusiveOwnerThread的值设成自己,从而成为“王锁拥有者”。

    2.另外对于可重入锁,如果当前线程已经是获取了锁的线程了,它还要注意增加锁的重入次数。

    3.值得一提的是,这里修改state状态的操作,一个用了CAS方法compareAndSetState,一个用了普通的setState方法。这是因为用CAS操作时,当前线程还没有获得锁,所以可能存在多线程同时在竞争锁的情况;而调用setState方法时,是在当前线程已经是持有锁的情况下,因此对state的修改是安全的,只需要普通的方法就可以了。

    因此,在多线程条件下看源码时,我们一定要时刻在心中问自己:

    这段代码是否是线程安全的?同一时刻是否可能有多个线程在执行这行代码?

    获取锁的流程 : aquire() 方法

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                // Acquires in exclusive mode
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire(arg)

            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                // 当前运行的线程
                final Thread current = Thread.currentThread();
                // 获取当前AQS的 synchronization state
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    hasQueuedPredecessors() 判断是否有等待时间比当前线程更长的线程:

    • 返回结果:
      true, 如果当前线程之前有一个排队的线程;
      如果当前线程在队列的头部或队列为空, false

    • 等同于:

      getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()

        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
    • addWaiter(Node mode)
    • acquireQueued(final Node node, int arg)
    • selfInterrupt

    Java内存模型

    8种操作

    • lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
    • unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
    • read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
    • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
    • use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
    • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
    • store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
    • write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。

    操作规则

    • 不允许read和load,store和write操作之一单独出现。
    • 不允许一个线程丢弃它最近的assign操作。即变量在工作内存中改变了账号必须把变化同步回主内存。
    • 不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中。
    • 一个新的变量只允许在主内存中诞生,不允许工作内存直接使用未初始化的变量。
    • 一个变量同一时刻只允许一条线程进行lock操作,但同一线程可以lock多次,lock多次之后必须执行同样次数的unlock操作。
    • 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值。
    • 如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许去unlock一个被其他线程锁定的变量。
    • 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)。

    3个特征

    • 原子性

      • 不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (换到另一个线程) 如: 赋值或者return。比如”a = 1;”和 “return a;”这样的操作都具有原子性。
      • 如果代码不能保证操作为原子操作,可以使用synchronized来保证原子操作
    • 可见性

      • 当一个线程修改了共享变量的值,其他线程能够立即得知这个修改。volatile就是干这个的。

    java内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量。
    普通变量与volatile变量的区别是:volatile的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。
    能保证可见性还有synchronized和final

    • 有序性

      • MM的有序性表现为:如果在本线程内观察,所有的操作都是有序的;如果在一个线程中观察另一个线程,所有的操作都是无序的。前半句指“线程内表现为串行的语义”(as-if-serial),后半句值“指令重排序”和普通变量的”工作内存与主内存同步延迟“的现象。

    volatile,与synchronized 可以保证有序性。

    重排序

    • 在执行程序时为了提高性能,编译器和处理器经常会对指令进行重排序。从硬件架构上来说,指令重排序是指CPU采用了允许将多条指令不按照程序规定的顺序,分开发送给各个相应电路单元处理,而不是指令任意重排。重排序分成三种类型:

    • 编译器优化的重排序:编译器在不改变单线程程序语义放入前提下,可以重新安排语句的执行顺序。

    • 指令级并行的重排序:现代处理器采用了指令级并行技术来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。

    • 内存系统的重排序:由于处理器使用缓存和读写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。

    • 内存屏障指令

      • LoadLoad、LoadStore、StoreLoad和StoreStore

    先行发生原则(happens-before)

    • 如果操作A先行发生于操作B,则A产生的影响能被操作B观察到,“影响”包括修改了内存中共享变量的值、发送了消息、调用了方法等。如果两个操作满足happens-before原则,那么不需要进行同步操作,JVM能够保证操作具有顺序性,此时不能够随意的重排序。否则,无法保证顺序性,就能进行指令的重排序。

    • 基本规则

      • 程序次序规则(Program Order Rule):在同一个线程中,按照程序代码顺序,书写在前面的操作先行发生于书写在后面的操纵。准确的说是程序的控制流顺序,考虑分支和循环等。
      • 管理锁定规则(Monitor Lock Rule):一个unlock操作先行发生于后面(时间上的顺序)对同一个锁的lock操作。
      • volatile变量规则(Volatile Variable Rule):对一个volatile变量的写操作先行发生于后面(时间上的顺序)对该变量的读操作。
      • 线程启动规则(Thread Start Rule):Thread对象的start()方法先行发生于此线程的每一个动作。
      • 线程终止规则(Thread Termination Rule):线程的所有操作都先行发生于对此线程的终止检测,可以通过Thread.join()方法结束、Thread.isAlive()的返回值等手段检测到线程已经终止执行。
      • 线程中断规则(Thread Interruption Rule):对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断时事件的发生。Thread.interrupted()可以检测是否有中断发生。
      • 对象终结规则(Finilizer Rule):一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()的开始。
      • 传递性(Transitivity):如果操作A 先行发生于操作B,操作B 先行发生于操作C,那么可以得出A 先行发生于操作C。

    参考资料

    https://docs.oracle.com/javase/8/docs/api/

    https://blog.csdn.net/piaoslowly/article/details/81460002

    https://segmentfault.com/a/1190000015739343

    https://segmentfault.com/a/1190000015752512


    Kotlin 开发者社区

    国内第一Kotlin 开发者社区公众号,主要分享、交流 Kotlin 编程语言、Spring Boot、Android、React.js/Node.js、函数式编程、编程思想等相关主题。

    Kotlin 开发者社区

    学习笔记思维导图:

    相关文章

      网友评论

        本文标题:Java并发编程实战: AQS 源码 史上最详尽图解+逐行注释

        本文链接:https://www.haomeiwen.com/subject/wbtskctx.html