美文网首页并发编程Java并发学习笔记程序员
慕课网高并发实战(四)- 线程安全性

慕课网高并发实战(四)- 线程安全性

作者: 9c0ddf06559c | 来源:发表于2018-03-18 19:42 被阅读910次

    定义

    当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要额外的同步或协同,这个类都能表现出正确行为,那么就称这个类是线程安全的

    线程安全性的三个方面
    • 原子性-Atomic包

    image.png
    1.AtomicXXX:CAS 、Unsafe.compareAndSwapInt

    看一下AtomicInteger.getAndIncrement的源码

    /**
         * Atomically increments by one the current value.
         *
         * @return the previous value
         */
        public final int getAndIncrement() {
             // 主要是调用了unsafe的方法 
             //     private static final Unsafe unsafe = Unsafe.getUnsafe();
            return unsafe.getAndAddInt(this, valueOffset, 1);
        }
    
    /**
    *  获取底层当前的值并且+1
    * @param var1 需要操作的AtomicInteger 对象
    * @param var2 当前的值 
    * @param var4 要增加的值
    */
    public final int getAndAddInt(Object var1, long var2, int var4) {
            int var5;
            do {
                // 获取底层的该对象当前的值
                var5 = this.getIntVolatile(var1, var2);
                // 获取完底层的值和自增操作之间,可能系统的值已经又被其他线程改变了
                //如果又被改变了,则重新计算系统底层的值,并重新执行本地方法
            } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); 
    
            return var5;
        }
    
    /**
    * 本地的CAS方法核心
    * @param var1 需要操作的AtomicInteger 对象
    * @param var2 当前本地变量中的的值 
    * @param var4 当前系统从底层传来的值
    * @param var5 要更新后的值
    * @Return 如果当前本地变量的值(var2)与底层的值(var4)不等,则返回false,否则更新为var5的值并返回True
    */
        public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
    
    
    2.AtomicLong、LongAdder

    我们看到AtomicInteger在执行CAS操作的时候,是用死循环的方式,如果竞争非常激烈,那么失败量就会很高,性能会受到影响

    再看一下1.8以后的LongAdder

    public void add(long x) {
            Cell[] as; long b, v; int m; Cell a;
            if ((as = cells) != null || !casBase(b = base, b + x)) {
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[getProbe() & m]) == null ||
                    !(uncontended = a.cas(v = a.value, v + x)))
                    longAccumulate(x, null, uncontended);
            }
        }
    
    补充知识点,jvm对long,double这些64位的变量拆成两个32位的操作

    LongAdder的设计思想:核心是将热点数据分离,将内部数据value分成一个数组,每个线程访问时,通过hash等算法映射到其中一个数字进行技术,而最终计数结果为这个数组的求和累加,
    其中热点数据value会被分离成多个热点单元的数据cell,每个cell独自维护内部的值,当前value的实际值由所有的cell累积合成,从而使热点进行了有效的分离,提高了并行度
    LongAdder 在低并发的时候通过直接操作base,可以很好的保证和Atomic的性能基本一致,在高并发的场景,通过热点分区来提高并行度

    缺点:在统计的时候如果有并发更新,可能会导致结果有些误差

    3.AtomicReference、AtomicReferenceFieldUpdater

    AtomicReference: 用法同AtomicInteger一样,但是可以放各种对象

    @Slf4j
    @ThreadSafe
    public class AtomicExample4 {
    
        public static AtomicReference<Integer> count = new AtomicReference<>(0);
    
        public static void main(String[] args) throws InterruptedException {
            // 2
            count.compareAndSet(0,2);  
            // no
            count.compareAndSet(0,1);
            // no
            count.compareAndSet(1,3);
            // 4
            count.compareAndSet(2,4);
            // no
            count.compareAndSet(3,5);
            log.info("count:{}",count.get());
        }
    
    }
    
    AtomicReferenceFieldUpdater
    @Slf4j
    @ThreadSafe
    public class AtomicExample5 {
    
        @Getter
        private volatile int count = 100;
    
        /**
         * AtomicIntegerFieldUpdater 核心是原子性的去更新某一个类的实例的指定的某一个字段
         * 构造函数第一个参数为类定义,第二个参数为指定字段的属性名,必须是volatile修饰并且非static的字段
         */
        private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");
    
    
        public static void main(String[] args) throws InterruptedException {
            AtomicExample5 example5 = new AtomicExample5();
    
            // 第一次 count=100 -> count->120 返回True
            if(updater.compareAndSet(example5,100,120)){
                log.info("update success 1:{}",example5.getCount());
            }
    
            // count=120 -> 返回False
            if(updater.compareAndSet(example5,100,120)){
                log.info("update success 2:{}",example5.getCount());
            }else {
                log.info("update field:{}",example5.getCount());
    
            }
        }
    
    }
    
    
    5.AtomicStampReference:CAS的ABA问题

    ABA问题:在CAS操作的时候,其他线程将变量的值A改成了B由改成了A,本线程使用期望值A与当前变量进行比较的时候,发现A变量没有变,于是CAS就将A值进行了交换操作,这个时候实际上A值已经被其他线程改变过,这与设计思想是不符合的

    解决思路:每次变量更新的时候,把变量的版本号加一,这样只要变量被某一个线程修改过,该变量版本号就会发生递增操作,从而解决了ABA变化

     /**
         * Atomically sets the value of both the reference and stamp
         * to the given update values if the
         * current reference is {@code ==} to the expected reference
         * and the current stamp is equal to the expected stamp.
         *
         * @param expectedReference the expected value of the reference
         * @param newReference the new value for the reference
         * @param expectedStamp the expected value of the stamp(上面提到的版本号)
         * @param newStamp the new value for the stamp
         * @return {@code true} if successful
         */
        public boolean compareAndSet(V   expectedReference,
                                     V   newReference,
                                     int expectedStamp,
                                     int newStamp) {
            Pair<V> current = pair;
            return
                expectedReference == current.reference &&
                expectedStamp == current.stamp &&
                ((newReference == current.reference &&
                  newStamp == current.stamp) ||
                 casPair(current, Pair.of(newReference, newStamp)));
        }
    
    6.AtomicLongArray

    可以指定更新一个数组指定索引位置的值

    /**
         * Atomically sets the element at position {@code i} to the given value
         * and returns the old value.
         *
         * @param i the index
         * @param newValue the new value
         * @return the previous value
         */
        public final long getAndSet(int i, long newValue) {
            return unsafe.getAndSetLong(array, checkedByteOffset(i), newValue);
        }
    ...
    ...
    /**
         * Atomically sets the element at position {@code i} to the given
         * updated value if the current value {@code ==} the expected value.
         *
         * @param i the index
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(int i, long expect, long update) {
            return compareAndSetRaw(checkedByteOffset(i), expect, update);
        }
    
    7.AtomicBoolean(平时用的比较多)
    compareAndSet方法也值得注意,可以达到同一时间只有一个线程执行这段代码
    /**
         * Atomically sets the value to the given updated value
         * if the current value {@code ==} the expected value.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(boolean expect, boolean update) {
            int e = expect ? 1 : 0;
            int u = update ? 1 : 0;
            return unsafe.compareAndSwapInt(this, valueOffset, e, u);
        }
    
    • 原子性-锁

      • synchronized:依赖JVM (主要依赖JVM实现锁,因此在这个关键字作用对象的作用范围内,都是同一时刻只能有一个线程进行操作的)
      • Lock:依赖特殊的CPU指令,代码实现,ReentrantLock

    修饰的内容分类

    修饰内容
    /**
     * @author gaowenfeng
     * @date
     */
    @Slf4j
    public class SyncronizedExample1 {
        /**
         * 修饰一个代码块,作用范围为大括号括起来的
         */
        public void test1(){
            synchronized (this){
                for (int i = 0; i < 10; i++) {
                    log.info("test1-{}",i);
                }
            }
        }
    
        /**
         * 修改方法,作用范围是整个方法,作用对象为调用这个方法的对象
         * 若子类继承父类调用父类的synchronized方法,是带不上synchronized关键字的
         * 原因:synchronized 不属于方法声明的一部分
         * 如果子类也想使用同步需要在方法上声明
         */
        public synchronized void test2(){
            for (int i = 0; i < 10; i++) {
                log.info("test2-{}",i);
            }
        }
    
        public static void main(String[] args) {
            SyncronizedExample1 example1 = new SyncronizedExample1();
            SyncronizedExample1 example2 = new SyncronizedExample1();
    
            // 使用线程池模拟一个对象的两个进程同时调用一段sync代码的执行过程
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            // 线程pool-1-thread-1,pool-1-thread-2 交叉输出
            executorService.execute(()-> example1.test1());
            executorService.execute(()-> example2.test1());
    
    
            // 线程pool-1-thread-1 先从0-9输出,然后pool-1-thread-2 从0到9顺序输出
            // executorService.execute(()-> example1.test1());
            // executorService.execute(()-> example1.test1());
    
        }
    }
    
    @Slf4j
    public class SyncronizedExample2 {
    
        /**
         * 修饰类,括号包起来的代码
         * 作用对象为这个类的所有对象
         */
        public static void test1(){
            synchronized (SyncronizedExample2.class){
                for (int i = 0; i < 10; i++) {
                    log.info("test1-{}",i);
                }
            }
        }
    
    
        /**
         * 修饰一个静态方法,作用对象为这个类的所有对象
         */
        public static synchronized void test2(){
            for (int i = 0; i < 10; i++) {
                log.info("test2-{}",i);
            }
        }
    
        public static void main(String[] args) {
            SyncronizedExample2 example1 = new SyncronizedExample2();
            SyncronizedExample2 example2 = new SyncronizedExample2();
    
            // 使用线程池模拟一个对象的两个进程同时调用一段sync代码的执行过程
            ExecutorService executorService = Executors.newCachedThreadPool();
    
            // 线程pool-1-thread-1 先从0-9输出,然后pool-1-thread-2 从0到9顺序输出
            executorService.execute(()-> example1.test1());
            executorService.execute(()-> example1.test1());
    
            // 线程pool-1-thread-1 先从0-9输出,然后pool-1-thread-2 从0到9顺序输出
    //        executorService.execute(()-> example1.test2());
    //        executorService.execute(()-> example2.test2());
        }
    }
    
    原子性对比
    • 可见性

    导致共享变量在线程中不可见的原因

    • 线程交叉执行
    • 重排序结合线程交叉执行
    • 共享变量更新后的值没有在工作内存与主内存间及时更新
    java提供了synchronized和volatile 两种方法来确保可见性

    JMM(java内存模型)关于synchronized的两条规定

    • 线程解锁前,必须把共享变量的最新值刷新到主内存

    • 线程加锁时,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,加锁和解锁是同一把锁)

    • 可见性-volatile:通过加入 内存屏障和禁止重排序优化来实现
      • 对volatile 变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存
      • 对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量
    volatile写示意图 volatile读示意图
    /**
     * 并发测试
     * @author gaowenfeng
     */
    @Slf4j
    @NotThreadSafe
    public class CountExample4 extends AbstractExample{
    
        /** 请求总数 */
        public static int clientTotal = 5000;
        /** 同时并发执行的线程数 */
        public static int threadTotal = 50;
    
        public volatile static int count = 0;
    
        public static void main(String[] args) throws InterruptedException {
            new CountExample4().test();
        }
    
    
        /**
         * 本质上应该是这个方法线程不安全
         *
         * volatile只能保证 1,2,3的顺序不会被重排序
         * 但是不保证1,2,3的原子执行,也就是说还是有可能有两个线程交叉执行1,导致结果不一致
         */
        @Override
        protected void add() {
            // 1.取内存中的count值
            // 2.count值加1
            // 3.重新写会主存
            count++;
        }
    
        @Override
        protected void countLog() {
            log.info("count:{}",count);
        }
    
    
    }
    
    
    • volatile使用条件

    1.对变量写操作不依赖于当前值
    2.该变量没有包含在具有其他变量的不必要的式子中

    综上,volatile特别适合用来做线程标记量,如下图


    volatile使用场景
    • 有序性

    有序性
    Happens-before原则,先天有序性,即不需要任何额外的代码控制即可保证有序性,java内存模型一个列出了八种Happens-before规则,如果两个操作的次序不能从这八种规则中推倒出来,则不能保证有序性
    1-2

    第一条规则要注意理解,这里只是程序的运行结果看起来像是顺序执行,虽然结果是一样的,jvm会对没有变量值依赖的操作进行重排序,这个规则只能保证单线程下执行的有序性,不能保证多线程下的有序性

    3-4 5-6 7-8
    • 总结

    总结

    相关文章

      网友评论

        本文标题:慕课网高并发实战(四)- 线程安全性

        本文链接:https://www.haomeiwen.com/subject/gapjqftx.html