美文网首页
深入理解CAS机制

深入理解CAS机制

作者: 逅弈 | 来源:发表于2018-02-06 20:30 被阅读807次

    逅弈 欢迎转载,注明原创出处即可,谢谢!

    要实现一个网站访问量的计数器,可以通过一个Long类型的对象,并加上synchronized内置锁的方式。但是这种方式使得多线程的访问变成了串行的,同一时刻只能有一个线程可以更改long的值,那么为了能够使多线程并发的更新long的值,我们可以使用J.U.C包中的Atomic原子类。这些类的更新是原子的,不需要加锁即可实现并发的更新,并且是线程安全的。

    可是Atomic原子类是怎么保证并发更新的线程安全的呢?让我们看一下AtomicLong的自增方法incrementAndGet():

    public final long incrementAndGet() {
        // 无限循环,即自旋
        for (;;) {
            // 获取主内存中的最新值
            long current = get();
            long next = current + 1;
            // 通过CAS原子更新,若能成功则返回,否则继续自旋
            if (compareAndSet(current, next))
                return next;
        }
    }
    private volatile long value;
    public final long get() {
        return value;
    }
    

    可以发现其内部保持着一个volatile修饰的long变量,volatile保证了long的值更新后,其他线程能立即获得最新的值。
    在incrementAndGet中首先是一个无限循环(自旋),然后获取long的最新值,将long加1,然后通过compareAndSet()方法尝试将long的值有current更新为next。如果能更新成功,则说明当前还没有其他线程更新该值,则返回next,如果更新失败,则说明有其他线程提前更新了该值,则当前线程继续自旋尝试更新。

    CAS的基本思想是认为当前环境中的并发并没有那么高,比较乐观的看待整个并发,只需要在更新某个值时先检查下该值有没有发生变化,如果没有发生变化则更新,否则放弃更新。

    CAS的操作其底层是通过调用sun.misc.Unsafe类中的CompareAndSwap的方法保证线程安全的。Unsafe类中主要有下面三种CompareAndSwap方法:

    public final native boolean compareAndSwapObject(Object obj, long offset, Object expect, Object update);
    
    public final native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);
    
    public final native boolean compareAndSwapLong(Object obj, long offset, long expect, long update);
    

    可以看到这些方法都是native的,需要调用JNI接口,也即通过操作系统来保证这些方法的执行。

    以上原子更新操作中除了CAS之外还有一个自旋(无限循环),那么什么是自旋呢?为什么要用自旋呢?下面我们来了解一下自旋

    自旋锁

    • 简述

    跟互斥锁一样,一个线程要想访问被自旋锁保护的共享资源,必须先得到锁,在访问完共享资源后,必须释放锁。

    如果在获取自旋锁时,没有线程保持该锁,那么将立即得到锁;如果在获取自旋锁时锁已经有保持者,那么获取锁的操作将自旋在那里,直到该自旋锁的保持者释放了锁。

    • 优点

    自旋锁比较适用于锁使用者保持锁时间比较短的情况,比如执行一个变量的自增操作。

    正是由于自旋锁使用者一般保持锁时间非常短,因此选择自旋而不是睡眠是非常必要的,自旋锁的效率远高于互斥锁,因为线程的睡眠、唤醒需要操作系统的支持,开销比较大,因此当一个操作保持锁的时间非常短时,不需要将线程挂起或睡眠,而是让线程执行一个忙循环,等到自旋锁的持有者释放了锁之后,当前线程将会获得锁。

    • 缺点

    递归死锁

    试图递归地获得自旋锁必然会引起死锁:递归程序的持有实例在第二个实例循环,以试图获得相同自旋锁时,不会释放此自旋锁

    过多占用cpu资源

    如果不加限制,由于申请者一直在循环等待,因此自旋锁在锁定的时候,如果不成功,不会睡眠,会持续的尝试。

    单cpu的时候自旋锁会让其它process动不了。因此,一般自旋锁实现会有一个参数限定最多持续尝试次数,超出后, 自旋锁放弃当前time slice。 等下一次机会

    虽然CAS可以高效的对某些共享变量进行并发的更改,但是他也是有缺点的,其中之一就是ABA问题。当要更改的值从A变为B,之后又变为A,则检查时可能会发现没有发生变化,实际上已经发生了变化。解决方法是变更之前加上版本号,如1A,2B,3A。可通过AtomicStampedReference来解决ABA问题,这个类的compareAndSet方法,将首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值,否则不予更新。

    除此之外,在并发量非常高的情况下,CAS失败的几率将变得非常高,重试的次数也会跟着增加,越多线程重试,CAS失败的几率就越高,变成恶性循环。因此在并发量非常高的环境中,如果仍然想通过原子类来更新的话,可以使用AtomicLong的替代类:LongAdder。

    将单一value的更新压力分担到多个value中去,降低单个value的“热度”,分段更新,这样,线程数再多也会分担到多个value上去更新,只需要增加value的个数就可以降低value的 “热度”,这样AtomicLong中的恶性循环就可以解决了。

    在LongAdder中cells就是这个“段”,cell中的value就是存放更新值的,这样,当我需要总数时,把cell中的value都累加一下不就可以了么

    让我们看一下LongAdder更新的原则:

    1.当并发低时先采用CAS进行更新,如果更新成功即返回
    2.当并发高且CAS更新失败时,则进入分段更新

    LongAdder的部分代码实现:

    /**
     * Adds the given value.
     *
     * @param x the value to add
     */
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        // 当并发低时先采用CAS进行add,如果更新成功即返回
        // 当并发高且CAS更新失败时,则进入分段更新
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                // 找到cells数组中该值对应的cell对象
                (a = as[getProbe() & m]) == null ||
                // 使用cell对象的cas方法进行更新
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }
    
    /**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention
     * avoids the need for an extra field or function in LongAdder).
     * @param wasUncontended false if CAS failed before call
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
    

    需要注意的是,虽然AtomicLong等原子类的更新是原子的,但是多个原子操作合并后的操作却不是原子的,也即:原子+原子!=原子,下面将用一个例子来说明该问题:

    private CountDownLatch latch;
    
    private class Discovery{
    
        private Map<String,SlaveNode> slaveNodeMap;
    
        private AtomicInteger slaveIndex;
    
        public Discovery(){
            slaveNodeMap = new HashMap<String,SlaveNode>();
            SlaveNode slaveNode1 = new SlaveNode("127.0.0.1",8081);
            SlaveNode slaveNode2 = new SlaveNode("127.0.0.1",8082);
            slaveNodeMap.put(slaveNode1.getId(),slaveNode1);
            slaveNodeMap.put(slaveNode2.getId(),slaveNode2);
            slaveIndex = new AtomicInteger(0);
        }
    
        public SlaveNode discover() {
            if (slaveNodeMap.size() == 0) {
                System.err.println("No available SlaveNode!");
                return null;
            }
            SlaveNode[] nodes = new SlaveNode[]{};
            nodes = slaveNodeMap.values().toArray(nodes);
            // 通过CAS循环获取下一个可用服务
            // 当当前索引为数组的长度是,将索引值更新为0
            slaveIndex.compareAndSet(nodes.length,0);
            System.out.println("currentIndex=" + slaveIndex + ",currentThread=" + Thread.currentThread().getName());
            // 根据数组的下标获取可用的服务,之后将索引通过原子方式加1
            return nodes[slaveIndex.getAndIncrement()];
        }
    
    }
    
    @Test
    public void testConcurrentDiscover(){
        int loopTimes = 300;
    
        latch = new CountDownLatch(loopTimes);
        Discovery discovery = new Discovery();
        class Runner implements Runnable{
            @Override
            public void run() {
                Object object = discovery.discover();
                System.out.println(String.format("object={%s},currentThread={%s}",(object!=null?object.toString():"null"),Thread.currentThread().getName()));
                latch.countDown();
            }
        }
        for(int i=0;i<loopTimes;i++){
            new Thread(new Runner()).start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    

    该方法执行的可能会成功,即多线程交替获得SlaveNode,但也会出现错误,slaveIndex的值会超过数组的长度,问题就出在这段代码:

    slaveIndex.compareAndSet(nodes.length,0);
    return nodes[slaveIndex.getAndIncrement()];
    

    这两个操作本身都是原子的,但是合并在一起就不是原子的了,因此会出现错误,解决的方法还是对整个执行的过程加锁。

    更多原创好文,请关注「逅弈逐码」

    相关文章

      网友评论

          本文标题:深入理解CAS机制

          本文链接:https://www.haomeiwen.com/subject/aelnzxtx.html