美文网首页
7、AQS(AbstractQueuedSynchronizer

7、AQS(AbstractQueuedSynchronizer

作者: 火山_6c7b | 来源:发表于2020-05-21 22:53 被阅读0次

    1.AQS简介

    1.1 AQS解释及特点

    所谓AQS,指的是AbstractQueuedSynchronizer,它提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等并发类均是基于AQS来实现的,具体用法是通过继承AQS实现其模板方法,然后将子类作为同步组件的内部类。

    • J.U.C是基于AQS实现的,AQS是一个同步器,设计模式是模板模式。
    • 核心数据结构:双向链表 + state(锁状态)
    • 底层操作:CAS
    • AQS实际就三个元素 ,被执行的资源状态,当前执行者,后面排队等着的执行者

    1.2 结构:

    image

    AQS维护了一个volatile语义(支持多线程下的可见性)的共享资源变量state和一个FIFO线程等待队列(多线程竞争state被阻塞时会进入此队列)。

    1.3 设计思想

    同步器的核心方法是acquire和release操作,其背后的思想也比较简洁明确。

    acquire:
      while (当前同步器的状态不允许获取操作) {
    
              如果当前线程不在队列中,则将其插入队列
    
              阻塞当前线程
    
      }
    
      如果线程位于队列中,则将其移出队列
    
    release:
      更新同步器的状态
    
      if (新的状态允许某个被阻塞的线程获取成功)
    
               解除队列中一个或多个线程的阻塞状态
    
      从这两个操作中的思想中我们可以提取出三大关键操作:同步器的状态变更、线程阻塞和释放、插入和移出队列。所以为了实现这两个操作,需要协调三大关键操作引申出来的三个基本组件:
    
      ·同步器状态的原子性管理;
    
      ·线程阻塞与解除阻塞;
    
      ·队列的管理;
    

    1.4 ReentrantLock源码结构

    image

    1.41 非公平获取锁

    简单说来,AQS会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态(park())。如下图所示


    image

    (1)假设这个时候在初始情况下,还没有多任务来请求竞争这个state,这时候如果第一个线程thread1调用了lock方法请求获得锁,首先会通过CAS的方式将state更新为1,表示自己thread1获得了锁,并将独占锁的线程持有者设置为thread1。

    final void lock() {
        if (compareAndSetState(0, 1))
            //setExclusiveOwnerThread是AbstractOwnableSynchronizer的方法,AQS继承了AbstractOwnableSynchronizer
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    

    (2)这个时候有另一个线程thread2来尝试或者锁,同样也调用lock方法,尝试通过CAS的方式将state更新为1,但是由于之前已经有线程持有了state,所以thread2这一步CAS失败(前面的thread1已经获取state并且没有释放),就会调用acquire(1)方法(该方法是AQS提供的模板方法,它会调用子类的tryAcquire方法)。非公平锁的实现中,AQS的模板方法acquire(1)就会调用NofairSync的tryAcquire方法,而tryAcquire方法又调用的Sync的nonfairTryAcquire方法,所以我们看看nonfairTryAcquire的流程。

    //NonfairSync
    protected final boolean tryAcquire(int acquires) {
       return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
       //(1)获取当前线程
       final Thread current = Thread.currentThread();
       //(2)获得当前同步状态state
       int c = getState();
       //(3)如果state==0,表示没有线程获取
       if (c == 0) {
           //(3-1)那么就尝试以CAS的方式更新state的值
           if (compareAndSetState(0, acquires)) {
               //(3-2)如果更新成功,就设置当前独占模式下同步状态的持有者为当前线程
               setExclusiveOwnerThread(current);
               //(3-3)获得成功之后,返回true
               return true;
           }
       }
       //(4)这里是重入锁的逻辑
       else if (current == getExclusiveOwnerThread()) {
           //(4-1)判断当前占有state的线程就是当前来再次获取state的线程之后,就计算重入后的state
           int nextc = c + acquires;
           //(4-2)这里是风险处理
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           //(4-3)通过setState无条件的设置state的值,(因为这里也只有一个线程操作state的值,即
           //已经获取到的线程,所以没有进行CAS操作)
           setState(nextc);
           return true;
       }
       //(5)没有获得state,也不是重入,就返回false
       return false;
    }
    

    总结来说就是:

    • 1、获取当前将要去获取锁的线程thread2。
    • 2、获取当前AQS的state的值。如果此时state的值是0,那么我们就通过CAS操作获取锁,然后设置AQS的线程占有者为thread2。很明显,在当前的这个执行情况下,state的值是1不是0,因为我们的thread1还没有释放锁。所以CAS失败,后面第3步的重入逻辑也不会进行
    • 3、如果当前将要去获取锁的线程等于此时AQS的exclusiveOwnerThread的线程,则此时将state的值加1,这是重入锁的实现方式。
    • 4、最终thread2执行到这里会返回false。

    ​ (3)上面的thread2加锁失败,返回false。那么根据开始我们讲到的AQS概述就应该将thread2构造为一个Node结点加入同步队列中。因为NofairSync的tryAcquire方法是由AQS的模板方法acquire()来调用的,那么我们看看该方法的源码以及执行流程。

    //(1)tryAcquire,这里thread2执行返回了false,那么就会执行addWaiter将当前线程构造为一个结点加入同步队列中
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    addWaiter方法的执行流程

    private Node addWaiter(Node mode) {
        //(1)将当前线程以及阻塞原因(是因为SHARED模式获取state失败还是EXCLUSIVE获取失败)构造为Node结点
        Node node = new Node(Thread.currentThread(), mode);
        //(2)这一步是快速将当前线程插入队列尾部
        Node pred = tail;
        if (pred != null) {
            //(2-1)将构造后的node结点的前驱结点设置为tail
            node.prev = pred;
            //(2-2)以CAS的方式设置当前的node结点为tail结点
            if (compareAndSetTail(pred, node)) {
                //(2-3)CAS设置成功,就将原来的tail的next结点设置为当前的node结点。这样这个双向队
                //列就更新完成了
                pred.next = node;
                return node;
            }
        }
        //(3)执行到这里,说明要么当前队列为null,要么存在多个线程竞争失败都去将自己设置为tail结点,
        //那么就会有线程在上面(2-2)的CAS设置中失败,就会到这里调用enq方法
        enq(node);
        return node;
    }
    

    ​ 那么总结一下add Waiter方法

    ​ 1、将当前将要去获取锁的线程也就是thread2和独占模式封装为一个node对象。

    ​ 2、尝试快速的将当前线程构造的node结点添加作为tail结点(这里就是直接获取当前tail,然后将node的前驱结点设置为tail),并且以CAS的方式将node设置为tail结点(CAS成功后将原tail的next设置为node,然后这个队列更新成功)。

    ​ 3、如果2设置失败,就进入enq方法。

    ​ 在刚刚的thread1和thread2的环境下,开始时候线程阻塞队列是空的(因为thread1获取了锁,thread2也是刚刚来请求锁,所以线程阻塞队列里面是空的)。很明显,这个时候队列的尾部tail节点也是null,那么将直接进入到enq方法。所以我们看看enq方法的实现

    private Node enq(final Node node) {
        for (;;) {
            //(4)还是先获取当前队列的tail结点
            Node t = tail;
            //(5)如果tail为null,表示当前同步队列为null,就必须初始化这个同步队列的head和tail(建
            //立一个哨兵结点)
            if (t == null) { 
                //(5-1)初始情况下,多个线程竞争失败,在检查的时候都发现没有哨兵结点,所以需要CAS的
                //设置哨兵结点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } 
            //(6)tail不为null
            else {
                //(6-1)直接将当前结点的前驱结点设置为tail结点
                node.prev = t;
                //(6-2)前驱结点设置完毕之后,还需要以CAS的方式将自己设置为tail结点,如果设置失败,
                //就会重新进入循环判断一遍
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    总结来说,即使在多线程情况下,enq方法还是能够保证每个线程结点会被安全的添加到同步队列中,因为enq通过CAS方式将结点添加到同步队列之后才会返回,否则就会不断尝试添加(这样实际上就是在并发情况下,把向同步队列添加Node变得串行化了)

    (4)在上面AQS的模板方法中,acquire()方法还有一步acquireQueued,这个方法的主要作用就是在同步队列中嗅探到自己的前驱结点,如果前驱结点是头节点的话就会尝试取获取同步状态,否则会先设置自己的waitStatus为-1,然后调用LockSupport的方法park自己。具体的实现如下面代码所示

    final boolean acquireQueued(final Node node, int arg) {
       boolean failed = true;
       try {
           boolean interrupted = false;
           //在这样一个循环中尝试tryAcquire同步状态
           for (;;) {
               //获取前驱结点
               final Node p = node.predecessor();
               //(1)如果前驱结点是头节点,就尝试取获取同步状态,这里的tryAcquire方法相当于还是调
               //用NofairSync的tryAcquire方法,在上面已经说过
               if (p == head && tryAcquire(arg)) {
                   //如果前驱结点是头节点并且tryAcquire返回true,那么就重新设置头节点为node
                   setHead(node);
                   p.next = null; //将原来的头节点的next设置为null,交由GC去回收它
                   failed = false;
                   return interrupted;
               }
               //(2)如果不是头节点,或者虽然前驱结点是头节点但是尝试获取同步状态失败就会将node结点
               //的waitStatus设置为-1(SIGNAL),并且park自己,等待前驱结点的唤醒。至于唤醒的细节
               //下面会说到
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
    }
    

    当前的node结点的前驱结点为head,所以会调用tryAcquire()方法去获得同步状态。但是由于state被thread1占有,所以tryAcquire失败。这里就是执行acquireQueued方法的代码块(2)了。代码块(2)中首先调用了shouldParkAfterFailedAcquire方法,该方法会将同步队列中node结点的前驱结点的waitStatus为CANCELLED的线程移除,并将当前调用该方法的线程所属结点自己和他的前驱结点的waitStatus设置为-1(SIGNAL),然后返回。具体方法实现如下所示。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //(1)获取前驱结点的waitStatus
        int ws = pred.waitStatus;
        //(2)如果前驱结点的waitStatus为SINGNAL,就直接返回true
        if (ws == Node.SIGNAL)
            //前驱结点的状态为SIGNAL,那么该结点就能够安全的调用park方法阻塞自己了。
            return true;
        if (ws > 0) {
            //(3)这里就是将所有的前驱结点状态为CANCELLED的都移除
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //CAS操作将这个前驱节点设置成SIGHNAL。
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    所以shouldParkAfterFailedAcquire方法执行完毕,现在的同步队列情况大概就是这样子,即哨兵结点的waitStatus值变为-1。

    ​ 上面的执行完毕返回到acquireQueued方法的时候,在acquireQueued方法中就会进行第二次循环了,但是还是获取state失败,而当再次进入shouldParkAfterFailedAcquire方法的时候,当前结点node的前驱结点head的waitStatus已经为-1(SIGNAL)了,就会返回true,然后acquireQueued方法中就会接着执行parkAndCheckInterrupt将自己park阻塞挂起。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    1.42 非公平释放锁

    ​ 上面说一ReentrantLock为例到了怎样去获得非公平锁,那么thread1获取锁,执行完释放锁的流程是怎样的呢。首先肯定是在finally中调用ReentrantLock.unlock()方法,所以我们就从这个方法开始看起。

    ​ (1)从下面的unlock方法中我们可以看出,实际上是调用AQS的release()方法,其中传递的参数为1,表示每一次调用unlock方法都是释放所获得的一次state。重入的情况下会多次调用unlock方法,也保证了lock和unlock是成对的。

    public void unlock() {
        sync.release(1); //这里ReentrantLock的unlock方法调用了AQS的release方法
    }
    public final boolean release(int arg) {
        //这里调用了子类的tryRelease方法,即ReentrantLock的内部类Sync的tryRelease方法
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    (2)上面看到release方法首先会调用ReentrantLock的内部类Sync的tryRelease方法。而通过下面代码的分析,大概知道tryRelease做了这些事情。

    ​ ①获取当前AQS的state,并减去1;

    ​ ②判断当前线程是否等于AQS的exclusiveOwnerThread,如果不是,就抛IllegalMonitorStateException异常,这就保证了加锁和释放锁必须是同一个线程;

    ​ ③如果(state-1)的结果不为0,说明锁被重入了,需要多次unlock,这也是lock和unlock成对的原因;

    ​ ④如果(state-1)等于0,我们就将AQS的ExclusiveOwnerThread设置为null;

    ​ ⑤如果上述操作成功了,也就是tryRelase方法返回了true;返回false表示需要多次unlock。

    protected final boolean tryRelease(int releases) {
        //(1)获取当前的state,然后减1,得到要更新的state
        int c = getState() - releases;
        //(2)判断当前调用的线程是不是持有锁的线程,如果不是抛出IllegalMonitorStateException
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //(3)判断更新后的state是不是0
        if (c == 0) {
            free = true;
            //(3-1)将当前锁持者设为null
            setExclusiveOwnerThread(null);
        }
        //(4)设置当前state=c=getState()-releases
        setState(c);
        //(5)只有state==0,才会返回true
        return free;
    }
    

    (3)那么当tryRelease返回true之后,就会执行release方法中if语句块中的内容。从上面我们看到,

    if (tryRelease(arg)) {
        //(1)获取当前队列的头节点head
        Node h = head;
        //(2)判断头节点不为null,并且头结点的waitStatus不为0(CACCELLED)
        if (h != null && h.waitStatus != 0)
            //(3-1)调用下面的方法唤醒同步队列head结点的后继结点中的线程
            unparkSuccessor(h);
        return true;
    }
    

    (4)在获取锁的流程分析中,我们知道当前同步队列如下所示,所以判断得到head!=null并且head的waitStatus=-1。所以会执行unparkSuccessor方法,传递的参数为指向head的一个引用h.那下面我们就看看unparkSuccessor方法中处理了什么事情。

    private void unparkSuccessor(Node node) {
        //(1)获得node的waitStatus
        int ws = node.waitStatus;
        //(2)判断waitStatus是否小于0
        if (ws < 0)
            //(2-1)如果waitStatus小于0需要将其以CAS的方式设置为0
            compareAndSetWaitStatus(node, ws, 0);
    
        //(2)获得s的后继结点,这里即head的后继结点
        Node s = node.next;
        //(3)判断后继结点是否已经被移除,或者其waitStatus==CANCELLED
        if (s == null || s.waitStatus > 0) {
            //(3-1)如果s!=null,但是其waitStatus=CANCELLED需要将其设置为null
            s = null;
            //(3-2)会从尾部结点开始寻找,找到离head最近的不为null并且node.waitStatus的结点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //(4)node.next!=null或者找到的一个离head最近的结点不为null
        if (s != null)
            //(4-1)唤醒这个结点中的线程
            LockSupport.unpark(s.thread);
    }
    

    1.5 可重入锁和非可重入锁

    1.51 解释

    可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者class),不会因为之前已经获取过还没释放而阻塞。Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

    之前我们说过ReentrantLock和synchronized都是重入锁,那么我们通过重入锁ReentrantLock以及非可重入锁NonReentrantLock的源码来对比分析一下为什么非可重入锁在重复调用同步资源时会出现死锁。

    1.52 过程

    首先ReentrantLock和NonReentrantLock都继承父类AQS,其父类AQS中维护了一个同步状态status来计数重入次数,status初始值为0。

    当线程尝试获取锁时,可重入锁先尝试获取并更新status值,如果status == 0表示没有其他线程在执行同步代码,则把status置为1,当前线程开始执行。如果status != 0,则判断当前线程是否是获取到这个锁的线程,如果是的话执行status+1,且当前线程可以再次获取锁。而非可重入锁是直接去获取并尝试更新当前status的值,如果status != 0的话会导致其获取锁失败,当前线程阻塞。

    释放锁时,可重入锁同样先获取当前status的值,在当前线程是持有锁的线程的前提下。如果status-1 == 0,则表示当前线程所有重复获取锁的操作都已经执行完毕,然后该线程才会真正释放锁。而非可重入锁则是在确定当前线程是持有锁的线程之后,直接将status置为0,将锁释放。

    1.53 特点
    • 可重入锁的一个优点是可一定程度避免死锁
    • AQS通过控制status状态来判断锁的状态,对于非可重入锁状态不是0则去阻塞;对于可重入锁如果是0则执行,非0则判断当前线程是否是获取到这个锁的线程,是的话把status状态+1,释放的时候,只有status为0,才将锁释放。

    1.6 synchronized和ReentrantLock的比较

    1.区别:

    • 1)Lock是一个接口,而synchronized是Java中的关键字,synchronized是内置的语言实现;

    • 2)synchronized在发生异常时,会自动释放线程占有的锁,因此不会导致死锁现象发生;而Lock在发生异常时,如果没有主动通过unLock()去释放锁,则很可能造成死锁现象,因此使用Lock时需要在finally块中释放锁;

    • 3)Lock可以让等待锁的线程响应中断,而synchronized却不行,使用synchronized时,等待的线程会一直等待下去,不能够响应中断;

    • 4)通过Lock可以知道有没有成功获取锁,而synchronized却无法办到。

    • 5)Lock可以提高多个线程进行读操作的效率。

    • 6)性能区别:在Synchronized优化以前,synchronized的性能是比ReenTrantLock差很多的,但是自从Synchronized引入了偏向锁,轻量级锁(自旋锁)后,两者的性能就差不多了,在两种方法都可用的情况下,官方甚至建议使用synchronized,其实synchronized的优化我感觉就借鉴了ReenTrantLock中的CAS技术。都是试图在用户态就把加锁问题解决,避免进入内核态的线程阻塞。

    1.7 乐观锁和悲观锁

    悲观锁

    • 当我们要对一个数据库中的一条数据进行修改的时候,为了避免同时被其他人修改,最好的办法就是直接对该数据进行加锁以防止并发。这种借助数据库锁机制,在修改数据之前先锁定,再修改的方式被称之为悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)。

    乐观锁

    • 乐观锁是相对悲观锁而言的,乐观锁假设数据一般情况下不会造成冲突,所以在数据进行提交更新的时候,才会正式对数据的冲突与否进行检测,如果发现冲突了,则返回给用户错误的信息,让用户决定如何去做。

    • 而 ReentrantLock 作为 Lock 的一种实现,是悲观锁。

    • ReentrantReadWriteLock 的提供了一种乐观锁的实现。

    相关文章

      网友评论

          本文标题:7、AQS(AbstractQueuedSynchronizer

          本文链接:https://www.haomeiwen.com/subject/lhixahtx.html