美文网首页
4、Java并发编程入门与高并发面试-线程安全性

4、Java并发编程入门与高并发面试-线程安全性

作者: 安安汐而 | 来源:发表于2020-05-23 10:52 被阅读0次

    慕课网 Jimin老师 Java并发编程入门与高并发面试 学习笔记
    Java并发编程入门与高并发面试

    线程安全性定义:
    当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的

    原子性:提供了互斥访问,同一时刻只能有一一个线程来对它进行操作
    可见性: 一个线程对主内存的修改可以及时的被其他线程观察到
    有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果- -般杂乱无序

    原子性-Atomic包
    ◆AtomicXXX : CAS. Unsafe.compareAndSwapInt
    package com.huhao.concurrency.example.count;
    
    import com.huhao.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 代码模拟并发测试
     * Atomic包演示
     */
    @Slf4j
    @ ThreadSafe
    public class CountExample2 {
    
        //请求总数
        public static int clientTotal = 5000;
    
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static AtomicInteger count = new AtomicInteger(0);
    
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int index = 0; index < clientTotal; index++) {
                exec.execute(() -> {
                    try {
                        //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                        semaphore.acquire();
                        add();
                        //add执行完后,释放当前线程
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception", e);
                        e.printStackTrace();
                    }
    
                    countDownLatch.countDown();
                });
            }
            //保证线程减到0
            countDownLatch.await();
            //关闭线程池
            exec.shutdown();
            log.error("count:{}", count);//count:4952
        }
    
        private static void add() {
            count.incrementAndGet();
        }
    }
    

    原因详解:

    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    
    /**
     * eg:2+1=3
     * @param var1 :当前count对象
     * @param var2:当前的值  2
     * @param var4 :加的值   1
     * 
     * var5是底层方法获取到的值。如果没有别的线程处理更改的时候,正常应该var5=2,
     * compareAndSwapInt 来判断,直到var2与var5相同时,才会var5+var4=2+1,返回3
     * @return the updated value
     */
    public final int getAndAddInt(Object var1, long var2, int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1, var2);
        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    
        return var5;
    }
    
    AtomicLong、LongAdder

    由于getAndAddInt方法,一直循环比较,在线程数少的时候,比较的成功率比较高,在多的时候,失败的次数会增多,导致一直循环,效率较低
    并发量大的时候,使用LongAdder效率会高一点。如果小的话,也可以用AtomicInteger

    AtomicReference 、AtomicReferenceFieldUpdater
    package com.huhao.concurrency.example.atomic;
    
    import com.huhao.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.atomic.LongAdder;
    
    /**
     * 代码模拟并发测试
     * AtomicReference包演示
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExampleAtomicReference {
    
        private static AtomicReference<Integer> count = new AtomicReference<>(0);
    
        public static void main(String[] args) {
            count.compareAndSet(0, 2);//count=2
            count.compareAndSet(0, 1);//count=2,不是0,则不执行
            count.compareAndSet(1, 3);//count=2,不是0,则不执行
            count.compareAndSet(2, 4);//count=4
            count.compareAndSet(3, 5);//count=4,不是0,则不执行
            log.info("count:{}", count.get());//count:4
        }
    }
    
    package com.huhao.concurrency.example.atomic;
    
    import com.huhao.concurrency.annoations.ThreadSafe;
    import lombok.Getter;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    import java.util.concurrent.atomic.AtomicReference;
    
    /**
     * 代码模拟并发测试
     * AtomicReferenceFieldUpdater包演示
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExampleAtomicExampleAtomicReference {
    
        //AtomicExampleAtomicExampleAtomicReference的count字段,必须用volatile修饰
        private static AtomicIntegerFieldUpdater<AtomicExampleAtomicExampleAtomicReference>
                updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExampleAtomicExampleAtomicReference.class, "count");
    
        @Getter
        public volatile int count = 100;
    
        public static void main(String[] args) {
    
            AtomicExampleAtomicExampleAtomicReference example5 = new AtomicExampleAtomicExampleAtomicReference();
    
            if (updater.compareAndSet(example5, 100, 120)) {
                log.info("update success 1,{}", example5.getCount());   // update success 1,120
                if (updater.compareAndSet(example5, 100, 120)) {
                    log.info("update success 2,{}", example5.getCount());
                } else {
                    log.info("update failed,{}", example5.getCount());  // update failed,120
                }
            }
        }
    }
    
    AtomicStampReference : CAS的ABA问题

    aba问题:cas操作时候,其他线程将变量值A,从A改到B,又改回A。本线程在比较的时候,发现A变量没有变,于是就讲A值进行了交换操作,其实该值以及被其他线程更新过,与最初设计初衷不符。
    解决:在变量更新时候,将变量版本号加1

    AtomicBoolean

    只执行一次

    
    import com.huhao.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * 代码模拟并发测试
     * AtomicBoolean包演示
     */
    @Slf4j
    @ThreadSafe
    public class AtomicExampleAtomicBoolean {
    
        //请求总数
        public static int clientTotal = 5000;
    
        //同时并发执行的线程数
        public static int threadTotal = 200;
    
        public static AtomicBoolean isHappened = new AtomicBoolean();
    
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            for (int index = 0; index < clientTotal; index++) {
                exec.execute(() -> {
                    try {
                        //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                        semaphore.acquire();
                        test();
                        //add执行完后,释放当前线程
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception", e);
                        e.printStackTrace();
                    }
    
                    countDownLatch.countDown();
                });
            }
            //保证线程减到0
            countDownLatch.await();
            //关闭线程池
            exec.shutdown();
            log.error("count:{}", isHappened.get());
        }
    
        private static void test() {
            if (isHappened.compareAndSet(false, true)) {
                //5000次线程,只会执行一次
                log.info("exec");
            }
        }
    }
    

    原子性-synchronized

    ◆修饰代码块:大括号括起来的代码,作用于调用的对象
    ◆修饰方法:整个方法,作用于调用的对象
    ◆修饰静态方法:整个静态方法,作用于所有对象
    ◆修饰类:括号括起来的部分,作用于所有对象

    package com.huhao.concurrency.example.sync;
    
    import com.huhao.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     *  Synchronized测试修饰代码块和方法
     */
    @Slf4j
    @ThreadSafe
    public class SynchronizedExample {
    
        /**
         * 一般的循环,多线程测下看看对不对
         */
        private void notSyncBlock() {
            for (int i = 0; i < 10; i++) {
                log.info("notSyncBlock - {}", i);
            }
        }
    
        /**
         * 修饰代码块
         * 大括号括起来的代码,作用于调用的对象
         */
        private void synchronizedBlock() {
            synchronized (this) {
                for (int i = 0; i < 4; i++) {
                    log.info("test1 - {}", i);
                }
            }
        }
    
        /**
         * 修饰代码块
         * 大括号括起来的代码,作用于调用的对象
         */
        private void synchronizedBlock(int j) {
            synchronized (this) {
                for (int i = 0; i < 4; i++) {
                    log.info("synchronizedBlock {} - {}", j, i);
                }
            }
        }
    
        /**
         * 修饰方法
         * 整个方法,作用于调用的对象
         */
        private synchronized void synchronizedMethod() {
            for (int i = 0; i < 10; i++) {
                log.info("test2 - {}", i);
            }
        }
    
        //请求总数
        public static int clientTotal = 4;
    
        //同时并发执行的线程数
        public static int threadTotal = 3;
    
        public static void main(String[] args) throws InterruptedException {
            SynchronizedExample synchronizedExample = new SynchronizedExample();
            SynchronizedExample synchronizedExample2 = new SynchronizedExample();
    
            ExecutorService exec = Executors.newCachedThreadPool();
            final Semaphore semaphore = new Semaphore(threadTotal);
            final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
            /**
             * 线程同步执行测试是不是同步一个个的执行
             */
            for (int index = 0; index < clientTotal; index++) {
                exec.execute(() -> {
                    try {
                        //线程请求,如果线程数已经到了,可能会阻塞,等有线程释放了再执行
                        semaphore.acquire();
                        /**
                         *  没有加sync的,
                         *  notSyncBlock - 0
                         *  notSyncBlock - 0
                         *  notSyncBlock - 0
                         *  notSyncBlock - 1
                         *  notSyncBlock - 1
                         *  notSyncBlock - 1
                         *  notSyncBlock - 2
                         *  notSyncBlock - 2
                         *  notSyncBlock - 2
                         *  notSyncBlock - 3
                         */
    //                    synchronizedExample.notSyncBlock();
    
    
                        /**
                         * 加了sync的,同步执行
                         * test1 - 0
                         * test1 - 1
                         * test1 - 2
                         * test1 - 3
                         * test1 - 0
                         * test1 - 1
                         * test1 - 2
                         * test1 - 3
                         * test1 - 0
                         * test1 - 1
                         * test1 - 2
                         * test1 - 3
                         * test1 - 0
                         */
    //                    synchronizedExample.synchronizedBlock();
    
                        /**
                         * example和example2会同时调用,但是本身是顺序执行的
                         * 同一个对象,是同步执行
                         * 不同对象间,不干扰
                         */
    //                    synchronizedExample.synchronizedBlock(0);
    //                    synchronizedExample2.synchronizedBlock(1);
    //                    synchronizedExample2.synchronizedBlock(2);
    
                        //add执行完后,释放当前线程
                        semaphore.release();
                    } catch (InterruptedException e) {
                        log.error("exception", e);
                        e.printStackTrace();
                    }
    
                    countDownLatch.countDown();
                });
            }
            //保证线程减到0
            countDownLatch.await();
            //关闭线程池
            exec.shutdown();
    //        synchronizedExample.synchronizedMethod();
        }
    }
    
    package com.huhao.concurrency.example.sync;
    
    import com.huhao.concurrency.annoations.ThreadSafe;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * Synchronized 测试修饰静态方法和类
     */
    @Slf4j
    @ThreadSafe
    public class SynchronizedExample2 {
    
        /**
         * 一般的循环,多线程测下看看对不对
         */
        public void notSyncBlock() {
            for (int i = 0; i < 10; i++) {
                log.info("notSyncBlock - {}", i);
            }
        }
    
        /**
         * 修饰类
         */
        public static void synchronizedBlock() {
            synchronized (SynchronizedExample2.class) {
                for (int i = 0; i < 4; i++) {
                    log.info("test1 - {}", i);
                }
            }
        }
    
        /**
         * 修饰静态方法
         */
        public static synchronized void synchronizedMethod() {
            for (int i = 0; i < 10; i++) {
                log.info("test2 - {}", i);
            }
        }
    
        //请求总数
        public static int clientTotal = 4;
    
        //同时并发执行的线程数
        public static int threadTotal = 3;
    
        public static void main(String[] args) throws InterruptedException {
        }
    }
    

    对比:

    ◆synchronized :不可中断锁,适合竞争不激烈,可读性好
    ◆Lock :可中断锁,多样化同步,竞争激烈时能维持常态
    ◆Atomic :竞争激烈时能维持常态,比Lock性能好;只能同步一个值

    可见性

    导致共享变量在线程间不可见的原因
    ◆线程交叉执行
    ◆重排序结合线程交叉执行
    ◆共享变量更新后的值没有在工作内存与主存间及时更新

    可见性- synchronized

    JMM关于synchronized的两条规定:
    ◆线程解锁前 ,必须把共享变量的最新值刷新到主内存
    ◆线程加锁时 ,将清空工作内存中共享变量的值,从而使用共享变量时需要从主内存中重新读取最新的值(注意,
    加锁与解锁是同一把锁)

    可见性- volatile

    通过加入内存屏障和禁止重排序优化来实现
    ◆对volatile变量写操作时 ,会在写操作后加入-条store屏障指令,将本地内存中的共享变量值刷新到主内存
    ◆对volatile变量读操作时 ,会在读操作前加入-条load屏障指令,从主内存中读取共享变量

    相关文章

      网友评论

          本文标题:4、Java并发编程入门与高并发面试-线程安全性

          本文链接:https://www.haomeiwen.com/subject/filoohtx.html