美文网首页Java技术问答JUC 并发专题源码分析
Java并发编程——ReentrantLock实现原理

Java并发编程——ReentrantLock实现原理

作者: 小波同学 | 来源:发表于2021-12-06 19:11 被阅读0次

    一、前言

    ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。

    CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现

    二、AbstractQueuedSynchronizer

    AbstractQueuedSynchronizer (AQS)类如其名,抽象的队列式同步容器,AQS定义类一套多线程访问共享资源的同步器,是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。

    AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus。

    AQS以模板方法模式在内部定义了获取和释放同步状态的模板方法,并留下钩子函数供子类继承时进行扩展,由子类决定在获取和释放同步状态时的细节,从而实现满足自身功能特性的需求。除此之外,AQS通过内部的同步队列管理获取同步状态失败的线程,向实现者屏蔽了线程阻塞和唤醒的细节。

    2.1 AQS队列节点数据结构

    • prev:指向队列中的上一个节点;
    • waitStatus:节点的等待状态,初始化为0,表示正常同步等待:
      • CANCELLED=1:节点因超时或者被中断而取消时设置为取消状态;
      • SIGNAL=-1:指示当前节点被释放后,需要调用unpark通知后面节点,如果后面节点发生竞争导致获取锁失败,也会将当前节点设置为SIGNAL;
      • CONDITION=-2:指示该线程正在进行条件等待,条件队列中会用到;
      • PROPAGATE=-3:共享模式下释放节点时设置的状态,表示无限传播下去。
    • thread:当前节点操作的线程;
    • nextWaiter:该字段在Condition条件等待中会用到,指向条件队列的下一个节点。或者链接到SHARED常量,表示节点正在以共享模式等待;
    • next:指向队列中的下一个节点。
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 同步等待队列(双向链表)中的节点
         */ 
        static final class Node {
        
            /** 用于指示节点正在共享模式下等待的标记 */
            static final Node SHARED = new Node();
            
            /** 用于指示节点正在独占模式下等待的标记 */
            static final Node EXCLUSIVE = null;
    
            /** 线程被取消了 */
            static final int CANCELLED =  1;
            
            /** 
             * 如果前驱节点的等待状态是SIGNAL,表示当前节点将来可以被唤醒,那么当前节点就可以安全的挂起了 
             * 否则,当前节点不能挂起 
             */
            static final int SIGNAL    = -1;
            
            /** 线程正在等待条件 */
            static final int CONDITION = -2;
            
            /**
             * 指示下一个acquireShared应无条件传播
             */
            static final int PROPAGATE = -3;
    
            //值就是前四个int值和0(CANCELLED/SIGNAL/CONDITION/PROPAGATE/0)
            volatile int waitStatus;
    
            /**
             * 前驱节点
             */
            volatile Node prev;
    
            /**
             * 后继节点
             */
            volatile Node next;
    
            /**
             * 节点中的线程
             */
            volatile Thread thread;
    
    
            Node nextWaiter;
    
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * 返回该节点前一个节点
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // 用于addWaiter中
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    }
    

    2.2 AQS同步器数据结构

    AQS中同步等待队列的实现是一个带头尾指针(这里用指针表示引用是为了后面讲解源码时可以更直观形象,况且引用本身是一种受限的指针)且不带哨兵结点(后文中的头结点表示队列首元素结点,不是指哨兵结点)的双向链表。

    如上图,AQS中:

    • state:所有线程通过通过CAS尝试给state设值,当state>0时表示被线程占用;同一个线程多次获取state,会叠加state的值,从而实现了可重入;
    • exclusiveOwnerThread:在独占模式下该属性会用到,当线程尝试以独占模式成功给state设值,该线程会把自己设置到exclusiveOwnerThread变量中,表明当前的state被当前线程独占了;
    • 等待队列(同步队列):等待队列中存放了所有争夺state失败的线程,是一个双向链表结构。state被某一个线程占用之后,其他线程会进入等待队列;一旦state被释放(state=0),则释放state的线程会唤醒等待队列中的线程继续尝试cas设值state;
    • head:指向等待队列的头节点,延迟初始化,除了初始化之外,只能通过setHead方法进行修改;
    • tail:指向等待队列的队尾,延迟初始化,只能通过enq方法修改tail,该方法主要是往队列后面添加等待节点。
    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;//指向队列首元素的头指针
    
    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;//指向队列尾元素的尾指针
    

    为了接下来能够更好的理解加锁和解锁过程的源码,对该同步队列的特性进行简单的讲解:

    • 1、同步队列是个先进先出(FIFO)队列,获取锁失败的线程将构造结点并加入队列的尾部,并阻塞自己。如何才能线程安全的实现入队是后面讲解的重点,毕竟我们在讲锁的实现,这部分代码肯定是不能用锁的。
    • 2、队列首结点可以用来表示当前正获取锁的线程。
    • 3、当前线程释放锁后将尝试唤醒后续处结点中处于阻塞状态的线程。

    同步状态变量

    private volatile int state; //同步状态变量
    

    这是一个带volatile前缀的int值,是一个类似计数器的东西。在不同的同步组件中有不同的含义。以ReentrantLock为例,state可以用来表示该锁被线程重入的次数。当state为0表示该锁不被任何线程持有;当state为1表示线程恰好持有该锁1次(未重入);当state大于1则表示锁被线程重入state次。因为这是一个会被并发访问的量,为了防止出现可见性问题要用volatile进行修饰。

    持有同步状态的线程标志

    /**
     * The current owner of exclusive mode synchronization.
     */
     //持有同步状态的线程标志
    private transient Thread exclusiveOwnerThread;
    
    

    如注释所言,这是在独占同步模式下标记持有同步状态线程的。ReentrantLock就是典型的独占同步模式,该变量用来标识锁被哪个线程持有。

    AbstractQueuedSynchronizer自定义同步器实现

    自定义同步器在实现时只需要实现共享资源state的获取于释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了,自定义同步容器实现时主要实现以下几种方法:

    • 1、isHeldExclusively():该线程是否正在独占资源,只有用到condition才需要取实现它。
    • 2、tryAcquire(int):独占方式,尝试获取资源,成功则返回true,失败则返回false。
    • 3、tryRelease(int):独占方式,尝试释放资源,成功则返回true,失败则返回false。
    • 4、tryAcquireShared(int):共享方式,尝试获取资源,附属表示获取失败,0表示获取成功,但是没有剩余资源可用,其他线程不能在获取,正数表示获取成功,并且有剩余资源,也就是自己可以重复获取(可重入)。
    • 5、tryReleaseShare(int):共享方式。尝试释放资源,成功返回true,失败返回false。
    • 6、以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()的时候,会调用tryAcquire()独占该锁并将state+1。此后其他线程在tryAcquire()独占该锁并将state+1。此后表示其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(几锁释放)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

    了解AQS的主要结构后,就可以开始进行ReentrantLock的源码解读了。

    三、ReentrantLock

    ReentrantLock的基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入AQS队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。在这个时候,如果:

    • 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取。

    • 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。

    • 可重入锁。可重入锁是指同一个线程可以多次获取同一把锁。ReentrantLock和synchronized都是可重入锁。

    • 可中断锁。可中断锁是指线程尝试获取锁的过程中,是否可以响应中断。synchronized是不可中断锁,而ReentrantLock则提供了中断功能。

    公平锁与非公平锁。公平锁是指多个线程同时尝试获取同一把锁时,获取锁的顺序按照线程达到的顺序,而非公平锁则允许线程“插队”。

    3.1 Reentrantlock类结构

    public class ReentrantLock implements Lock, java.io.Serializable {
    
    }
    

    可以看出Reentrantlock 实现类lock接口,首先看下这个lock接口。

    public interface Lock {
    
        //普通的获取锁的方法,lock会阻塞直到成功
        void lock();
    
        //与lock不同的时,它可以相应中断,
        void lockInterruptibly() throws InterruptedException;
         
        //尝试获取锁,立即返回,不阻塞,成功返回true,失败返回false
        boolean tryLock();
    
        //先尝试获取锁,如果成功则返回true,失败则阻塞等待,等待时间由参数决定,
        //在等待的同时相应中断,抛出中断异常,如果在等待的过程中获得类锁则返回true,
        //否则直到时间超时,则返回false。
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
        //普通释放锁的方法    
        void unlock();
    
        //新建一个条件,lock可以关联多个条件,关于条件在以后篇幅中记录
        Condition newCondition();
    }
    

    可以看出,显示锁,与synchronied相比,支持非阻塞的方式获取锁,可以响应中断,可以限时,这使得它非常的灵活。

    现在回到Reentrantlock类,可以看到其内部基本上定义类三个内部类:

    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        abstract static class Sync extends AbstractQueuedSynchronizer {
            //......
        }
        
        //非公平锁
        static final class NonfairSync extends Sync {
            //......
        }
        
        //公平锁
        static final class FairSync extends Sync {
            //......
        }
    }
    

    很显然从上面的类结构,我们可以得出jdk利用CAS算法和AQS实现类ReentrantLock。

    3.2 构造函数分析

    • ReentrantLock出于性能考虑,默认采用非公平锁。
    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        /**
         * Creates an instance of {@code ReentrantLock}.
         * This is equivalent to using {@code ReentrantLock(false)}.
         */
        public ReentrantLock() {
            sync = new NonfairSync();
        }
    
        /**
         * Creates an instance of {@code ReentrantLock} with the
         * given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    }
    

    四、NonfairSync非公平锁

    NonfairSync非公平锁加锁流程(简化):

    • 1、基于CAS尝试将state(锁数量)从0设置为1。
      • A、如果设置成功,设置当前线程为独占锁的线程;
      • B、如果设置失败,还会再获取一次锁数量,
        • B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;

        • B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

    4.1 非公平模式下的加锁

    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        static final class NonfairSync extends Sync {
    
            final void lock() {
                //直接通过CAS,通过改变共享资源 state值,成功则设置当前线程为持有锁的线程(独占),
                //失败则调用顶层AQS的acquire方法,再次尝试,失败则将线程放到阻塞队列
                if (compareAndSetState(0, 1))
                    //将当前线程标记为已持有锁
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
        }
    }
    

    ReentrantLock 中使用 state 用于表示是否被锁和持有的数量,如果当前未被锁定,则立即获得锁,否则调用acquire(1);方法来获得锁,其中acquire为AQS中的方法,其实现为:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 该方法主要做了三件事情:
         * 1. 调用 tryAcquire 尝试获取锁,该方法需由 AQS 的继承类实现,获取成功直接返回
         * 2. 若 tryAcquire 返回 false,则调用 addWaiter 方法,将当前线程封装成节点,
         *    并将节点放入同步队列尾部
         * 3. 调用 acquireQueued 方法让同步队列中的节点循环尝试获取锁
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    }
    

    4.1.1、tryAcquire(arg)尝试去获取锁。如果尝试获取锁成功,方法直接返回。

    • 这段代码调用 tryAcquire 方法来尝试获取锁,该方法需要被子类重写,在 NonFairSync 中的实现为:
    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        static final class NonfairSync extends Sync {
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
        
        abstract static class Sync extends AbstractQueuedSynchronizer { 
            final boolean nonfairTryAcquire(int acquires) {
                //获取当前线程
                final Thread current = Thread.currentThread();
                //获取state变量值
                int c = getState();
                if (c == 0) {
                    //没有线程占用锁
                    if (compareAndSetState(0, acquires)) {
                        //占用锁成功,设置独占线程为当前线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //当前线程已经占用该锁 锁重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                        
                    // 更新state值为新的重入次数  
                    setState(nextc);
                    return true;
                }
                //获取锁失败
                return false;
            }
        }
    }
    

    非公平锁tryAcquire的流程是:检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。如果以上两点都没有成功,则获取锁失败,返回false。

    tryAcquire()尝试获取锁

    不同的锁可以有不同的tryAcquire()实现,所以,你可以看到ReentrantLock锁里面会有非公平锁和公平锁的实现方式。
    ReentrantLock公平锁的实现代码在获取锁之前多了一个判断:!hasQueuedPredecessors(),这个是判断如果当前线程节点之前没有其他节点了,那么我们才可以尝试获取锁,这就是公平锁的体现。

    4.1.2、进入等待队列。由于上文中提到锁已经被占用了,所以执行tryAcquire失败,并且入等待队列。如果锁一直不释放,那么后续线程就会被挂起。

    • 先看下入队的过程。先看addWaiter(Node.EXCLUSIVE)
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 将新节点和当前线程关联并且入队列
         * @param mode 独占/共享
         * @return 新节点
         */
        private Node addWaiter(Node mode) {
            //初始化节点,设置关联线程和模式(独占 or 共享)
            Node node = new Node(Thread.currentThread(), mode);
            // 获取尾节点引用
            Node pred = tail;
            // 尾节点不为空,说明队列已经初始化过
            if (pred != null) {
                node.prev = pred;
                // 设置新节点为尾节点
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
            enq(node);
            return node;
        }
    }
    
    • 假如多个线程同时尝试入队列,由于队列尚未初始化,tail==null,故至少会有一个线程会走到enq(node)。我们假设同时走到了enq(node)里。
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 初始化队列并且入队新节点
         */
        private Node enq(final Node node) {
            //开始自旋
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    // 如果tail为空,则新建一个head节点,并且tail指向head
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    // tail不为空,将新节点入队
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    }
    

    这里体现了经典的自旋+CAS组合来实现非阻塞的原子操作。由于compareAndSetHead的实现使用了unsafe类提供的CAS操作,所以只有一个线程会创建head节点成功。假设线程B成功,之后B、C开始第二轮循环,此时tail已经不为空,两个线程都走到else里面。假设B线程compareAndSetTail成功,那么B就可以返回了,C由于入队失败还需要第三轮循环。最终所有线程都可以成功入队。

    当线程进入等待队列后,此时AQS队列如下:


    addWaiter(Node mode)进入等待队列

    4.1.3、挂起。B和C相继执行acquireQueued(final Node node, int arg)。这个方法让已经入队的线程尝试获取锁,若失败则会被挂起。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 已经入队的线程尝试获取锁
         */ 
        final boolean acquireQueued(final Node node, int arg) {
            //标记是否成功获取锁
            boolean failed = true;
            try {
                //标记线程是否被中断过
                boolean interrupted = false;
                for (;;) {
                    //获取前驱节点
                    final Node p = node.predecessor();
                    //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取锁
                    if (p == head && tryAcquire(arg)) {
                        // 获取成功,将当前节点设置为head节点
                        setHead(node);
                        // 原head节点出队,在某个时间点被GC回收
                        p.next = null; // help GC
                        //获取成功
                        failed = false;
                        //返回是否被中断过
                        return interrupted;
                    }
                    // 判断获取失败后是否可以挂起,若可以则挂起
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        // 线程若被中断,设置interrupted为true
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    }
    

    code里的注释已经很清晰的说明了acquireQueued的执行流程。假设B和C在竞争锁的过程中A一直持有锁,那么它们的tryAcquire操作都会失败,因此会走到第2个if语句中。

    我们再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都做了哪些事吧。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 判断当前线程获取锁失败之后是否需要挂起.
         */ 
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            //前驱节点的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                // 前驱节点状态为signal,返回true
                return true;
                
            // 前驱节点状态为CANCELLED 
            if (ws > 0) {
                // 从队尾向前寻找第一个状态不为CANCELLED的节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 将前驱节点的状态设置为SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
        
        /**
         * 挂起当前线程,返回线程中断状态并重置
         */ 
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }   
    }
    

    线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,所以shouldParkAfterFailedAcquire会先判断当前节点的前驱是否状态符合要求,若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起。如果不符合,再看前驱节点是否>0(CANCELLED),若是那么向前遍历直到找到第一个符合要求的前驱,若不是则将前驱节点的状态设置为SIGNAL。

    整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心挂起,需要去找个安心的挂起点,同时可以再尝试下看有没有机会去尝试竞争锁。

    注意:
    是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定。

    如果当前节点的prev不是head,或者争抢失败,则会将前面节点的状态设置为SIGNAL。

    如果前面的节点状态大于0,表示节点被取消,这个时候会把该节点从队列中移除掉。

    下图为尝试CAS争抢锁,但失败了,然后把head节点状态设置为SIGNAL


    acquireQueued(final Node node, int arg)

    然后再会循环一次尝试获取锁,如果获取失败了,就调用LockSupport.park(this)挂起线程。

    那么时候才会触发唤起线程呢?这个时候我们得先看看释放锁是怎么做的了。

    五、FairSync公平锁

    FairSync公平锁加锁流程(简化):

    • 1、如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;

    • 2、如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

    5.1 公平模式下的加锁

    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
    
        static final class FairSync extends Sync {
            final void lock() {
                acquire(1);
            }
        }
    }
    

    调用acquire(1);方法来获得锁,其中acquire为AQS中的方法,其实现为:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 该方法主要做了三件事情:
         * 1. 调用 tryAcquire 尝试获取锁,该方法需由 AQS 的继承类实现,获取成功直接返回
         * 2. 若 tryAcquire 返回 false,则调用 addWaiter 方法,将当前线程封装成节点,
         *    并将节点放入同步队列尾部
         * 3. 调用 acquireQueued 方法让同步队列中的节点循环尝试获取锁
         */
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    }
    

    5.2 FairSync:tryAcquire(int acquires)

    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
    
        static final class FairSync extends Sync {
            protected final boolean tryAcquire(int acquires) {
                //获取当前线程
                final Thread current = Thread.currentThread();
                //获取state变量值
                int c = getState();
                if (c == 0) {
                    //如果没有线程持有锁
                    //调用hasQueuedPredecessors函数判断队列中是否有优先于此线程的排队线程
                    //如果没有才去尝试竞争锁
                    
                    // hasQueuedPredecessors()和非公平锁相比,这里多了一个判断:是否有线程在等待
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        //占用锁成功,设置独占线程为当前线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                
                //当前线程已经占用该锁 锁重入
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                        
                    // 更新state值为新的重入次数      
                    setState(nextc);
                    return true;
                }
                //获取锁失败
                return false;
            }
        }
    }
    

    可以看到这里的逻辑和NonfairSync唯一的区别就是FairSync多了hasQueuedPredecessors函数的调用,在竞争锁之前需要先看队列中是否有优先于此线程的排队线程,如果有的话,那么FairSync不会去竞争锁,而是直接进入后面入队阻塞的逻辑。那么这里强调的优先于此线程的排队线程是什么意思呢?因为FairSync是公平锁的实现,获取锁总要有个先来后到。

    5.3 hasQueuedPredecessors

    • 判断队列中是否有优先于此线程的排队线程
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public final boolean hasQueuedPredecessors() {
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    }
    

    步骤:

    • 1、如果h==t成立,h和t均为null或是同一个具体的节点,无后继节点,返回false。
    • 2、如果h!=t成立,head.next是否为null,如果为null,返回true。
      • 什么情况下h!=t的同时h.next==null?,有其他线程第一次正在入队时,可能会出现。见AQS的enq方法,compareAndSetHead(node)完成,还没执行tail=head语句时,此时tail=null,head=newNode,head.next=null。
    • 3、如果h!=t成立,head.next != null,则判断head.next是否是当前线程,如果是返回false,否则返回true(head节点是获取到锁的节点,但是任意时刻head节点可能占用着锁,也可能释放了锁(unlock()),未被阻塞的head.next节点对应的线程在任意时刻都是有必要去尝试获取锁)

    六、释放锁

    NonfairSync非公平锁和FairSync公平锁的释放锁代码一样。

    6.1 unlock()

    步骤:

    • 1、获取当前的锁数量,然后用这个锁数量减去解锁的数量(这里为1),最后得出结果c。

    • 2、判断当前线程是不是独占锁的线程,如果不是,抛出异常。

    • 3、如果c==0,说明锁被成功释放,将当前的独占线程置为null,锁数量置为0,返回true。

    • 4、如果c!=0,说明释放锁失败,锁数量置为c,返回false。

    • 5、如果锁被释放成功的话,唤醒距离头节点最近的一个非取消的节点。

    6.2 ReentrantLock:unlock()

    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        /**
         * 释放这个锁
         *1)如果当前线程持有这个锁,则锁数量被递减
         *2)如果递减之后锁数量为0,则锁被释放。
         *如果当前线程不持久有这个锁,抛出异常
         */
        public void unlock() {
            sync.release(1);
        }
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public final boolean release(int arg) {
            //如果成功释放锁
            if (tryRelease(arg)) {
                //获取头节点:(注意:这里的头节点就是当前正在释放锁的节点)
                Node h = head;
                //头结点存在且等待状态不是取消
                if (h != null && h.waitStatus != 0)
                    //唤醒距离头节点最近的一个非取消的节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    }
    

    6.3 Sync:tryRelease(int releases)

    最后我们再看下tryRelease的执行过程

    public class ReentrantLock implements Lock, java.io.Serializable {
    
        private final Sync sync;
        
        /**
         * 释放当前线程占用的锁
         * @param releases
         * @return 是否释放成功
         */
        abstract static class Sync extends AbstractQueuedSynchronizer { 
            protected final boolean tryRelease(int releases) {
                // 计算释放后state值
                int c = getState() - releases;
    
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    // 如果不是当前线程占用锁,那么抛出异常
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    // 锁被重入次数为0,表示释放成功
                    free = true;
                    // 清空独占线程
                    setExclusiveOwnerThread(null);
                }
                // 更新state值
                setState(c);
                return free;
            }
        }
    }
    

    6.4 AbstractQueuedSynchronizer:unparkSuccessor(Node node)

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        /**
         * 唤醒离头节点node最近的一个非取消的节点
         * @param node 头节点
         */ 
        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                //将ws设为0状态(即什么状态都不是)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * 获取头节点的下一个等待状态不是cancel的节点
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                /*
                 * 注意:从后往前遍历找到离头节点最近的一个非取消的节点,
                 * 从后往前遍历据说是在入队(enq())的时候,可能nodeX.next==null
                 */         
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                //唤醒离头节点最近的一个非取消的节点
                LockSupport.unpark(s.thread);
        }
    }
    

    tryRelease()具体由子类实现。一般处理流程是让state减1。

    如果释放锁成功,并且头节点waitStatus!=0,那么会调用unparkSuccessor()通知唤醒后续的线程节点进行处理。

    注意:在遍历队列查找唤醒下一个节点的过程中,如果发现下一个节点状态是CANCELLED那么就会忽略这个节点,然后从队列尾部向前遍历,找到与头结点最近的没有被取消的节点进行唤醒操作。

    唤醒之后,节点对应的线程2又从acquireQueued()方法的阻塞处醒来继续参与争抢锁。并且争抢成功了,那么会把head节点的下一个节点设置为null,让自己所处的节点变为head节点:

    这样一个AQS独占式、非中断的抢占锁的流程就结束了。

    七、完整流程

    最后我们再以另一个维度的流程来演示下这个过程。

    首先有4个线程争抢锁,线程1,成功了,其他三个失败了,分别依次入等待队列:

    线程2、线程3依次入队列:


    现在突然发生了点事情,假设线程3用的是带有超时时间的tryLock,超过了等待时间,线程3状态变为取消状态了,这个时候,线程4追加到等待队列中后,发现前一个节点的状态是1取消状态,那么会执行操作把线程3节点从队列中移除掉:

    最后,线程1释放了锁,然后把head节点ws设置为0,并且找到了离head最靠近的一个waitStatus<=0的线程并唤醒,然后参与竞争获取锁:

    最终,线程2获取到了锁,然后把自己变为了Head节点,并取代了原来的Head节点:

    接着就一直这样循环,我就不再画图了,聪明的你应该对此了如指掌了。

    八、可打断原理

    8.1 不可打断模式

    在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        private final boolean parkAndCheckInterrupt() {
            // 如果打断标记已经是 true, 则 park 会失效
            LockSupport.park(this);
            // interrupted 会清除打断标记
            return Thread.interrupted();
        }   
        
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        // 还是需要获得锁后, 才能返回打断状态
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        // 如果是因为 interrupt 被唤醒, 返回打断状态为 true
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }   
        
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                // 如果打断状态为 true
                selfInterrupt();
        }   
        
        static void selfInterrupt() {
            // 重新产生一次中断
            Thread.currentThread().interrupt();
        }   
    }
    

    8.2 可打断模式

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
                
            // 如果没有获得到锁, 进入 ㈠   
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }   
        
        // ㈠ 可打断的获取锁流程
        private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        // 在 park 过程中如果被 interrupt 会进入此
                        // 这时候抛出异常, 而不会再次进入 for (;;)    
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    }
    

    九、总结:

    9.1 公平锁与非公平锁对比

    • FairSync:lock()少了插队部分(即少了CAS尝试将state从0设为1,进而获得锁的过程)
    • FairSync:tryAcquire(int acquires)多了需要判断当前线程是否在等待队列首部的逻辑(实际上就是少了再次插队的过程,但是CAS获取还是有的)。

    ReentrantLock是基于AbstractQueuedSynchronizer实现的,AbstractQueuedSynchronizer可以实现独占锁也可以实现共享锁,ReentrantLock只是使用了其中的独占锁模式。

    相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。

    参考:
    https://blog.csdn.net/fuyuwei2015/article/details/83719444

    https://blog.51cto.com/u_3265857/2369641

    https://segmentfault.com/a/1190000014769953

    https://www.jb51.net/article/105762.htm

    https://www.cnblogs.com/java-zhao/p/5133402.html

    https://www.cnblogs.com/java-zhao/p/5131544.html

    https://www.itzhai.com/articles/aqs-and-lock-implementation-in-concurrent-packages.html

    相关文章

      网友评论

        本文标题:Java并发编程——ReentrantLock实现原理

        本文链接:https://www.haomeiwen.com/subject/kiygxrtx.html