美文网首页
线程安全性

线程安全性

作者: 磊_5d71 | 来源:发表于2018-11-02 08:47 被阅读0次
    • 当多个线程访问某个类时,不管运行环境时采用何种调度方式或者这些进程进行如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么这个类就是线程安全的。
    • 原子性:提供了互斥访问,同一时刻只能有一个线程对它进行操作
    • 可见性:一个线程对主内存的修改可以及时的被其他线程观察到
    • 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序。

    原子性

    图片.png

    AtomicInteger

    AtomicInteger类中的incrementAndGet方法,调用CompareAndSetInt 简称CAS
    有native修饰说明是底层的类,不是用java实现的

    • ,AtomicLong的原理是依靠底层的cas来保障原子性的更新数据,在要添加或者减少的时候,会使用死循环不断地cas到特定的值,从而达到更新数据的目的。工作内存中的值与主存中进行比较。
    • 实现原理底层采用while
      循环方式,调用较多时可能会受到影响
    package com.alan.concurrency.example.atomic;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample1 {
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        //count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
        public static AtomicInteger count = new AtomicInteger(0);
    
    
        private static void add(){
            //用到unsafe类
            count.incrementAndGet();
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            add();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}",count.get());
        }
    }
    

    LongAdder

    • LongAdder在AtomicLong的基础上将单点的更新压力分散到各个节点,在低并发的时候通过对base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并发的时候通过分散提高了性能。
    • 缺点是LongAdder在统计的时候如果有并发更新,可能导致统计的数据有误差。
    package com.alan.concurrency.example.atomic;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.LongAdder;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample3 {
    
    
        //请求数1000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        //count里面的值相当于工作内存,有可能和底层也就是主内存的值不一致
        public static LongAdder count = new LongAdder();
    
    
        private static void add(){
            //用到unsafe类
            count.increment();
        }
    
        public static void main(String[] args) throws InterruptedException {
    
            //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            add();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("count:{}",count);
        }
    }
    

    AtomicReference

    package com.alan.concurrency.example.atomic;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.atomic.LongAdder;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample4 {
    
        private static AtomicReference<Integer> count = new AtomicReference<>(0);
    
        public static void main(String[] args) {
            count.compareAndSet(0,2);
            count.compareAndSet(0,1);
            count.compareAndSet(1,3);
            count.compareAndSet(2,4);
            count.compareAndSet(3,5);
            log.info("count:{}",count.get());
        }
    
    }
    

    AtomicIntegerFieldUpdater

    package com.alan.concurrency.example.atomic;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    import java.util.concurrent.atomic.AtomicReference;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample5 {
    
    
        //这里必须要使用volatile修饰符,并且非static才可以。通过lombok添加注解
        @Getter
        public volatile int count = 100;
    
        private static AtomicIntegerFieldUpdater<AtomicExample5> update = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class,"count");
    
        public static void main(String[] args) {
    
            AtomicExample5 example5 = new AtomicExample5();
    
            if(update.compareAndSet(example5,100,120)){
                log.info("update success,{}",example5.getCount());
            }
    
            if(update.compareAndSet(example5,100,120)){
                log.info("update success,{}",example5.getCount());
            }else {
                log.info("update failed,{}",example5.getCount());
            }
        }
    }
    

    AtomicStampedReference

    -ABA问题 线程1准备用CAS将变量的值由A替换为B,在此之前,线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题, AtomicStampedReference可以解决此问题

    AtomicBoolean

    • 让某一段代码只执行一次,绝对不会重复
    package com.alan.concurrency.example.atomic;
    
    
    import com.alan.concurrency.annoations.ThreadSafe;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    
    @Slf4j
    @ThreadSafe
    public class AtomicExample6 {
    
        //请求数5000
        public static int clientTotal = 5000;
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        private static AtomicBoolean isHappened = new AtomicBoolean(false);
    
        public static void main(String[] args) throws InterruptedException {     //定义线程池ExecutorService接口
            ExecutorService executorService = Executors.newCachedThreadPool();
            //定义信号量,传入并发线程数 final修饰不允许重新赋值
            final Semaphore semaphore = new Semaphore(threadTotal);
            //定义计数器闭锁。传入请求总数
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
    
            for (int i = 0; i < clientTotal; i++) {
                //通过匿名内部类方式
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //semaphore控制并发数量
                            semaphore.acquire();
                            test();
                            semaphore.release();
                        } catch (InterruptedException e) {
                            log.error("exception",e);
                        }
                        //每次执行计数器减掉一个
                        countDownLatch.countDown();
                    }
    
                });
    
            }
            countDownLatch.await();
            executorService.shutdown();
            log.info("isHappened:{}",isHappened.get());
        }
    
        //让某一段代码只执行一次,绝对不会重复
        private static void test(){
            if (isHappened.compareAndSet(false, true)) {
                log.info("executed");
            }
            }
    }
    
    
    图片.png

    synchronized

    • 修饰代码块:大括号括起来的代码,作用于调用的对象
    • 修饰方法:整个方法,作用于调用的对象
    • 修饰静态方法:整个静态方法,作用于所有对象
    • 修饰类:括号括起来的部分,作用于所有对象
      子类继承父类,父类的方法中含有synchronized关键字,子类继承此方法是不包含synchronized关键字的
    package com.alan.concurrency.example.sync;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class SynchronizedExample1 {
    
    
        //修饰一个代码块。作用范围是大括号括起的代码
        public void test1(int   j){
            synchronized (this){
                for (int i = 0; i < 10; i++) {
                    log.info("test1 {}- {}",j,i);
                }
            }
        }
    
        //修饰一个方法。作用范围是整个方法
        public synchronized void test2(int j){
            for (int i = 0; i < 10; i++) {
                log.info("test2 {} - {}",j,i);
            }
    
        }
    
        public static void main(String[] args) {
    
            SynchronizedExample1 example1 = new SynchronizedExample1();
            SynchronizedExample1 example2 = new SynchronizedExample1();
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(()->{
                example1.test2(1);
            });
            executorService.execute(()->{
                example2.test2( 2);
            });
        }
    }
    
    • 修饰静态方法和修饰类
    package com.alan.concurrency.example.sync;
    
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @Slf4j
    public class SynchronizedExample2 {
    
    
        //修饰一个类。作用范围是所有对象
        public void test1(int  j){
            synchronized (SynchronizedExample2.class){
                for (int i = 0; i < 10; i++) {
                    log.info("test1 {}- {}",j,i);
                }
            }
        }
    
        //修饰一个静态方法。作用范围是所有对象
        public static synchronized void test2(int j){
            for (int i = 0; i < 10; i++) {
                log.info("test2 {} - {}",j,i);
            }
    
        }
    
        public static void main(String[] args) {
    
            SynchronizedExample2 example1 = new SynchronizedExample2();
            SynchronizedExample2 example2 = new SynchronizedExample2();
            ExecutorService executorService = Executors.newCachedThreadPool();
            executorService.execute(()->{
                example1.test1(1);
            });
            executorService.execute(()->{
                example2.test1( 2);
            });
        }
    
    }
    

    对比

    图片.png

    可见性

    • 导致共享变量在线程中不可见的原因
      1、线程交叉执行
      2、重排序结合线程交叉执行
      3、共享变量更新后的值没有在工作内存与主存间及时更新

    可见性-synchronized方式

    图片.png

    可见性-volatile方式 (volatile不具备原子性)

    通过加入内存屏障和禁止重排序优化来实现

    • 对volatile变量写操作时,会在写操作后加入一条store屏障指令,将本地内存中的共享变量值刷新到主内存

    • 对volatile变量读操作时,会在读操作前加入一条load屏障指令,从主内存中读取共享变量。

    • volatile写


      图片.png
    • volatile读


      图片.png

      volatile 一般用于保证变量状态


      图片.png

    volatile 也可以用于double check

    有序性

    • Java内存模型中,允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,确会影响到多线程并发执行的正确性。
    • volatile、synchronized、lock

    有序性原则

    图片.png
    图片.png
    图片.png
    图片.png
    • 总结


      图片.png

    相关文章

      网友评论

          本文标题:线程安全性

          本文链接:https://www.haomeiwen.com/subject/sojfxqtx.html