美文网首页
AQS应用——ReentrantLock源码分析

AQS应用——ReentrantLock源码分析

作者: Walkerc | 来源:发表于2020-04-04 18:06 被阅读0次
    前言

    本文通过可重入锁ReentrantLock的源码分析,加深对aqs和ReentrantLock的理解
    关于AQS相关的知识可以参考我的另一篇文章Java并发——AQS源码解析

    先从使用上入手

    构造方法
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    可以看到初始化了一个sync对象,而sync继承了aqs

    abstract static class Sync extends AbstractQueuedSynchronizer
    

    默认初始化的是非公平锁,构造方法传递true即为公平锁
    先以默认的非公平锁为例,后面再总结公平锁

    加锁
    public void lock() {
        sync.lock();
    }
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                // 加锁成功,设置当前持有锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // aqs的模板方法
                // 加锁失败,说明其他线程先持有了锁,acquire获取锁或者加入等待队列
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    

    非公平锁tryAcquire

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 状态为0说明还没有线程持有锁
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 当前线程已经持有锁
        else if (current == getExclusiveOwnerThread()) {
            // 重入锁,状态 +1
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    上面两个条件都不满足,直接返回false,进入等待队列

    释放锁
    public void unlock() {
        sync.release(1);
    }
    
    protected final boolean tryRelease(int releases) {
        // 状态减1
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 是否释放锁的标志
        boolean free = false;
        // 状态为0,则清空当前线程信息,标志位置为0
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // 否则更新状态,返回false
        setState(c);
        return free;
    }
    

    因为是可重入锁,同一个线程,每次加锁状态+1,释放锁,状态-1,这也就是为什么对于同一个线程,lock和unlock调用的次数一定要一致

    超时和响应中断

    aqs提供了超时和响应中断的功能,ReentrantLock也提供了对应的方法
    超时

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        // 直接调用aqs提供的方法
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    

    线程等待超时或被中断,方法会立即返回
    响应中断

    public void lockInterruptibly() throws InterruptedException {
        // 同样也是直接调用aqs提供的获取锁响应中断方法
        sync.acquireInterruptibly(1);
    }
    

    线程被中断会方法立即返回
    关于aqs这两个方法的具体分析可以参考文章开头提到的AQS源码分析

    newCondition

    对于ReentrantLock我们通常也会使用其condition对象进行线程间的通信
    作用类似于wait() 和 notify()

    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    

    newCondition初始化了AQS中的ConditionObject对象
    ConditionObject有如下两个属性,我们可以知道,其内部也是维护了一个链表

    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
    

    直接从我们最常用的await和signal方法入手分析
    ConditionObject#await
    后续会涉及到两个队列,提前有个概念

    • sync同步队列:指的是AQS中等待获取锁的node队列
    • condition队列:指的是调用condition#await方法,等待signal唤醒的node队列
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 添加到condition队列
        Node node = addConditionWaiter();
        // await方法要完全释放锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 如果不在sync队列(signal方法会将node放到sync队列中),则阻塞线程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 线程被唤醒,重新获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    然后我们按照方法细节再进行分析
    ConditionObject#addConditionWaiter
    添加线程到condition队列

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 如果尾节点取消的话,则移出队列,获取最后一个未取消的节点
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 第一次调用await会走到这里,node状态为condition,表示在condition队列
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    
    // 从头到尾遍历,移除已经取消的node节点
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            }
            else
                trail = t;
            t = next;
        }
    }
    

    假设此时有三个线程调用了condition的await方法阻塞,等待唤醒,那么此时condition队列的状态如下



    看到与aqs同步队列的区别没

    • 没有空的head节点,firstWaiter代表第一个节点,lastWaiter代表最后一个节点
    • waitStatus为Node.CONDITION
    • 虽然同样是Node对象,但是并没有赋值prev和next属性,实际上是单向链表,用nextWaiter连接

    ConditionObject#fullyRelease
    完全释放锁,调用await方法,就代表要完全释放当前持有的锁(可重入)

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            // 获取当前锁状态
            int savedState = getState();
            // 完全释放锁
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    

    ConditionObject#isOnSyncQueue
    判断node是否在同步队列中,如果在同步队列则不阻塞线程

    final boolean isOnSyncQueue(Node node) {
        // 如果状态为condition或者前驱节点为null,则一定在condition队列中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        // 前后节点都有了,那一定在同步队列了
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
    
    // 在aqs同步队列中,从tail开始向前找node,找到则返回true
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }
    

    如此看来,只有prev存在而next为null的时候,才会走到最后findNodeFromTail
    注释的意思是说prev不为空,不能代表已经在同步队列中,因为会通过CAS将自己设置为tail,可能会失败,再回顾一下入队操作

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 这里将prev指向之前的tail,prev已经不为空了
                node.prev = t;
                // cas将自己设置为tail,这里成功了才是入队列成功
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    阻塞前的方法都分析完成,接下来先分析下signal方法,便于更好的理解await方法
    ConditionObject#signal
    唤醒condition队列第一个节点(未取消的)

    public final void signal() {
        // 判断是否持有锁
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // 第一个节点不为空,执行通知
        if (first != null)
            doSignal(first);
    }
    
    private void doSignal(Node first) {
        do {
            // 如果只有一个节点,将lastWaiter置为null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // 队列中移除第一个节点
            first.nextWaiter = null;
        // 如果node转换成功,则结束循环,否则继续往后进行唤醒
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    // 将node从condition队列中转换到同步队列中
    final boolean transferForSignal(Node node) {
        // 修改状态为0,如果失败说明被取消,返回false接着向后找
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        // 加入同步队列,返回前驱节点p
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果前驱节点状态大于0(已取消)或者CAS设置状态为SIGNAL失败,则直接唤醒线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    当一个线程node在condition队列中await时,如果被其他线程signal唤醒,那么该node就需要加入aqs同步队列中等待再次获取锁,用来执行await之后的代码

    所以signal方法概括来说就是,将condition队列中的第一个节点移除,并将其加入aqs同步队列中,用图来表示会更直观


    这里最令人迷惑的就是transferForSignal方法
    最最迷惑的就是该方法中最后的条件判断,唤醒操作

    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
    

    这个操作首先是node加入aqs同步队列后,判断它的前驱节点,也就是上图转移到aqs同步队列之后的node-1的状态

    • 如果 > 0 就直接唤醒Thread-1(刚转移到同步队列的线程)
    • 如果 <= 0,但是cas设置其状态为signal时失败,也直接唤醒Thread-1

    思考了好久才有眉目,这里为什么满足这两个条件要提前唤醒呢?

    从正确性上来看,即使这里不唤醒也不会出问题,因为调用signal的线程一定会调用unlock方法,而unlock方法就会唤醒aqs同步队列中的第一个非取消node,所以最终一定会传递下去,唤醒刚刚加入队尾的node-2(Thread-1)

    那我们回头继续看下await方法被唤醒之后的操作

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 添加到condition队列
        Node node = addConditionWaiter();
        // await方法要完全释放锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 如果不在sync队列(signal方法会将node放到sync队列中),则阻塞线程
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 线程被唤醒,重新获取锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 还有其他节点的话,遍历清除取消的节点
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 中断后处理
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    被唤醒之后,加入aqs同步队列,跳出while循环,调用acquireQueued方法
    该方法可以看aqs源码回忆一下,主要就这几个步骤

    • 如果是第一个等待的节点,就尝试获取锁
    • 如果不是,则会去判断前驱节点的状态,如果取消则删除,向前找非取消的节点
    • 确保前驱节点未取消,且状态为signal,则将当前线程阻塞

    如果transferForSignal中,判断前驱节点已经取消,或者无法设置为signal状态,不提前唤醒
    那么等调用signal方法的线程unlock之后,去唤醒aqs同步队列的节点
    当一直唤醒到transferForSignal中转移的节点之前时,还是要执行acquireQueued方法
    处理前驱取消的节点,再设置状态,而acquireQueued方法的调用是不需要持有独占锁的
    所以这里为了提高并发性能,让acquireQueued方法和调用signal之后的操作同时进行
    多看几遍源码就能理解,其实就是让如下两段代码并发执行


    被唤醒之后的aqs处理逻辑 signal唤醒之后的业务逻辑

    唤醒之后对中断异常的处理

    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    

    1、如果阻塞过程中线程没被中断,则返回0,且加入同步队列中,退出循环
    2、被中断则修改状态为0,并加入到aqs同步队列中
    3、设置状态失败会自旋,直到加入到aqs同步队列中
    这里返回的中断模式有2种,主要是用来做不同的中断后处理

    /** Mode meaning to reinterrupt on exit from wait */
    private static final int REINTERRUPT =  1;
    /** Mode meaning to throw InterruptedException on exit from wait */
    private static final int THROW_IE    = -1;
    
    /**
     * Throws InterruptedException, reinterrupts current thread, or
     * does nothing, depending on mode.
     */
    private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }
    

    第1种情况是在唤醒后中断,唤醒后状态会被设置为0,所以cas会失败,自旋就是在等待加入aqs同步队列之后再返回,否则await方法的循环会退出,可能还没有调用enq方法入队
    Thread.interrupted()会清除中断状态,唤醒后重新interrupt

    第2种情况就是在唤醒前中断,状态一定是CONDITION,最后返回-1
    阻塞时中断,抛出中断异常

    公平锁

    与非公平锁的区别
    lock

    // 公平锁
    final void lock() {
        acquire(1);
    }
    
    // 非公平锁
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    

    首先从lock方法我们可以看到,公平锁的区别就是每次获取锁都直接调用acquire去排队,而非公平锁,先尝试cas获取锁,如果竞争到就持有锁,不管队列是否有其他等待的线程
    tryAcquire

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    如果当前没有线程持有锁,那么要先判断队列中有没有其他线程在等待获取,不能直接尝试获取锁

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    h != t 说明有其他等待节点
    (s = h.next) == null 这个条件什么时候成立呢?
    头尾不相等,但是头的next为空,那就只有一种情况就是某个节点正在加入队列过程中
    还记得enq入队方法么?

    Node t = tail;
    ...
    node.prev = t;
    if (compareAndSetTail(t, node)) {
        t.next = node;
        return t;
    }
    

    先将node的prev指向原来的tail,再通过cas更新tail引用,最后才将前一个节点的next连起来
    所以在t.next = node;执行前,cas成功后,会出现头的next为空
    这种情况就直接返回true

    如果不是的话,那么需要判断是不是当前线程在等待

    总结

    ReentrantLock源码分析就结束了,回顾总结一下

    • ReentrantLock是jdk层面提供的独占锁,基于AQS实现
    • 提供了公平与非公平两种方式
      • 公平:lock时要判断前面有没有其他线程在等待,有先后顺序
      • 非公平:lock时直接尝试cas竞争锁,不管前面是否有其他线程等待
    • 可重入锁,同一个线程lock和unlock方法调用次数一定要对应,保证正确释放锁
    • 通过AQS的ConditionObject对象,提供了线程间通信的方法
    • await时加入condition队列,释放锁
    • signal时唤醒节点,从condition队列转移到aqs同步队列中,重新竞争锁
    • 基于AQS,同样的提供了响应中断,获取锁超时方法

    相关文章

      网友评论

          本文标题:AQS应用——ReentrantLock源码分析

          本文链接:https://www.haomeiwen.com/subject/yxygactx.html