美文网首页Java 杂谈
JDK 1.8 atomic 包

JDK 1.8 atomic 包

作者: 阿波罗程序猿 | 来源:发表于2018-08-03 15:07 被阅读1次

只谈谈,不全覆盖

CAS

悲观锁,假设执行当前区域代码都会产生冲突,为了解决冲突,线程A获取锁时,其它线程会被阻塞处于停顿状态,直到锁释放(可能会造成死锁)。CAS概念属于一种乐观锁的策略(或者称为无锁),假设执行当前区域代码都不会发生冲突,不会发生冲突就自然没有线程阻塞,也不会产生死锁问题。如果出现了冲突,CAS(compare and swap) 比较和替换就会不断的去重试,直到当前操作没有冲突。

CAS 的 ABA 问题

  1. 线程1从内存M位置取出A
  2. 线程2从内存M位置取出A
  3. 线程2做了预期值比较,将A替换为B并放到M位置
  4. 线程2从内存M位置取出B
  5. 线程2做了预期值比较,将B替换为A并放到M位置
  6. 线程1做了预期值比较,将A替换为C并放到M位置

此时线程1影响了线程2的状态,也就发生了ABA的问题。所以为了解决乐观锁并发时造成的ABA问题,都会使用AtomicStampedReference 类或者AtomicMarkableReference 类

volatile

volatile从主内存中加载到线程工作内存中的值是最新的。也就是说它解决的是多线程并发变量读时的可见性问题,但无法保证访问变量的原子性。而且volatile只能修饰变量。

原子基本类型

  • AtomicBoolean 保证布尔值的原子性
  • AtomicInteger 保证整型的原子性
  • AtomicLong 保证长整型的原子性

原子数组

  • AtomicIntegerArray 保证整型数组的原子性
  • AtomicLongArray 保证长整型数组原子性

原子字段

  • AtomicIntegerFieldUpdater 保证整型的字段更新
  • AtomicLongFieldUpdater 保证长整型的字段更新

使用原子字段类时,所声明的字段类型必须为volatile

使用方法如下:

    private int sum = 100;
    private volatile int sum1 = 100;
    // 当 sum 或 sum1 为100 时只允许有一个线程进入
    private void atomic4() {
        AtomicIntegerFieldUpdater<T> tAtomicIntegerFieldUpdater =  AtomicIntegerFieldUpdater.newUpdater(T.class, "sum1");
        T t = new T();
        for (int i = 0; i < 10; i++) {
            singleThreadPool.execute(() -> {
                if (sum == 100) {
                    System.out.println(Thread.currentThread().getName() + " :" + "已修改");
                }
            });
        }

        System.out.println("=====");

        for (int i = 0; i < 10; i++) {
            singleThreadPool.execute(() -> {
                if(tAtomicIntegerFieldUpdater.compareAndSet(t, 100, 120)){
                    System.out.print(Thread.currentThread().getName() + " :" + "tAtomicIntegerFieldUpdater 已修改");
                }
            });
        }
        singleThreadPool.shutdown();
    }

原子引用类型

  • AtomicReferenceArray 保证引用数组的原子性
  • AtomicReferenceFieldUpdater 保证引用类型的字段更新
  • AtomicStampedReference 可以解决CASABA问题,类似提供版本号
  • AtomicMarkableReference 可以解决CASABA问题,提供是或否进行判断

原子累加器 JDK 1.8 新增

原有的 Atomic系列类通过CAS来保证并发时操作的原子性,但是高并发也就意味着CAS的失败次数会增多,失败次数的增多会引起更多线程的重试,最后导致AtomicLong的效率降低。新的四个类通过减少并发,将单一value的更新压力分担到多个value中去,降低单个value的“热度”以提高高并发情况下的吞吐量

  • DoubleAccumulator
  • DoubleAdder
  • LongAccumulator
  • LongAdder

实例应用

  • 只贴了代码片段。验证累加器、整型、数组的原子性
package concurrent;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author 张博
 */
public class ThreadFactoryBuilder implements ThreadFactory {

    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    ThreadFactoryBuilder() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon()) {
            t.setDaemon(false);
        }
        if (t.getPriority() != Thread.NORM_PRIORITY) {
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}

private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder();
    private static ExecutorService singleThreadPool = new ThreadPoolExecutor(1000, 5000,
            10, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

    // 阻塞主线程,循环5000次后通知主线程关闭
    private CountDownLatch begin = new CountDownLatch(5000);
    // 模拟并发量一次200
    private Semaphore semaphore = new Semaphore(200);
    private static int count = 0;
    private void atomic7() {
        DoubleBinaryOperator doubleBinaryOperator = (x, y) -> x + y;
        DoubleAccumulator doubleAccumulator = new DoubleAccumulator(doubleBinaryOperator, count);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        int[] ints = new int[5000];
        AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5000);
        for (int i = 0; i < 5000; i++) {
            singleThreadPool.execute(() -> {
                try {
                    // 允许200个线程进入,模拟提供稳定并发量
                    semaphore.acquire();
                    count();
                    atomicCount(doubleAccumulator);
                    atomicIntegerCount(atomicInteger);
                    array(ints);
                    atomicArray(atomicIntegerArray);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 每执行一次减1
                begin.countDown();
            });
        }
        try {
            // 没到0一直等待,直到模拟并发结束
            begin.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        singleThreadPool.shutdown();
        System.out.println(count);
        System.out.println(doubleAccumulator.get());
        System.out.println(atomicInteger.get());
        System.out.println(Arrays.toString(ints));
        System.out.println("=========================================================================================================");
        System.out.println(atomicIntegerArray.toString());
        for (int i = 0; i < atomicIntegerArray.length(); i++) {
            if ((atomicIntegerArray.get(i) - 5000) != 0) {
                System.out.println(atomicIntegerArray.get(i));
            }
        }
    }

    /**
     * 时间:2018/8/3 上午11:48
     * @apiNote 线程不安全的累加
     */
    private void count() {
        count++;
    }

    /**
     * 时间:2018/8/3 上午11:48
     * @apiNote 原子累加
     */
    private void atomicCount(DoubleAccumulator doubleAccumulator) {
        doubleAccumulator.accumulate(1);
    }

    /**
     * 时间:2018/8/3 上午11:48
     * @apiNote 原子的i++
     */
    private void atomicIntegerCount(AtomicInteger atomicInteger) {
        atomicInteger.incrementAndGet();
    }

    /**
     * 时间:2018/8/3 上午11:48
     * @apiNote 线程不安全的数组操作
     */
    private void array(int[] ints) {
        for(int k = 0; k < 5000; k++) {
            ints[k] += 1;
        }
    }

    /**
     * 时间:2018/8/3 上午11:48
     * @apiNote 原子的数组操作
     */
    private void atomicArray(AtomicIntegerArray atomicIntegerArray) {
        for(int k = 0; k < 5000; k++) {
            atomicIntegerArray.addAndGet(k, 1);
        }
    }

使用AtomicStampedReference解决CASABA问题

    /**
     * 时间:2018/8/3 上午11:58
     * @apiNote 模拟并发导致 CAS 的 ABA 问题
     */
    private void aba() {
        // 原子引用类型
        AtomicReference<String> stringAtomicReference = new AtomicReference<>("A");
        // 原子时间戳引用
        AtomicStampedReference<String> stampedReference = new AtomicStampedReference<>("A", 0);

        // 线程1
        singleThreadPool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("A", "B"));
            System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("B", "A"));
            System.out.println("=====");
        });

        // 线程2
        singleThreadPool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + " : -> stringAtomicReference " + stringAtomicReference.compareAndSet("A", "C"));
            System.out.println("=====");
        });

        // 线程3
        singleThreadPool.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("A", "B", stampedReference.getStamp(), stampedReference.getStamp() + 1));
            System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("B", "A", stampedReference.getStamp(), stampedReference.getStamp() + 1));
            System.out.println("=====");
        });

        // 线程4
        singleThreadPool.execute(() -> {
            // 模拟并发时与线程3同时得到内存中的A的时间戳
            int stamp = stampedReference.getStamp();
            System.out.println(Thread.currentThread().getName() + " : 线程4 处理 cas 之前" + stamp);
            // 线程4休眠2秒,模拟让线程3已经操作完成 A -> B -> A 的 CAS
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            // 得到线程3操作完的时间戳
            System.out.println(Thread.currentThread().getName() + " :" + stampedReference.getStamp());
            // 线程4进行 A -> C 的 CAS 操作。这时会失败。解决 ABA 问题
            System.out.println(Thread.currentThread().getName() + " :" + stampedReference.compareAndSet("A", "C", stamp, stamp + 1));
        });

        singleThreadPool.shutdown();
    }

相关文章

网友评论

    本文标题:JDK 1.8 atomic 包

    本文链接:https://www.haomeiwen.com/subject/hubhvftx.html