美文网首页
AQS原理(一):基于ReentrantLock

AQS原理(一):基于ReentrantLock

作者: 放开那个BUG | 来源:发表于2018-11-26 15:09 被阅读7次

    AQS介绍

    在使用者的角度上,AQS的功能分为两类:独占功能和共享功能,它的所有子类中,要么实现并使用了它独占功能的 API,要么使用了共享锁的功能,而不会同时使用两套 API,就算是 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套 API 来实现的

    独占锁

    解读AQS可以从使用了它独占控制功能的子类 ReentrantLock 说起,分析 ReentrantLock 的同时看一看 AQS 的实现,再推理出 AQS 独特的设计思路和实现方式。最后,再看其共享控制功能的实现。

    一般用ReentrantLock,都是这么使用:

            reentrantLock.lock()
            //do something
            reentrantLock.unlock()
    

    ReentrantLock保证 do something 在同一个时间只有一个线程执行这段代码,其他线程被挂起,直到获取锁。所以,ReentrantLock 实现的就是一个独占锁的功能:有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。没错,ReentrantLock 使用的就是 AQS 的独占 API 实现的。

    我们从 ReentrantLock 的实现开始一起看看重入锁是怎么实现的。

    竞争锁

    找到ReentrantLock类的lock()方法:

        public void lock() {
            sync.lock();
        }
    

    ReentrantLock 内部有代理类完成具体操作,ReentrantLock 只是封装了统一的一套 API 而已。并且,ReentrantLock 又分为公平锁和非公平锁,所以,ReentrantLock 内部只有两个 sync 的实现:


    公平锁:每个线程抢占锁的顺序为先后调用 lock 方法的顺序依次获取锁,类似于排队吃饭。

    非公平锁:每个线程抢占锁的顺序不定,谁运气好,谁就获取到锁,和调用 lock 方法的先后顺序无关,类似于堵车时,加塞的那些 XXXX。

    ReentrantLock或者AQS的实现简述如下:有一个被volatile修饰的标志位被叫做key,有没有线程拿走了锁,还需要一个线程安全的队列,维护一堆被挂起的线程。当锁被归还时,能通知这些被挂起的线程,可以来竞争锁。

    而公平锁和非公平锁,唯一的区别是获取锁的时候是先进入队列排队再获取锁,还是先获取锁,不成功再进入队列。

    公平锁
    首先我们看一下ReentrantLock中的公平锁的实现:

            final void lock() {
                acquire(1);
            }
    

    acquire()方法调用的是AQS的acquire()方法:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    此方法先尝试获取锁,获取不到则创建一个waiter(当前线程)后放入到队列中。

    点击tryAcquire()方法:

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    留空了,Doug Lea 是想留给子类去实现(既然要给子类实现,应该用抽象方法,但是 Doug Lea 没有这么做,原因是 AQS 有两种功能,面向两种使用场景,需要给子类定义的方法都是抽象方法了,会导致子类无论如何都需要实现另外一种场景的抽象方法,显然,这对子类来说是不友好的。)

    既然是子类的tryAcquire()方法,我们查看FairSync的tryAcquire()方法(FairSync几次Sync,而Sync继承AQS):

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    首先调用了AQS的getState()方法,因为在 AQS 里面有个叫 state 的标志位 :

        /**
         * The synchronization state.
         */
        private volatile int state;
    
        /**
         * Returns the current value of synchronization state.
         * This operation has memory semantics of a {@code volatile} read.
         * @return current state value
         */
        protected final int getState() {
            return state;
        }
    

    那个state就是我们前面所说的key。

    继续FairSync的tryAcquire()方法:

            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();// 获取当前线程 
                int c = getState();// 获取父类 AQS 中的标志位 
                if (c == 0) {
                     // 如果队列中没有其他线程  说明没有线程正在占有锁!
                    if (!hasQueuedPredecessors() &&
                        // 修改一下状态位,注意:这里的 acquires 是在 lock 的时候传递来 
                        //的,从上面的图中可以知道,这个值是写死的 1
                        compareAndSetState(0, acquires)) {  
                       // 如果通过 CAS 操作将状态为更新成功则代表当前线程获取锁,因            
                      //此,将当前线程设置到 AQS 的一个变量中,说明这个线程拿走了锁。
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 如果不为 0 意味着,锁已经被拿走了,但是,因为 ReentrantLock 是重        
               //入锁,是可以重复 lock,unlock 的,只要成对出现行。一次。这里还要再判 
                //断一次 获取锁的线程是不是当前请求锁的线程。
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;  // 如果是的,累加在 state 字段上就可以了。
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    到此,如果如果获取锁,tryAcquire 返回 true,反之,返回 false,回到 AQS 的 acquire 方法。如果没有获取到锁,按照我们的描述,应该将当前线程放到队列中去,只不过,在放之前,需要做些包装。

    先看 addWaiter 方法:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    用当前线程去构造一个Node对象,node是一个表示 Node 类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS 的这个队列中,哪些节点是独占的,哪些是共享的。

    创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过 cas 方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入 enq 中死循环,“自旋”方式修改。

    将线程的节点接入到队里中后,当然还需要做一件事: 将当前线程挂起!这个事,由 acquireQueued 来做。

    在解释 acquireQueued 之前,我们需要先看下 AQS 中队列的内存结构,我们知道,队列由 Node 类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。

    而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):


    黄色结点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的结点(tail 后面)后面。这个从 enq ()方法可以看出来,上文中有提到 enq ()方法为将节点插入队列的方法:

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    接下来看AQS的acquireQueued()方法:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                 // 如果当前的节点是 head 说明他是队列中第一个“有效的”节点,因此尝试获取,上文中有提到这个类是交给子类去扩展的。
                        setHead(node);// 成功后,将上图中的黄色节点移除,Node1 变成头节点。
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) && 
                    // 否则,检查前一个节点的状态为,看当前获取锁失败的线程是否需要挂起。
                        parkAndCheckInterrupt()) 
                   // 如果需要,借助 JUC 包下的 LockSopport 类的静态方法 Park 挂起当前线程。知道被唤醒。
                        interrupted = true;
                }
            } finally {
                if (failed) // 如果有异常 
                    cancelAcquire(node);// 取消请求,对应到队列操作,就是将当前节点从队列中移除。
            }
        }
    

    上面的代码有几处需要说明:
    1.Node结点中,除了存储当前线程,结点类型,队列中前后元素的变量,还有一个叫waitStatus的变量,用于描述结点的状态。
    原因是:AQS的队列中,在有并发时,会存取一定数量的结点,每个结点中都有表示线程状态的量,如果有些线程等不及获取锁,需要放弃竞争,退出队列;有些线程等待一些条件满足后才能执行(这里的描述很像某个 J.U.C 包下的工具类,ReentrankLock 的 Condition,事实上,Condition 同样也是 AQS 的子类)等等,总之,各个线程有各个线程的状态,但总需要一个变量来描述它,这个变量就叫 waitStatus, 它有四种状态:

            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    

    分别表示:

    • 1.结点取消
    • 2.结点等待触发
    • 3.结点等待条件
    • 4.结点状态需要向后传播
      只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。
    1. 对线程的挂起及唤醒操作是通过使用 UNSAFE 类调用 JNI 方法实现的。当然,还提供了挂起指定时间后唤醒的 API,在后面我们会讲到。

    到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到 AQS 队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS 的队列为 FIFO 队列,所以,每次被 CPU 假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS 通过这样的方式,实现了竞争的排队策略。

    释放锁

    释放锁的流程简述如下:因为获取锁的线程的结点在AQS的头节点位置,所有先应该释放锁,再将头节点移除,然后让后一个做头节点,并通知它来竞争锁。

    我们可以先从ReentrantLock的FairSync来说明:

        public void unlock() {
            sync.release(1);
        }
    
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    unlock()方法调用了AQS中的release()方法,同样传入了参数1。

    同样,release()方法为空方法,子类自己实现逻辑

    protected final boolean tryRelease(int releases) {
                int c = getState() - releases; 
                if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果释放的线程和获取锁的线程不是同一个,抛出非法监视器状态异常。
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {// 因为是重入的关系,不是每次释放锁 c 都等于 0,直到最后一次释放锁时,才通知 AQS 不需要再记录哪个线程正在获取锁。
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
     }
    

    释放锁,成功后,继续执行unparkSuccessor()方法:

        private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    值得注意的是,寻找的顺序是从队列尾部开始往前去找的最前面的一个 waitStatus 小于 0 的节点。

    到此,ReentrantLock 的 lock 和 unlock 方法已经基本解析完毕了,唯独还剩下一个非公平锁 NonfairSync 没说,其实,它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁,那 ReentrantLock 是怎么实现的呢?再看两段代码:

        /**
         * Sync object for non-fair locks
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    非公平锁的 lock 方法的处理方式是: 在 lock 的时候先直接 cas 修改一次 state 变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。

        /**
         * Sync object for fair locks
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
              .......
          }
    

    而对于公平锁:则是老老实实的开始就走 AQS 的流程排队获取锁。如果前面有人调用过其 lock 方法,则排在队列中前面,也就更有机会更早的获取锁,从而达到“公平”的目的。

    参考资料

    https://www.infoq.cn/article/jdk1.8-abstractqueuedsynchronizer

    相关文章

      网友评论

          本文标题:AQS原理(一):基于ReentrantLock

          本文链接:https://www.haomeiwen.com/subject/bmcmqqtx.html