美文网首页Java 杂谈
JDK 1.8 atomic 包

JDK 1.8 atomic 包

作者: 阿波罗程序猿 | 来源:发表于2018-08-03 15:07 被阅读1次

    只谈谈,不全覆盖

    CAS

    悲观锁,假设执行当前区域代码都会产生冲突,为了解决冲突,线程A获取锁时,其它线程会被阻塞处于停顿状态,直到锁释放(可能会造成死锁)。CAS概念属于一种乐观锁的策略(或者称为无锁),假设执行当前区域代码都不会发生冲突,不会发生冲突就自然没有线程阻塞,也不会产生死锁问题。如果出现了冲突,CAS(compare and swap) 比较和替换就会不断的去重试,直到当前操作没有冲突。

    CAS 的 ABA 问题

    1. 线程1从内存M位置取出A
    2. 线程2从内存M位置取出A
    3. 线程2做了预期值比较,将A替换为B并放到M位置
    4. 线程2从内存M位置取出B
    5. 线程2做了预期值比较,将B替换为A并放到M位置
    6. 线程1做了预期值比较,将A替换为C并放到M位置

    此时线程1影响了线程2的状态,也就发生了ABA的问题。所以为了解决乐观锁并发时造成的ABA问题,都会使用AtomicStampedReference 类或者AtomicMarkableReference 类

    volatile

    volatile从主内存中加载到线程工作内存中的值是最新的。也就是说它解决的是多线程并发变量读时的可见性问题,但无法保证访问变量的原子性。而且volatile只能修饰变量。

    原子基本类型

    • AtomicBoolean 保证布尔值的原子性
    • AtomicInteger 保证整型的原子性
    • AtomicLong 保证长整型的原子性

    原子数组

    • AtomicIntegerArray 保证整型数组的原子性
    • AtomicLongArray 保证长整型数组原子性

    原子字段

    • AtomicIntegerFieldUpdater 保证整型的字段更新
    • AtomicLongFieldUpdater 保证长整型的字段更新

    使用原子字段类时,所声明的字段类型必须为volatile

    使用方法如下:

        private int sum = 100;
        private volatile int sum1 = 100;
        // 当 sum 或 sum1 为100 时只允许有一个线程进入
        private void atomic4() {
            AtomicIntegerFieldUpdater<T> tAtomicIntegerFieldUpdater =  AtomicIntegerFieldUpdater.newUpdater(T.class, "sum1");
            T t = new T();
            for (int i = 0; i < 10; i++) {
                singleThreadPool.execute(() -> {
                    if (sum == 100) {
                        System.out.println(Thread.currentThread().getName() + " :" + "已修改");
                    }
                });
            }
    
            System.out.println("=====");
    
            for (int i = 0; i < 10; i++) {
                singleThreadPool.execute(() -> {
                    if(tAtomicIntegerFieldUpdater.compareAndSet(t, 100, 120)){
                        System.out.print(Thread.currentThread().getName() + " :" + "tAtomicIntegerFieldUpdater 已修改");
                    }
                });
            }
            singleThreadPool.shutdown();
        }
    

    原子引用类型

    • AtomicReferenceArray 保证引用数组的原子性
    • AtomicReferenceFieldUpdater 保证引用类型的字段更新
    • AtomicStampedReference 可以解决CASABA问题,类似提供版本号
    • AtomicMarkableReference 可以解决CASABA问题,提供是或否进行判断

    原子累加器 JDK 1.8 新增

    原有的 Atomic系列类通过CAS来保证并发时操作的原子性,但是高并发也就意味着CAS的失败次数会增多,失败次数的增多会引起更多线程的重试,最后导致AtomicLong的效率降低。新的四个类通过减少并发,将单一value的更新压力分担到多个value中去,降低单个value的“热度”以提高高并发情况下的吞吐量

    • DoubleAccumulator
    • DoubleAdder
    • LongAccumulator
    • LongAdder

    实例应用

    • 只贴了代码片段。验证累加器、整型、数组的原子性
    package concurrent;
    
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author 张博
     */
    public class ThreadFactoryBuilder implements ThreadFactory {
    
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        ThreadFactoryBuilder() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }
    
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
    
    
    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder();
        private static ExecutorService singleThreadPool = new ThreadPoolExecutor(1000, 5000,
                10, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    
        // 阻塞主线程,循环5000次后通知主线程关闭
        private CountDownLatch begin = new CountDownLatch(5000);
        // 模拟并发量一次200
        private Semaphore semaphore = new Semaphore(200);
        private static int count = 0;
        private void atomic7() {
            DoubleBinaryOperator doubleBinaryOperator = (x, y) -> x + y;
            DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator, count);
            AtomicInteger atomicInteger = new AtomicInteger(0);
            int[] ints = new int[5000];
            AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5000);
            for (int i = 0; i < 5000; i++) {
                singleThreadPool.execute(() -> {
                    try {
                        // 允许200个线程进入,模拟提供稳定并发量
                        semaphore.acquire();
                        count();
                        atomicCount(doubleAccumulator);
                        atomicIntegerCount(atomicInteger);
                        array(ints);
                        atomicArray(atomicIntegerArray);
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 每执行一次减1
                    begin.countDown();
                });
            }
            try {
                // 没到0一直等待,直到模拟并发结束
                begin.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            singleThreadPool.shutdown();
            System.out.println(count);
            System.out.println(doubleAccumulator.get());
            System.out.println(atomicInteger.get());
            System.out.println(Arrays.toString(ints));
            System.out.println("=========================================================================================================");
            System.out.println(atomicIntegerArray.toString());
            for (int i = 0; i < atomicIntegerArray.length(); i++) {
                if ((atomicIntegerArray.get(i) - 5000) != 0) {
                    System.out.println(atomicIntegerArray.get(i));
                }
            }
        }
    
        /**
         * 时间:2018/8/3 上午11:48
         * @apiNote 线程不安全的累加
         */
        private void count() {
            count++;
        }
    
        /**
         * 时间:2018/8/3 上午11:48
         * @apiNote 原子累加
         */
        private void atomicCount(DoubleAccumulator doubleAccumulator) {
            doubleAccumulator.accumulate(1);
        }
    
        /**
         * 时间:2018/8/3 上午11:48
         * @apiNote 原子的i++
         */
        private void atomicIntegerCount(AtomicInteger atomicInteger) {
            atomicInteger.incrementAndGet();
        }
    
        /**
         * 时间:2018/8/3 上午11:48
         * @apiNote 线程不安全的数组操作
         */
        private void array(int[] ints) {
            for(int k = 0; k < 5000; k++) {
                ints[k] += 1;
            }
        }
    
        /**
         * 时间:2018/8/3 上午11:48
         * @apiNote 原子的数组操作
         */
        private void atomicArray(AtomicIntegerArray atomicIntegerArray) {
            for(int k = 0; k < 5000; k++) {
                atomicIntegerArray.addAndGet(k, 1);
            }
        }
    

    使用AtomicStampedReference解决CASABA问题

        /**
         * 时间:2018/8/3 上午11:58
         * @apiNote 模拟并发导致 CAS 的 ABA 问题
         */
        private void aba() {
            // 原子引用类型
            AtomicReference<String> stringAtomicReference = new AtomicReference<>("A");
            // 原子时间戳引用
            AtomicStampedReference<String> stampedReference = new AtomicStampedReference<>("A", 0);
    
            // 线程1
            singleThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("A", "B"));
                System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("B", "A"));
                System.out.println("=====");
            });
    
            // 线程2
            singleThreadPool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("A", "C"));
                System.out.println("=====");
            });
    
            // 线程3
            singleThreadPool.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("A", "B", stampedReference.getStamp(), stampedReference.getStamp() + 1));
                System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("B", "A", stampedReference.getStamp(), stampedReference.getStamp() + 1));
                System.out.println("=====");
            });
    
            // 线程4
            singleThreadPool.execute(() -> {
                // 模拟并发时与线程3同时得到内存中的A的时间戳
                int stamp = stampedReference.getStamp();
                System.out.println(Thread.currentThread().getName() + " : 线程4 处理 cas 之前" + stamp);
                // 线程4休眠2秒,模拟让线程3已经操作完成 A -> B -> A 的 CAS
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                // 得到线程3操作完的时间戳
                System.out.println(Thread.currentThread().getName() + " :" + stampedReference.getStamp());
                // 线程4进行 A -> C 的 CAS 操作。这时会失败。解决 ABA 问题
                System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("A", "C", stamp, stamp + 1));
            });
    
            singleThreadPool.shutdown();
        }
    

    相关文章

      网友评论

        本文标题:JDK 1.8 atomic 包

        本文链接:https://www.haomeiwen.com/subject/hubhvftx.html