转:CAS 和 AQS 原理

作者: Java旅行者 | 来源:发表于2017-08-27 20:26 被阅读305次

    1. CAS

    1.1 概念,什么是 CAS

    CAS,compare and swap的缩写,中文翻译成比较并交换。CAS指令在Intel CPU上称为CMPXCHG指令,它的作用是将指定内存地址的内容与所给的某个值相比,如果相等,则将其内容替换为指令中提供的新值,如果不相等,则更新失败。

    从内存领域来说这是乐观锁,因为它在对共享变量更新之前会先比较当前值是否与更新前的值一致,如果是,则更新,如果不是,则无限循环执行(称为自旋),直到当前值与更新前的值一致为止,才执行更新。

    1.2 CAS 的应用

    CAS 有 3 个操作数,内存值 V,旧的预期值 A,要修改的新值 B。当且仅当预期值 A 和内存值 V 相同时,将内存值 V 修改为 B,否则什么都不做。

    1.3 CAS 的缺点是什么

    CAS虽然很高效的解决原子操作,但是CAS仍然存在三大问题。ABA问题,循环时间长开销大和只能保证一个共享变量的原子操作

    1. ABA问题。因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是A,变成了B,又变成了A,
      那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。
      在变量前面追加上版本号,每次变量更新的时候把版本号加一,那么A-B-A 就会变成1A-2B-3A。

      从Java1.5开始JDK的atomic包里提供了一个类 AtomicStampedReference 来解决ABA问题。
      这个类的compareAndSet方法作用是首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,
      则以原子方式将该引用和该标志的值设置为给定的更新值。

    1. 循环时间长开销大。自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。如果JVM能支持处理器提供的pause指令那么效率会有一定的提升,
      pause指令有两个作用,第一它可以延迟流水线执行指令(de-pipeline),使CPU不会消耗过多的执行资源,
      延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零。
      第二它可以避免在退出循环的时候因内存顺序冲突(memory order violation)而引起CPU流水线被清空(CPU pipeline flush),从而提高CPU的执行效率。
    1. 只能保证一个共享变量的原子操作。当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,
      但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁,
      或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i=2,j=a,合并一下ij=2a,然后用CAS来操作ij。
      从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,你可以把多个变量放在一个对象里来进行CAS操作。

    1.4 CAS 的原理

    CAS 通过调用 JNI 的代码实现的。JNI:java Native Interface 为 JAVA 本地调用,允许 java 调用其他语言。而compareAndSwapInt就是借助C来调用CPU底层指令实现的。

    下面从分析比较常用的CPU(intel x86)来解释CAS的实现原理。下面是sun.misc.Unsafe类的compareAndSwapInt()方法的源代码:

    public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);
    

    结合对应于intel x86处理器的源代码的片段分析,可知程序会根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略lock前缀(单处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果)。

    2. AQS

    具体细节看文章:

    http://www.cnblogs.com/everSeeker/p/5582007.html , 主要讲 reentrantlock

    http://www.cnblogs.com/waterystone/p/4920797.html , 主要讲 AQS 源码,未读完

    http://www.jianshu.com/p/6afaef97264a , 锁的获取和释放流程,大致的讲解。

    2.1 概述

    AbstractQueuedSynchronizer(简称AQS),队列同步器,是用来构建锁或者其他同步组建的基础框架。该类主要包括:

    1. 模式分为共享和独占。

    2. volatile int state,用来表示锁的状态。state = 0 表示锁空闲,>0 表示锁已被占用。

    3. FIFO双向队列,用来维护等待获取锁的线程。

    image

    独占模式的锁:ReentrantLock

    共享模式的锁:Semaphore,CountDownLatch

    AQS 部分代码说明如下:

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        static final class Node {
            /** 共享模式,表示可以多个线程获取锁,比如读写锁中的读锁 */
            static final Node SHARED = new Node();
            /** 独占模式,表示同一时刻只能一个线程获取锁,比如读写锁中的写锁 */
            static final Node EXCLUSIVE = null;
    
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
        }
    
        /** AQS类内部维护一个FIFO的双向队列,负责同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等
            构造成一个节点Node并加入同步队列;当同步状态释放时,会把首节点中线程唤醒,使其再次尝试同步状态 */
        private transient volatile Node head;
        private transient volatile Node tail;
    
        /** 状态,主要用来确定lock是否已经被占用;在ReentrantLock中,state=0表示锁空闲,>0表示锁已被占用;可以自定义,改写tryAcquire(int acquires)等方法即可  */
        private volatile int state;
    }
    

    这里主要说明下双向队列,通过查看源码分析,队列是这个样子的:

    head -> node1 -> node2 -> node3(tail)

    注意:head初始时是一个空节点(所谓的空节点意思是节点中没有具体的线程信息),之后表示的是获取了锁的节点。因此实际上head->next(即node1)才是同步队列中第一个可用节点。

    AQS的设计基于模版方法模式,使用者通过继承AQS类并重写指定的方法,可以实现不同功能的锁。可重写的方法主要包括:

    image

    不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

    • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

    • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

    • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

    • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

    • tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。

    一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

    以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

    再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。

    两个重要的状态

    1. AQS的state

    state可以理解有多少线程获取了资源,即有多少线程获取了锁,初始时state=0表示没有线程获取锁。

    独占锁时,这个值通常为1或者0,如果独占锁可重入时,即一个线程可以多次获取这个锁时,每获取一次,state就加1。一旦有线程想要获得锁,就可以通过对state进行CAS增量操作,即原子性的增加state的值,其他线程发现state不为0,这时线程已经不能获得锁(独占锁),就会进入AQS的队列中等待。释放锁是仍然是通过CAS来减小state的值,如果减小到0就表示锁完全释放(独占锁)

    Node 中的waitStatus

    Node的正常状态是0。对于处在队列中的节点来说,前一个节点有唤醒后一个节点的任务,所以对与当前节点的前一个节点来说,如果waitStatus > 0, 则节点处于cancel状态,应踢出队列,如果waitStatus = 0, 则将waitStatus改为-1(signal)。因此队列中节点的状态应该为-1,-1,-1,0

    2.2 源码详解

    acquire(int)

    此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    

    函数流程如下

    1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回

    2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式

    3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false

    4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上

    流程图:

    image

    release(int)

    此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;//找到头结点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);//唤醒等待队列里的下一个线程
            return true;
        }
        return false;
    }
    

    release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了,如果已经彻底释放资源(state=0),要返回true,否则返回false。

    acquireShared(int)

    此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

    1. tryAcquireShared()尝试获取资源,成功则直接返回。

    2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。

    跟独占模式比,还有一点需要注意的是:当前线程获取资源成功后,如果还有剩余资源,那么还会唤醒后面的线程来尝试获取资源。

    releaseShared()

    此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//尝试释放资源
            doReleaseShared();//唤醒后继结点
            return true;
        }
        return false;
    }
    

    跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的releaseShared()则没有这种要求,一是共享的实质--多线程可并发执行;二是共享模式基本也不会重入吧(至少我还没见过),所以自定义同步器可以根据需要决定返回值。

    相关文章

      网友评论

      本文标题:转:CAS 和 AQS 原理

      本文链接:https://www.haomeiwen.com/subject/beeidxtx.html