美文网首页
再谈ReentrantLock

再谈ReentrantLock

作者: 程序员札记 | 来源:发表于2022-03-28 09:09 被阅读0次

    经过对AQS坚持不懈地研究,我们明白了,AQS定义了一种排队方式,解决那些暂时未获取到锁的线程该何去何从的问题,但是并没有告诉我们该如何获取、释放资源,而是留给了自定义同步器去实现。那本小节我们去研究下ReentrantLock是怎么获取和释放同步资源的,ReentrantLock又分为偏向锁(默认)和非偏向锁。

    ReentrantLock有三大概念:可重入、公平锁&非公平锁、Condition等待/通知机制

    锁的可重入是怎么实现的

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    

    setState是AQS里的方法:

        protected final void setState(int newState) {
            state = newState;
        }
    

    private volatile int state; -- 这个变量用来记录线程获取锁的状态

    ReentrantLock在获取锁的时候,如果判断出来当前线程就是锁的拥有者(current == getExclusiveOwnerThread()),就直接给这个状态state加上acquires值,

    其实就是1,所以每次重入ReentrantLock锁时+1,在释放锁时也是每次-1,直到state=0才表示锁完全释放掉。

    公平锁&非公平锁

    首先,我们要知道ReentrantLock默认是非公平锁:

        public ReentrantLock() {
            sync = new NonfairSync();
        }
    

    当然也可以在创建锁的时候指定是公平的还是非公平的:

        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
    image.png

    NonfairSync还有个大兄弟FairSync

    我们带着疑问往下看,什么是公平的什么是非公平的:

    公平锁加锁

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    

    这段代码主要分为两块:1)if (c==0);2)else if (current == getExclusiveOwnerThread())

    • if (c==0)
      尚没有锁获取到资源,这有可能是因为当前线程是头一个来抢锁的,也有可能是因为排在AQS首位的线程刚刚释放掉资源,保险起见还是先看看AQS里面有没有线程在等待:
      if (!hasQueuedPredecessors() && compareAndSetState(0, acquires))
        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    什么情况下说明AQS里有前驱节点呢?
    head和tail不是同一个节点,且head没有next或虽然有next但next不是当前线程。
    (说明下,head的next节点就是等待队列的首个线程,head其实就是一个队列开始的标记,没有实际意义。)
    判断出来没有线程在等待获取锁,尝试用CAS去获取资源:
    compareAndSetState(0, acquires)
    看看上面的加锁,acquires会传1

    如果CAS成功,当前线程就获取到独占锁:setExclusiveOwnerThread(current)

    • else if (current == getExclusiveOwnerThread())
      当前线程本来就是资源的独占锁,说明之前已经获取到锁了,ok,直接对state累加:
      int nextc = c + acquires;

    非公平锁加锁

        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    也不管AQS里有没有人在排队,上去就是干,上去就compareAndSetState(0, 1),万一干成功了呢?
    万一成功了,就锁住资源呗:setExclusiveOwnerThread(Thread.currentThread())。
    万一没成功呢?秒怂,try下试试,acquire(1),跟踪下,这个方法在AQS里面定义,

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这不就前面独占式获取同步状态的方式吗?尝试获取,失败就加入队列等着。

    AQS的tryAcquire有这么几种实现:


    image.png

    显然我们要的是非公平锁的实现

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    

    (又跳回ReentrantLock类了)

            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    跟上面公平锁的tryAcquire()方法对比下,仅仅是在if (c==0)后面少了个!hasQueuedPredecessors(),意思是这会儿共享资源的加锁状态是空的,我不管是因为没人来抢,还是因为head刚释放掉,我上来就用CAS去抢锁。
    为什么要这么设计呢?少了个hasQueuedPredecessors()就不用再判断是不是又前驱节点了,效率上会有提升。当然我是我的想法,存在即合理,既然设计出来两种加锁方式,说明不会有绝对优劣。

    解锁

        public void unlock() {
            sync.release(1);
        }
    

    解锁就一个,没有区分公平非公平

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    如果tryRelease释放掉了锁,就通知后继节点(unparkSuccessor)。

    
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    

    如果减完1还剩0,说明资源已经完全释放了,那就清空对资源的占有(setExclusiveOwnerThread(null))。

    如果减完1还>0,说明还没释放干净(重入锁lock了3次,释放了2次,就还剩1次),更新state值就好了,不释放资源。

    图例

    1. Lock - No contention (thread 0 acquire lock)


      image.png
    2. Thread-1 来获取锁失败


      image.png
    3. 进入acquire -> addWaiter 流程


      image.png
    4. acquireQueued ,前置虚节点 value 设置为SIGNAL


      image.png
    5. Thread-2 , Thread-3 如此类推


      image.png
    6. Thread-0 释放锁


      image.png
    7. Thread-1 获得锁


      image.png
    8. 在第7步时假设有个thread-4 来竞争则谁抢到算谁的,此时thread-4抢到了锁


      image.png
    9. 再看Reentrant


      image.png
    10. Interruput 处理


      image.png
    11. FaireSync


      image.png

    相关文章

      网友评论

          本文标题:再谈ReentrantLock

          本文链接:https://www.haomeiwen.com/subject/zfoujrtx.html