美文网首页
ReentrantLock源码解析

ReentrantLock源码解析

作者: 千载不变灬 | 来源:发表于2020-05-04 20:33 被阅读0次

    谈到并发,不得不说AbstractQueuedSynchronizer(以下简称AQS)。AQS即是抽象的队列式的同步器,内部定义了很多锁相关的方法,我们熟知的ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等都是基于AQS来实现。

    AQS中实现同步器功能,主要就通过一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源阻塞时会进入此队列)。

    volatile的state保证多线程的可见性,对state的操作都是通过CAS来保证其并发修改的安全性,当state为1代表当前对象锁已经被占有,其他线程来cas加锁时则会失败,加锁失败的线程会被放入一个FIFO的等待队列中,然后该线程会被UNSAFE.park()操作挂起,等待其他获取锁的线程释放锁才能够被唤醒。

    下面,我们主要选取面试热点ReentrantLock,来一起讲一下AQS中的实现原理。

    一、ReentrantLock数据结构

    ReentrantLock的底层数据结构就是一个FIFO双向队列,队列节点为AQS中的内部静态类Node。数据结构如下:

    FIFO队列

    二、ReentrantLock源码分析

    ReentrantLock总共有三个内部类,NonfairSync和FairSync则继承自Sync,Sync继承自AQS。

    本文通过线程一,二,三竞争锁,来分析RL的源码,主要有下面四个小结

    • 1.线程一上锁成功时AQS内部实现
    • 2.线程二/三加锁失败时AQS中等待队列的数据模型
    • 3.线程一释放锁及线程二获取锁实现原理
    • 4.通过线程场景来讲解公平锁具体实现原理

    1.线程一上锁

    使用时,调用ReentrantLock对象的lock方法获取锁。进入源码查看,实际是调用sync.lock。该lock方式是抽象类Sync中的抽象方法。

    final void lock() {
        if (compareAndSetState(0, 1))//该方法直接尝试CAS指令获取锁,若失败,再通过acquire获取锁
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //公平锁只有else分支中的逻辑,没有if中cas抢锁步骤,这是第一处差别
            acquire(1);
    }
    

    首先会尝试通过AQS的compareAndSetState获取锁,在该方法中是使用Unsafe来实现CAS功能,尝试设置当前对象的stateOffset的值为1。在还没有任何线程获取锁之前,sataeOffset就是为0。
    Unsafe在业务代码中是不推荐使用的,但也有办法可以强行使用,这个就不在这里展开。

    因此,当线程一进来时,通过compareAndSetState成功将state设置为1,直接获取到锁,并设置对象独占锁线程为当前线程。

    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    

    2.线程二/三抢锁

    (1)线程二抢占锁失败
    当线程二进入时,compareAndSetState失败,进入else分支,所以我们继续看acquire的逻辑

        public final void acquire(int arg) {
            if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    首先调用tryAcquire,tryAcquire中继续调用nonfairTryAcquire,这里是JAVA的偏向锁实现:

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {//c==0代表当前没有线程持有锁
                    if (compareAndSetState(0, acquires)) {
                    //这里是非公平锁和公平锁的第二处差别
                    //非公平锁并没有等待队列中的第一个元素,而是直接尝试获取锁
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //这里是偏向锁实现,如果持有线程是当前线程,则对state进行+1操作,注意这里不是CAS,可以节省时间
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    

    nonfairTryAcquire()方法中首先会获取state的值。

    如果state为0,则执行CAS操作,尝试更新state值为1,如果更新成功则代表当前线程加锁成功。如果加锁失败直接返回false。

    如果不为0则说明当前对象的锁已经被其他线程所占有,接着判断占有锁的线程是否为当前线程,如果是则累加state值,这就是可重入锁和偏向锁的具体实现,累加state值,释放锁的时候也要依次递减state值。

    以线程二为例,因为线程一已经将state修改为1,所以线程二通过CAS修改state的值不会成功。加锁失败,返回false。

    线程二执行tryAcquire()后由于是false,接着执行addWaiter(Node.EXCLUSIVE)逻辑,将自己加入到一个FIFO等待队列的尾部,代码实现如下:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            //如果尾节点不为空,则设置新进入队列的线程的prev节点为尾节点
            //同时将自己通过CAS设置为尾节点
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果尾节点为空,则进入enq方法
            enq(node);
            return node;
        }
    

    这段代码首先构造一个和当前线程绑定的node节点,Node为双向链表。此时等待对内中的tail指针为空,因此直接执行enq(node)将当前线程加入等待队列尾部:

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                //如果尾节点为空,说明当前线程是第一个等待线程,新建立一个dummy节点为首节点
                //然后将首节点的引用赋给tail
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    //若是不为空,设置自己这个节点为tail节点
                    //把原来tail节点的next节点设置为当前节点
                    //自己这个节点的prev节点设置为原来tail节点
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    第一遍循环时tail指针为空,进入if逻辑,使用CAS操作设置head指针,将head指向一个新创建的Node节点。此时AQS中数据:


    可以看到此时head、tail、t都指向第一个Node元素。
    继续循环,第二次进来时,head和tail是同一个节点,因此进入else逻辑,把线程二对应的node节点的前置节点挂载到head节点后面,并把线程二节点设置为tail节点。此时AQS中的数据:


    addWaiter执行完后,会返回当前线程创建的节点信息。
    继续往后执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)逻辑,此时传入的参数为线程二对应的Node节点信息:

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    

    acquireQueued()这个方法会先判断当前传入的Node对应的前置节点是否为head,如果是则尝试加锁。加锁成功过则将当前节点设置为head节点,然后空置之前的head节点,方便后续被垃圾回收掉。

    如果加锁失败或者Node的前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法将前置节点(对于线程二来说前置节点即是head节点)的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程:


    (2)线程三抢占锁失败
    线程三抢占和线程二抢占其实很类似了,前面一样,直接看addwaiter:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            //如果尾节点不为空,则设置新进入队列的线程的prev节点为尾节点
            //同时将自己通过CAS设置为尾节点
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果尾节点为空,则进入enq方法
            enq(node);
            return node;
        }
    

    此时,tail不为null,则直接将线程三绑定的node设置为tail节点,并直接返回线程三的node节点。

    然后继续进入acquireQueued,此时线程三前节点不是head节点,因此执行shouldParkAfterFailedAcquire方法将前置节点(对于线程三来说前置节点即是线程二节点)的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程:


    3.线程一释放锁及线程二获取锁原理

    线程一释放锁线程二获取锁:

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    线程一在unlock的时候,实际调用了ReentrantLock的release方法,首先会调用tryRelease。tryRelease中完成的事情其实比较简单,尝试state减去1,如果为0,则将当前锁持有线程设置为null,并返回true。

    tryRelease返回为true后,则进入if判断,获取head节点,当head节点不为0的时候,进入unparkSuccessor方法,传入head节点。将head节点的waitStatus设置为0,然后唤醒head的next节点,也就是线程二,进行unpark。此时的AQS中的状态:

    此时线程二被唤醒,线程二接着之前被park的地方继续执行,继续执行acquireQueued()方法。

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    重新进入循环,此时获取前置节点为head,并且由于线程一已经释放了锁,因此抢锁成功。进入if判断,设置head节点为线程二的节点,原先的head节点的next设置为空,方便下次垃圾回收。至此,线程二抢锁成功。此时AQS中的数据:


    线程二释放锁线程三获取锁:
    流程和上面大致相同,被唤醒的线程三会再次尝试加锁。此时AQS中的数据:


    总结一下release的流程:
    1.修改state的值为0,设置锁的持有线程为null;
    2.唤醒被park的线程x;
    3.线程x CAS修改state的值,设置锁的持有线程为线程x;
    4.线程x设置成功,则获取到锁。

    4.公平锁具体实现原理

    非公平锁和公平锁主要有两处代码实现有差异:

    (1)公平锁直接走了lock逻辑,非公平锁则还会尝试CAS抢锁。

    非公平锁:



    公平锁:



    这就会导致一个什么问题呢?线程二在释放锁了之后,唤醒了线程三,此时刚好有个线程四来进行CAS抢锁,则有可能被线程四竞争到,线程三需要继续挂起。

    (2)再看下公平锁的tryAcquire方法:

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    //与非公平锁相比,这里多了一个判断!hasQueuedPredecessors()
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        //前一个判断为flase时,head和tail都为null或者为同一个节点;为true时,则head和tail不一样,也就是FIFO里面至少有两个节点;
        //后一个判断为false时,说明head后置节点不为null,并且后置节点即为当前线程;其它情况都为true
        //返回false代表队列中没有节点或者仅有一个节点是当前线程创建的节点。返回true则代表队列中存在等待节点,当前线程需要入队等待。
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    先判断head是否等于tail,如果队列中只有一个Node节点或者没有,那么head会等于tail,直接返回false,代表没有等待节点。此时当前线程会尝试获取锁。

    如果head和tail不相等,说明队列中有等待节点,此时h的后置节点肯定不为空,因此主要看h的后置节点是否为当前持有锁的线程,如果是当前线程,也会返回flase。

    总结来说就是:
    1.如果FIFO中只有一个节点或者没有节点,则当前线程可以尝试获取锁;
    2.如果FIFO中有至少两个节点,但当前线程为head后置节点线程,则当前线程也可以尝试获取锁;

    总结下公平和非公平锁的代码实现区别:
    1.在调用lock后,非公平锁首先会调用CAS进行一次抢锁,如果这个时候恰巧锁没有被占用,那么就有可能直接就抢占到锁。
    2.在 CAS 抢锁失败后,公平锁会判断前驱节点中是否有线程处于等待状态,如果有则不去抢锁。
    如果这两次 CAS 都失败,那非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

    相对来说,非公平锁会有更好的性能,因为它可以减少CPU唤醒线程的开销,整体的吞吐效率会高点,吞吐量会比较大。当然,非公平锁让线程获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态,各有利弊。

    不过相比性能而言,饥饿问题可以暂时忽略,这可能就是ReentrantLock默认创建非公平锁的原因之一了。

    三、总结

    这里用多个线程依次加锁/释放锁来展示了ReentrantLock的实现方式和实现原理,还有公平锁与非公平锁的实现原理,而ReentrantLock底层就是基于AQS实现的,所以对AQS也有了深刻的理解。

    相关文章

      网友评论

          本文标题:ReentrantLock源码解析

          本文链接:https://www.haomeiwen.com/subject/uasxbqtx.html