美文网首页
ReentrantLock源码分析及AQS原理

ReentrantLock源码分析及AQS原理

作者: ZMXQQ233 | 来源:发表于2020-09-07 19:34 被阅读0次

    ReentrantLock源码分析及AQS原理

    ReentrantLock源码分析

    ReentrantLock(可重入互斥锁)。可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。

    我们从构造函数开始逐步分析。

    ReentrantLock的两个构造函数,默认使用的是非公平sync对象

    public ReentrantLock() {
            sync = new NonfairSync();
    }
        
    public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
    }
    

    看下继承自Sync的非公平同步类NonfairSync

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * 非公平锁:线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾排队等待
             */
            final void lock() {
                if (compareAndSetState(0, 1))//CAS
                    setExclusiveOwnerThread(Thread.currentThread());//设置所有者线程为当前线程
                else
                    acquire(1);//失败后尝试获取锁。
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    compareAndSetState(0, 1)。0是期望值,1是更新值。只要期待值与当前同步状态state相等,则把state更新为update(更新值)。

    这里state就是当前线程获得锁的状态。先看下state的说明。

    private volatile int state;//state是Volatile修饰的,用于保证一定的可见性和有序性。
    
    1. State初始化的时候为0,表示没有任何线程持有锁。
    2. 当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁时,就会多次+1,这里就是可重入的概念。
    3. 解锁也是对这个字段-1,一直到0,此线程对锁释放。

    所以此处的compareAndSetState(0, 1)就是为了判断当前线程state是否为0,是0则获取锁成功,并更新state为1.

    protected final boolean compareAndSetState(int expect, int update) {
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    如果获取锁失败呢?也就是当前线程的state不为0,那就会执行acquire(1);

    //arg代表获取锁的次数
    public final void acquire(int arg) {
            //tryAcquire(arg)为true,代表获取锁成功(当前线程为独占线程)。
            //获取锁失败后,再执行acquireQueued将线程排队。
            if (!tryAcquire(arg) &&
                //addWaiter方法其实就是把对应的线程以Node的数据结构形式加入到双端队列里,返回的是一个包含该线程的Node。而这个Node会作为参数,进入到acquireQueued方法中。acquireQueued方法可以对排队中的线程进行“获锁”操作。
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                //中断当前线程
                selfInterrupt();
        }
    

    这几个方法都是由ReentrantLock中的(非)公平同步类调用,而(Non)fairSync继承自Sync类,Sync类继承自AQS,所以这些方法都在AQS中定义。例如上面的tryAcquire方法就被ReentrantLock重写了,直接调用AQS的tryAcquire会抛出UnsupportedOperationException异常。

    在这里插入图片描述

    可以看到ReentrantLock中的公平同步类和非公平同步类都重写了tryAcquire。

    所以acquire调用的就是NonfairSync中已经重写了的tryAcquire

    //NonfairSync类里最后一个方法,当前acquires=1
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
                //当前线程
                final Thread current = Thread.currentThread();
                //当前state值
                int c = getState();
                //如果没有线程获取锁
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        //获取锁成功,并将当前线程设置为独占线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果当前线程为独占线程,则将state加1。
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    查看addWaiter方法前,要先来看下Node节点,也就是未获取到锁的线程加入到了哪种队列中去。

    这就涉及到AQS核心思想如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体---虚拟双向队列(FIFO)---实现的,将暂时获取不到锁的线程加入到队列中。

    AQS使用一个Volatile的int类型的state来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

    AQS中基本的数据结构——Node,虚拟双向队列(FIFO)中的节点。

    static final class Node {
        /** 标志线程以共享/独占方式等待锁 */
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
    
        /** 表示线程获取锁的请求已经取消 */
        static final int CANCELLED =  1;
        /** 表示线程已经准备好,就等资源释放了(被唤醒了) */
        static final int SIGNAL    = -1;
        /** 表示节点在等待队列中,节点线程等待唤醒 */
        static final int CONDITION = -2;
        /** 当前线程处在SHARED情况下,该字段才会使用(其他操作介入,也要确保传播继续) */
        static final int PROPAGATE = -3;
        
        //当前节点在队列中的状态
        volatile int waitStatus;
        //前驱指针
        volatile Node prev;
        //后继指针
        volatile Node next;
        //表示处于该节点的线程
        volatile Thread thread;
        //指向下一个处于CONDITION状态的节点
        Node nextWaiter;
        
        /** 返回前驱节点,没有则抛出空指针异常 */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        
        /** 几个Node构造函数 */
        //用来初始化头节点或SHARED标志
        Node() {    
        }
    
        //用于addWaiter方法
        Node(Thread thread, Node mode) {    
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        //用于Condition
        Node(Thread thread, int waitStatus) { 
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    //队列头节点
    private transient volatile Node head;
    //队列尾节点
    private transient volatile Node tail;
    

    现在再来看addWaiter方法是如何把线程加入到双端队列的。

    //这里的mode就是Node.EXCLUSIVE,表示独占模式
    private Node addWaiter(Node mode) {
        //通过当前的线程和锁模式新建一个节点。
        Node node = new Node(Thread.currentThread(), mode);
        //pred指向尾节点tail
        Node pred = tail;
        //如果尾节点不为空,则将当前节点插入到尾节点后面(设为尾节点),并返回node
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明已经被别的线程修改),则进入enq初始化head节点,并将node设为尾节点。
        enq(node);
        return node;
    }
    

    注意双向链表中,第一个节点(头节点)为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。

    回到上面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,acquireQueued会把放入队列中的线程不断去获取锁,直到获取成功或者不再需要获取(中断)。

    那么队列中的线程什么时候可以获取到锁?一直获取不到又会怎样呢?

    final boolean acquireQueued(final Node node, int arg) {
        // 标记是否成功拿到资源
        boolean failed = true;
        try {
            // 标记等待过程中是否中断过
            boolean interrupted = false;
            // 开始自旋,要么获取锁,要么中断
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点)
                if (p == head && tryAcquire(arg)) {
                    // 获取锁成功,头指针移动到当前node
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)//未成功拿到资源,设置当前节点状态为CANCELLED
                cancelAcquire(node);
        }
    }
    
    // 靠前驱节点判断当前线程是否应该被阻塞,true阻塞,false不阻塞
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱结点的节点状态
        int ws = pred.waitStatus;
        // 说明前驱结点处于唤醒状态,此时需要阻塞当前节点,返回true
        if (ws == Node.SIGNAL)
            return true; 
        // 通过枚举值我们知道waitStatus>0是取消状态
        if (ws > 0) {
            do {
                // 循环向前查找取消节点,把取消节点从队列中剔除
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            //直到前面某一个节点非取消节点,将非取消节点连接当前节点
            pred.next = node;
        } else {//前驱节点waitStatus为0或-3。
            // 设置前驱节点等待状态为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    //parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    前面的两个问题得到解决:只要前驱节点是头节点,就尝试获取锁。前驱节点不是头节点,或获取锁失败,则判断是否需要阻塞当前线程。如果前驱节点状态是SIGNAL,表明前驱节点准备获取资源,所以当前节点阻塞;如果前驱节点是取消状态CANCELLED(不再获取资源),移除所有取消状态的前驱节点,继续自旋尝试获取锁;如果前驱节点waitStatus为0(默认值)或-3,则更改前驱节点状态为SIGNAL,继续自旋。

    参考链接 : 美团技术团队:从ReentrantLock的实现看AQS的原理及应用

    相关文章

      网友评论

          本文标题:ReentrantLock源码分析及AQS原理

          本文链接:https://www.haomeiwen.com/subject/xdilektx.html