美文网首页
(七)Java并发编程之ReentrantLock

(七)Java并发编程之ReentrantLock

作者: 陪安东尼的漫长岁月 | 来源:发表于2019-11-27 12:56 被阅读0次

    ReentrantLock 是上文提到的 AQS 其中的一个实现类,是一个可重入的互斥锁,和 synchronized 有相同的基本行为和语义,但是具有扩展功能。它由上一次成功锁定并且尚未解锁的线程拥有。

    ReentrantLock 源码初探 (JDK11)

    • ReentrantLock 之构造器
        // 创建公平锁
        public ReentrantLock() {
            sync = new NonfairSync();
        }
        // 创建公平锁(true) or 非公平锁(false)   
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    
    • ReentrantLock 之内部类 Sync
        /** AQS 的实现类, 重写 AQS 中的 protected 方法
        有两个实现类  1、NonfairSync 非公平锁,2、FairSync公平锁 */
        abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -5179523762034025860L;
            
            /** 尝试获取非公平锁 */
            @ReservedStackAccess 
            final boolean nonfairTryAcquire(int acquires) { // 
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) { // 判断state状态是否为0,不为0直接加锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current); //独占状态锁持有者指向当前线程
                        return true;
                    }
                }   // state状态不为0 但是锁被当前线程持有 则state+1
                else if (current == getExclusiveOwnerThread()) { 
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;   //加锁失败
            }
            /** 释放锁,公平锁和非公平锁的释放操作是相同的 */
            @ReservedStackAccess
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
            /** 判断持有独占锁的线程是否是当前线程 */
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
        
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
    
            final Thread getOwner() {
                return getState() == 0 ? null : getExclusiveOwnerThread();
            }
            // 获取锁重入次数
            final int getHoldCount() {
                return isHeldExclusively() ? getState() : 0;
            }
    
            final boolean isLocked() {
                return getState() != 0;
            }
    
            private void readObject(java.io.ObjectInputStream s)
                throws java.io.IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    
    • ReentrantLock 之内部类 NonfairSync,FairSync
      非公平锁加锁逻辑由内部类Sync实现,一上来就会尝试获取锁资源;公平锁加锁逻辑由本身重写实现,在获取锁资源之前会判断队列中是否有正在等待的节点,如果没有才会尝试获取锁资源。他们之间的重入逻辑则是相同的,如果是当前节点的线程再次请求锁资源则会对该节点的 state + 1,表示重入的次数。
        /** 非公平锁 */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
            // 重写AQS的方法逻辑,由AQS的 acquire(int arg) 方法调用
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires); // 这个方法由Sync实现
            }
        }
        /** 公平锁 */ 
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
            @ReservedStackAccess
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                // 分配锁资源逻辑(这里只有分配成功的操作,对于分配失败的逻辑,在AQS中实现)
                if (c == 0) {
                    // 队列中没有等待节点,则对当前线程分配锁资源
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 重入逻辑 (对当前state的值 +1 )
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
    // 判断队列中是否有等待节点    
    public final boolean hasQueuedPredecessors() {
            Node h, s;
            if ((h = head) != null) {
                // 当 head 的下一个节点为 null 或者 是等待被剔除状态的时候
                if ((s = h.next) == null || s.waitStatus > 0) {
                    s = null; // traverse in case of concurrent cancellation
                   // 从 tail 节点往前遍历,获取最接近 head 的等待节点
                    for (Node p = tail; p != h && p != null; p = p.prev) {
                        if (p.waitStatus <= 0) // 过滤掉队列中撤销等待的节点
                            s = p;
                    }
                }
                // 该等待节点可以被唤醒(当前访问线程不是该等待节点的线程)
                if (s != null && s.thread != Thread.currentThread())
                    return true;
            }
            // 队列中没有等待节点
            return false;
        }
    

    加锁解锁过程

    写到这里,对 ReentrantLock 所实现的逻辑有了一个大概的了解,但是可以发现,上面只有对加锁,解锁进行了操作,但是我们一直提及的节点,队列在上面并没有体现,那么我们加锁失败是怎么操作的呢,传说中的入队出队又是一个什么情况呢,接下来就对 ReentrantLock 的 lock() , unloc() 操作对整个加锁解锁逻辑串联进行梳理。
    这里我们看一下非公平锁的操作吧。

    1. 入列操作
    入列操作.png
    • java.util.concurrent.locks.ReentrantLock.lock
        public void lock() {
            // 这里的 acquire 方法由 AQS 实现
            sync.acquire(1);
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire
        public final void acquire(int arg) {
            /** !tryAcquire(arg) 代表获取锁的状态(修改stat,设置线程),获取成功返回true取反则跳出 accquire;
                获取失败返回false取反则进行后续 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 操作*/
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
        // 获取锁失败会进行addWaiter操作;将当前线程对应的节点添加到队列中,指向tail
        private Node addWaiter(Node mode) {
            Node node = new Node(mode);
            // 自旋将当前获取锁失败的线程对应的节点添加到队列中,并指向tail。
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail); 
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        // 返回的是当前请求线程对应的节点
                        return node;
                    }
                } else {
                    // 初始化同步队列的操作,初始化结束后 head 和 tail 指向的是同一个节点
                    initializeSyncQueue();
                }
            }
        }
        private final void initializeSyncQueue() {
            Node h;
            if (HEAD.compareAndSet(this, null, (h = new Node())))
                tail = h;
        }
       // 初始化独占模式的节点
       /** java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
        .Node(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node) */
       Node(Node nextWaiter) {
           this.nextWaiter = nextWaiter;
           THREAD.set(this, Thread.currentThread());
      }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
        // 这里的 node 为当前线程在队列中对应的节点,arg为获取锁时的标记状态(1)
        final boolean acquireQueued(final Node node, int arg) {
            boolean interrupted = false;  // 线程中断标记
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    /** 如果node的前一个节点为 head 则尝试获取锁,
                        这里是下面中断的线程被唤醒后,重新进入循环,会执行这一段代码
                        这里并没有使用CAS来设置头结点,因为 tryAcquire 里面的CAS操作,
                        只能有一个线程进入到if里面的代码块*/
                    if (p == head && tryAcquire(arg)) {
                        // 加锁成功,将当前节点设置为头节点,代表当前节点出队。
                        setHead(node);  
                        p.next = null; // help GC
                        return interrupted;
                    }
                    // 判断是否可以挂起(通过当前节点的前一个节点的waitStatus属性判断是否可以被唤醒)
                    // 如果该线程被唤醒,则继续循环执行上面的加锁操作,唤醒后将该节点剔除队列
                    if (shouldParkAfterFailedAcquire(p, node))
                        // 执行挂起操作
                        interrupted |= parkAndCheckInterrupt();
                }
            } catch (Throwable t) {
                // 以上操作抛出异常的时候,撤销本次请求
                cancelAcquire(node);
                if (interrupted)
                    selfInterrupt();
                throw t;
            }
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire
        // 这里传入的 node 为当前访问线程对应的节点,
        // 确保前节点状态是可以被唤醒
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                // 如果前一个节点是等待状态,则返回true执行挂起操作
                return true;
            if (ws > 0) {
                /** 头节点的 waitStatus 值是0,这里是从当前节点(tail)往前遍历,
                    取最近的一个等待状态的节点 or head节点*/
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
              /** waitStatus必须为0或PROPAGATE。CAS设置属性之后重新进入判断*/
               pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
            }
            return false;
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt
        // 对当前线程执行挂起操作
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire
    // 在执行挂起操作发生异常时,取消正在获取资源的请求 
    // 这里的node是当前请求线程对应的节点
    private void cancelAcquire(Node node) {
            if (node == null)
                return;
            node.thread = null;
    
            // 从当前节点(也就是尾节点)向前遍历,找到最近的一个等待的节点退出循环
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
            // 这里的 predNext 是距离尾节点最近的等待节点的下一个节点(不一定是当前节点)
            Node predNext = pred.next;
            // 标记当前节点为取消状态,其余节点则可以跳过该节点
            node.waitStatus = Node.CANCELLED;
    
            // 如果当前节点是尾节点,则移除自己
            if (node == tail && compareAndSetTail(node, pred)) {
                pred.compareAndSetNext(predNext, null);
            } else {
                int ws;
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    // 这里是将前一个等待节点 next 设置尾当前节点的 next,将当前 node 剔除;
                    // 假如中间有撤销节点的话,这样操作也会将其过滤掉;
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        pred.compareAndSetNext(predNext, next);
                } else {
                    // 当前节点是head节点 or 当前节点 waitStatus 为 PROPAGATE 时进入当前逻辑
                    // 唤醒当前节点的后继节点
                    unparkSuccessor(node);
                }
                node.next = node; // help GC
            }
        }
    
    1. 出列操作
      同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点。设置首节点是通过获取同步状态成功的线程来完成的(获取同步状态是通过CAS来完成),只能有一个线程能够获取到同步状态,因此设置头节点的操作并不需要CAS来保证,只需要将首节点设置为其原首节点的后继节点并断开原首节点的next(等待GC回收)应用即可。
    出列操作.png
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#release
        // tryRelease 修改实现类的state修改和节点线程解绑,成功返回true,从头节点开始出队
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    • java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                node.compareAndSetWaitStatus(ws, 0);
            Node s = node.next;
            // 这里是判断下一个节点是否撤销等待,
            // 如果撤销的话,找下一个等待节点(从tail开始往前找最远的一个等待节点)
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node p = tail; p != node && p != null; p = p.prev)
                    if (p.waitStatus <= 0)
                        s = p;
            }
            // 对其执行唤醒操作
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    总结:AQS将获取锁,释放锁的操作交由子类去实现,这里是由ReentrantLock的公平锁,非公平锁来实现;其本身实现了共享模式和独占模式的入列出列操作,这里的代码看的是独占模式的入队出队操作。获取锁失败则将当前线程的对应的节点添加到tail,添加时要保证它的前一节点是可以被唤醒的,添加成功后将当前线程挂起;唤醒操作是对head的后继节点进行唤醒,唤醒后会重复执行入队操作中的头节点获取锁的逻辑,获取成功,即可跳出循环。

    相关文章

      网友评论

          本文标题:(七)Java并发编程之ReentrantLock

          本文链接:https://www.haomeiwen.com/subject/rayaictx.html