美文网首页Java并发
Java并发开篇--ReentrantLock公平锁的可重入性

Java并发开篇--ReentrantLock公平锁的可重入性

作者: 慕北人 | 来源:发表于2020-04-04 00:37 被阅读0次

    Java并发编程--ReentrantLock可重入性探索

    我们直接先看其公平锁情况下的可重入性到底是怎么回事,由于我们讨论的是公平锁的情况,而相关的代码在ReentrantLock的内部类FairSync中。

    1. lock()

    public void lock() {
        sync.lock();
    }  
    

    由于是公平锁,所以我们需要重FairSync中查看lock方法:

    final void lock() {
        acquire(1);
    }  
    

    而这里的acquire方法继承自AbstractQueuedSynchronizer

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }  
    

    首先我们先提前说一下tryAcquire返回值是一个boolean,为true说明当前线程成功获取了ReentrantLock的锁,并且ReentrantLock锁是一个独占锁,而这个if条件,如果成功获取了锁,那么acquire方法就直接返回了。

    AQS已经为该方法做了方法的实现,在FairSync中我们只要实现tryAcquire方法即可:

    protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {  // 说明当前锁还未被持有
                // hasQueuedPredecessors返回false的情况为:当前线程在等待队列的头部或者等待队列为空
                // 这就说明了:只有等待队列的头结点可以获取锁  
                // Q:什么情况下当前线程已经在头结点了,但是还没有获取锁?
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {   // 这里说明当前线程就是独占的线程
                int nextc = c + acquires;       // 持有锁的线程获取锁的次数,这也表明了可重入性
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);           // 原来可重入性是这么个意思
                return true;         // 如果current再次使用该所对象加锁,那么会直接返回true,可重入就是这么个意思
            }
            return false;
        }
    

    如注释中所说,当前线程能够成功获得锁有两种情况,分别代表首次加锁和重入锁

    • 如果c为0的话,说明当前锁还没有被任何线程独占,这时候会对队列进行判断
      • hasQueuedPredecessors方法返回false有两种情况:当前等待队列为空或者当前线程就是队列头部;此时才可以尝试加锁
      • 一个CAS操作将ReentrantLock的state属性设置为acquire值(调用的时候传递的为1)
      • setExclusiveOwnerThread方法将当前线程设置为独占锁的线程
    • 如果当前线程为独占线程,则保证可重入性:
      • 将state进行加一操作

    从这一个成功加锁的过程我们可以产生一些大胆的推断:

    1. 独占该锁的线程位于等待队列的头部
    2. state属性表示独占的线程加锁的次数,在之后解锁的时候可能也要这么多次数的unlock才可以释放锁
    3. 可重入性的保证就是一句current == getExclusiveOwnerThread()

    注意:有没有发现,对于第一个加锁的线程,它会加锁成功,但是这个第一次加锁的线程,没有被封装成一个Node节点放到队列中;所以说,持有锁的线程是队列的头结点这句话有问题:因为持有锁的线程根本不在队列中,何来头结点一说。在下文分析加锁失败的情况会证明这一论断。

    关于hasQueuedPredecessors方法:

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }  
    

    首先对于第一次加锁的线程,此时由于h == t == null所以返回false,而对于后面的返回false的情况大都是h != t但是s != null、s.thread == Thread.currentThread()返回的false,也就是只有头结点的后继节点调用该方法时,才会返回false表示可以尝试加锁。同时这也是可重入锁中公平锁的来源,对于之前已经在队列中的节点,那么新来的节点想要加锁,该方法会返回true说明队列中在你之前还有人在等待,得前面没人等待了你才能返回false,才能尝试去加锁,保证了先来后到的公平性

    注意:对于第二个尝试加锁的线程,由于此时前面有一个人持有锁,所以它在调用lock-acquire-tryAcquire方法时由于判断state!=0且当前线程不是独占锁的线程直接判断了加锁失败,从而被添加到了队列中,这条路线中不涉及到hasQueuedPredecessors方法的调用(显然此时它是满足h==t这个判断的)

    加锁失败情况

    还记得acquire方法吗:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }  
    

    如果tryAcquire方法返回false的话,说明加锁失败,同时通过上面的代码我们知道,如果加锁失败的话,当前线程没有被执行各种处理,所以我们在分析acquireQueued方法的时候没有任何后顾之忧,它的代码没有收到tryAcquire的影响:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {   // 注意,这里的意思是头结点的后继节点tryAcquire成功,也就是获取了锁
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;             // 这里说明了,如果头结点的后继节点成功获得了锁,直接返回false
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())          // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }  
    

    对于这个入队操作,有几点需要说明:

    1. 只有头结点的next节点才会主动调用tryAcquire方法取申请获取锁
      1. 当头结点的next节点成功的得到了锁之后,通过setHead方法会将自己设为头结点
      2. 移除原来的头结点之后return false

    好了,接下来我们就来说一说为啥上面说持有锁的线程不在队列中,如果说上文对于首次加锁的线程没有加入队列产生怀疑的话,那么这里的setHead方法会使你幡然醒悟:

    private void setHead(Node node) {     
        head = node;
        node.thread = null;
        node.prev = null;
    }  
    

    看到了没:node.thread = null把头结点的线程置为空了!!!所以,对于独占到锁的线程来说,它此时已经不再队列中了!!所以说,只有头结点时持有锁的节点这句话不准确!!

    那么问题来了,这个头结点时怎么初始化的呢?

    头结点的初始化

    答案就在调用acquireQueue方法时的addWaiter方法:

    private Node addWaiter(Node mode) {  
        // 思路:新键当前线程的Node节点;如果tail被初始化过,则直接添加到尾部;否则执行enq操作先初始化tail再入队
        // 根据注释:mode传递的值要么是Node.SHARED要么是Node.EXCLUSIVE  
    
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }  
    

    这里的代码思路很简单,就是构建Node节点,然后插入到尾部,对于这个if判断语句,为false的情况会执行enq方法,在enq方法里面会有头结点的初始化:

    private Node enq(final Node node) {
        // 思路:获取尾部,将参数中的node节点直接加入尾部;然后CAS更新tail引用 
    
        for (;;) {
    
            // 获取尾部
            Node t = tail;
    
            // 如果tail为空,说明是第一次入队操作,通过CAS初始化节点  
            // 这里初始化只是调用了默认构造器,目的是为了第二次for循环时tail不为null
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                // 因为tail是AQS在并发环境下的共享资源,所以修改tail变量要使用CAS操作
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }  
    

    有没有发现,这个头结点的thread没有传递是参数,是一个null,这也证明了我们之前说的是正确的。

    所以我们就知道了,在第一次有线程加锁的时候,它会成功得到锁,那么当第二个线程需要等待这个锁的时候,调用addWaiter的时候会初始化链表的头结点。

    阻塞线程的挂起

    可能有的人会问,每一个阻塞的节点都有一个无限for循环自旋,那么线程没有被挂起的话岂不是很浪费cpu资源?挂起的操作就在for循环的后面

    if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())          // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
                    interrupted = true;  
    

    其中shouldParkAfterFailedAcquire(p, node)方法用来判断是否需要挂起获取锁失败的线程:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
    
            // 判断是否可以挂起的思想是:如果该节点的pred节点的waitStatus已经被设置为了SIGNAL  
            // 那么说明该节点已经料理好后事了,可以在某个时刻被唤醒,所以可以安全的挂起
            return true;
        if (ws > 0) {    // 说明pred已经被cancelled了,直接移除
         
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
    
            // 说明pred的waitStatue是0或者PROPAGATE,此时设置为SINGAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }  
    

    看到了吗,只有参数中的pred,也就是需要被挂起的节点node的前一个节点的waitStatus为SINGLE的时候,才会返回true。而我们调用addWaiter方法创建Node节点的时候,waitStatue都是默认值0,所以在该方法的后面else语句中有一个CAS操作将其设置为SINGLE,这样的话,在acquireQueued方法的for自旋中,需要被挂起的线程经历两个for循环就可以使得shouldParkAfterFailedAcquire方法返回true

    之后,真正为node执行挂起的操作位于parkAndCheckInterrupt方法

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }  
    

    这样的话,没有获取到锁的线程就真的被挂起了。

    2. unlock()

    public void unlock() {
        sync.release(1);
    }  
    

    release方法同acquire方法一样,都是在AQS中又方法实现的:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }  
    

    AQS的实现类只需要重写tryRelease方法即可。

    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }  
    

    tryRelease方法很简单,就是将ReentrantLock的state值减去一,然后如果此时state为0说明独占的线程已经完全释放了锁,此时可以解除绑定,否则返回false。

    而真正实现释放锁后唤醒其他线程的方法位于release中的

    if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);  
    

    我们上面在分析线程的挂起的时候说到了要想挂起,那么node的前一个节点的waitStatus必须为SINGLE,而SINGLE在Node这个类中的值为-1.是小于0的,所以一定会执行到unparkSuccessor方法

    private void unparkSuccessor(Node node) {   // 通过名称,该方法是唤醒后继节点
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;  
          
        // 根据注释和代码:这里是处理node参数的next节点为null或者已经取消的情况
        // 此时从尾部开始遍历,找没有被canclled的节点唤醒  
        // PS:为何不从node开始往后找,而是从尾部开始往前找?
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
    
        // 代表参数node的next节点不为空,则唤醒其中的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }  
    

    这里的LockSupport.unpark方法就是唤起其他线程的地方,且一次只会唤醒一个线程,大部分情况就是头结点的后继节点。

    相关文章

      网友评论

        本文标题:Java并发开篇--ReentrantLock公平锁的可重入性

        本文链接:https://www.haomeiwen.com/subject/yjjrphtx.html