美文网首页
JUC并发编程引导学习(超长篇)

JUC并发编程引导学习(超长篇)

作者: 读书小旺仔 | 来源:发表于2020-04-02 14:33 被阅读0次

    JUC并发编程学习

    1、什么是JUC

    juc是Java并发编程的一个工具包,包名为java.util.concurrent

    2、进程和线程

    • 进程是资源分配的基本单位,线程是CPU调度的最小单位
    • 一个进程由一个或多个线程组成
    • 进程的资源由系统进行分配,线程间共享进程的所有资源
    • 进程有自己的独立空间,线程没有自己的独立空间
    • 进程切换比线程切换开销大

    3、线程的六种状态

    通过Thread.state源码可知,线程共有6种状态:新建、运行、等待、延时等待、阻塞、结束

    public enum State {
            
              //新建
            NEW,
    
            //运行
            RUNNABLE,
    
            //阻塞
            BLOCKED,
    
            //等待
            WAITING,
    
            //延时等待
            TIMED_WAITING,
    
            //结束
            TERMINATED;
        }
    

    4、wait和sleep区别

    • wait对象是Object,sleep对象是Thread(this.wait(),Thread.sleep())
    //线程休眠推荐使用TimeUnit,可以直观地显示休眠时间单位
    TimeUnit.SECONDS.sleep(2);
    TimeUnit.DAYS.sleep(1);
    //TimeUnit本质调用的也还是Thread.sleep(),但Thread.sleep()参数以毫秒为单位,使用枚举类型TimeUnit,能清晰表述时间
    
    
    • sleep休眠期间不会释放锁,wait等待期间会释放锁
    • wait和notify结合使用,等待后需要notify唤醒,sleep单独使用,时间过了就可以
    • sleep需要捕获异常

    5、Lock

    1、锁的概念

    ​ 锁是拿来控制多个线程对共享资源的访问。访问共享资源时,只允许一个线程拥有锁,业务结束后再释放锁,下一个线程再获取锁...以此保证线程并发安全。

    2、传统的synchronized

    package com.zxw.jucproject.module.demo;
    
    /**
     * 使用传统的synchronized解决并发问题
     */
    public class Demo1 {
    
        public static void main(String[] args) {
    
            Ticket ticket = new Ticket();
    
            new Thread(()->{
                for (int i = 0; i < 20; i++){
                    ticket.sendTicket();
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 20; i++){
                    ticket.sendTicket();
                }
            },"B").start();
    
            new Thread(()->{
                for (int i = 0; i < 20; i++){
                    ticket.sendTicket();
                }
            },"C").start();
    
    
        }
    
    }
    
    /**
     * 资源类
     */
    class Ticket{
    
        /**
         * 总票数量
         */
        private int ticketCount = 30;
        //synchronized 是一个关键字
        public synchronized void sendTicket(){
           if (ticketCount > 0){
                ticketCount--;
                System.out.println(Thread.currentThread().getName()+"顾客购买了票,票还剩余:"+ticketCount);
            }
        }
    }
    
    

    3、Lock锁ReentrantLock

    package com.zxw.jucproject.module.demo;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 使用Lock锁解决并发问题
     */
    public class Demo2 {
    
        public static void main(String[] args) {
    
            Ticket2 ticket = new Ticket2();
    
            new Thread(()->{
                for (int i = 0; i < 20; i++){
                    ticket.sendTicket();
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 20; i++){
                    ticket.sendTicket();
                }
            },"B").start();
    
            new Thread(()->{
                for (int i = 0; i < 20; i++){
                    ticket.sendTicket();
                }
            },"C").start();
    
    
        }
    
    }
    
    /**
     * 资源类
     */
    class Ticket2{
    
        /**
         * 总票数量
         */
        private int ticketCount = 30;
        //Lock是一个对象
        //可重入锁
        //ReetrantLock()默认是一个非公平锁,new ReentrantLock(ture)可实现公平锁
        private Lock lock = new ReentrantLock();
    
        public  void sendTicket(){
            //加锁
            lock.lock();
            try {
                if (ticketCount > 0){
                    ticketCount--;
                    System.out.println(Thread.currentThread().getName()+"顾客购买了票,票还剩余:"+ticketCount);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //解锁
                lock.unlock();
            }
        }
    }
    
    

    4、Lock和synchronized的区别

    • Lock是一个对象,synchronize是关键字
    • synchronized是非公平锁,lock默认是非公平锁,但构造方法参数填true可实现公平锁
    • Lock需要手动释放锁,synchronized自动释放锁
    • Lock在线程阻塞时可以尝试解锁(lock.tryLock()),synchronized则必须等待
    • 在代码量比较复杂的情况下,通常使用Lock锁进行精准锁住关键业务代码。synchronized通常使用简单的业务代码

    5、生产者和消费者问题

    生产者和消费者问题是典型的多线程并发协作问题,通常是指生产者和消费者共同操作一个资源,并且该资源只有在生产者生产后,消费者才自动去消费。消费者消费完生产者的数据后,生产者才生产。

    传统的synchronized。

    有一个公共资源x=0,实现依次A线程+1,B线程-1 10次

    如果资源判断是使用if时,会出现虚假唤醒,导致该模式无法实现4个线程依次执行+1-1+1-1

    public class Demo3 {
    
        public static void main(String[] args){
    
            ResourceNumber resourceNumber = new ResourceNumber();
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"B").start();
        }
    
    }
    
    /**
     * 资源类
     * 线程之间通信主要三步走
     * 1、判断
     * 2、执行
     * 3、通知
     */
    class ResourceNumber{
    
        private int num = 0;
    
        public synchronized void increment() throws InterruptedException {
    
            //如果num不等于0,则线程等待(1、判断)
            if (num != 0){
                this.wait();
            }
            //2、执行
            num++;
            System.out.println(Thread.currentThread().getName()+":"+num);
            //唤醒所有等待的线程(3、通知)
            this.notifyAll();
        }
    
        public synchronized void decrement() throws InterruptedException {
    
            if (num != 1){
                this.wait();
            }
            num--;
            System.out.println(Thread.currentThread().getName()+":"+num);
            this.notifyAll();
        }
    
    }
    
    

    传统的synchronized。

    有一个公共资源x=0,实现依次四个线程依次+1-1+1-1

    但无法实现A线程+1之后B线程-1之后C线程+1之后D线程-1

    public class Demo3 {
    
        public static void main(String[] args){
    
            ResourceNumber resourceNumber = new ResourceNumber();
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"B").start();
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.increment();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"C").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.decrement();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"D").start();
        }
    
    }
    
    /**
     * 资源类
     * 线程之间通信主要三步走
     * 1、判断
     * 2、执行
     * 3、通知
     */
    class ResourceNumber{
    
        private int num = 0;
    
        public synchronized void increment() throws InterruptedException {
    
            //如果num不等于0,则线程等待(1、判断)
            while (num != 0){
                this.wait();
            }
            //2、执行
            num++;
            System.out.println(Thread.currentThread().getName()+":"+num);
            //唤醒所有等待的线程(3、通知)
            this.notifyAll();
        }
    
        public synchronized void decrement() throws InterruptedException {
    
            while (num != 1){
                this.wait();
            }
            num--;
            System.out.println(Thread.currentThread().getName()+":"+num);
            this.notifyAll();
        }
    
    }
    
    

    Lock锁实现精准唤醒线程,实现依次A+1,B-1,C+1,D-1

    package com.zxw.jucproject.module.demo;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 生产者和消费者问题-即多线程并发协作的问题。
     * 使用Lock ReenstrantLock精准唤醒线程
     */
    public class Demo4 {
    
        public static void main(String[] args){
    
            ResourceNumber2 resourceNumber = new ResourceNumber2();
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.incrementA();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"A").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.decrementB();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"B").start();
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.incrementC();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"C").start();
    
            new Thread(()->{
                for (int i = 0; i < 10; i++){
                    try {
                        resourceNumber.decrementD();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            },"D").start();
        }
    
    }
    
    /**
     * 资源类
     * 线程之间通信主要三步走
     * 1、判断
     * 2、执行
     * 3、通知
     */
    class ResourceNumber2{
    
        private int num = 0;
    
        private Lock lock = new ReentrantLock();
        Condition A = lock.newCondition();
        Condition B = lock.newCondition();
        Condition C = lock.newCondition();
        Condition D = lock.newCondition();
    
    
        public void incrementA() throws InterruptedException {
    
            lock.lock();
            try {
                //如果num不等于0,则线程等待(1、判断)
                while (num != 0){
                    A.await();
                }
                //2、执行
                num++;
                System.out.println(Thread.currentThread().getName()+":"+num);
                //唤醒所有等待的线程(3、通知)
                B.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void decrementB() throws InterruptedException {
            lock.lock();
            try {
                while (num != 1){
                    B.await();
                }
                num--;
                System.out.println(Thread.currentThread().getName()+":"+num);
                C.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void incrementC() throws InterruptedException {
            lock.lock();
            try {
                //如果num不等于0,则线程等待(1、判断)
                while (num != 0){
                    C.await();
                }
                //2、执行
                num++;
                System.out.println(Thread.currentThread().getName()+":"+num);
                //唤醒所有等待的线程(3、通知)
                D.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        public void decrementD() throws InterruptedException {
            lock.lock();
            try {
                while (num != 1){
                    D.await();
                }
                num--;
                System.out.println(Thread.currentThread().getName()+":"+num);
                A.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    }
    
    

    6、八锁问题

    解决八锁问题只需要记住以下关键信息

    • 被synchronized修饰的方法,方法调用者即为锁拥有者
    • 被static修复的方法,方法所属对象即为锁拥有者
    • 被static和synchronized修饰的方法,方法所属对象即为锁拥有者
    • 普通方法,不受锁的控制

    若多个线程的锁拥有者为同一个,则其中一个获得锁,还未解锁,其余线程则等待;若多个线程的锁拥有者不为同一个,则互不影响;若线程无锁,则也不会受到其他线程锁的影响

    7、并发不安全的集合类

    list、set、map并发情况下都不安全

    list需要用CopyOnWriteArrayList实现线程安全

    set需要用CopyOnWriteArraySet实现线程安全

    map需要用concurrentHashMap实现线程安全

    CopyOnWriteArrayList和CopyOnWriteArraySet底层除了加锁外,还在操作时会新建一个数组,在新建的数组上进行操作,然后替换之前的数组,保证数据安全。

    8、ReadWriteLock读写锁

    package com.zxw.jucproject.module.lock;
    
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /**
    * 读写锁WriteReadLock(Interface)
    * 独立锁(写锁):只允许一个线程执行,保持事务的原子性
    * 共享锁(读锁):可以允许多个线程执行
    */
    public class ReadWriteLockDemo {
    
     public static void main(String[] args) {
    
         //Catch aCatch = new Catch();
         CatchLock aCatch = new CatchLock();
    
         //写数据线程
         for (int i = 0; i < 10; i++){
             final int tmp = i;
             new Thread(()->{
                 aCatch.put(String.valueOf(tmp),String.valueOf(tmp));
             },"线程A"+i).start();
         }
         //读数据线程
         for (int i = 0; i < 10; i++){
             final int tmp = i;
             new Thread(()->{
                 aCatch.get(String.valueOf(tmp));
             },"线程B"+i).start();
         }
     }
    
    
    }
    
    /**
    * 该资源调用可以看出:
    * 写数据过程没有保证原子性,还没结束写入数据,其他线程就开始写入了。所以需要加锁
    */
    class Catch{
    
     private ConcurrentMap<String,String> map = new ConcurrentHashMap<>();
    
     //写数据
     public void put(String key ,String value){
         System.out.println(Thread.currentThread().getName()+":-> 开始写入数据(key="+key+",value="+value+")");
         map.put(key,value);
         System.out.println(Thread.currentThread().getName()+":-> 结束写入数据");
     }
     //读数据
     public void get(String key){
         System.out.println(Thread.currentThread().getName()+":-> 开始读取数据(key="+key+")");
         String value = map.get(key);
         System.out.println(Thread.currentThread().getName()+":-> 结束读取数据(value="+value+")");
     }
    
    }
    
    /**
    *
    * 读写锁控制写资源时保证原子性,读资源时,随意读取资源
    */
    class CatchLock{
    
     private ConcurrentMap<String,String> map = new ConcurrentHashMap<>();
     private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    
     //写数据
     public void put(String key ,String value){
    
         readWriteLock.writeLock().lock();
         try {
             System.out.println(Thread.currentThread().getName()+":-> 开始写入数据(key="+key+",value="+value+")");
             map.put(key,value);
             System.out.println(Thread.currentThread().getName()+":-> 结束写入数据");
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             readWriteLock.writeLock().unlock();
         }
     }
     //读数据
     public void get(String key){
    
         readWriteLock.readLock().lock();
    
         try {
             System.out.println(Thread.currentThread().getName()+":-> 开始读取数据(key="+key+")");
             String value = map.get(key);
             System.out.println(Thread.currentThread().getName()+":-> 结束读取数据(value="+value+")");
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
             readWriteLock.readLock().unlock();
         }
     }
    
    }
    

    9、阻塞队列

    1、首先什么时候队列会阻塞?

    • 想把数据放入队列,但是队列满了,则会导致阻塞
    • 想从队列中取出数据,但是队列是空的,则会导致阻塞

    2、阻塞后会采取什么操作

    • 抛出异常
    • 不抛出异常,返回false或者自定义信息
    • 无限期阻塞进程,直到操作成功为止
    • 设定阻塞后等待时间,等时间一过还没操作成功则放弃

    3、具体实现方法

    插入方法 移除方法 阻塞后操作
    Add(e) Poll 抛出异常
    offer(e) Poll 不抛出异常,丢弃数据,返回false
    put(e) take(e) 一直等待
    offer(e,t,unit) Poll 等待t时间后丢弃数据
    package com.zxw.jucproject.module.queue;
    
    import java.util.ArrayDeque;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 阻塞队列
     */
    public class BlockQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
    
           // commonQueueDemo();
    
           // blockQueueDemo01();
    
           // blockQueueDemo02();
    
           //  blockQueueDemo03();
    
            blockQueueDemo04();
    
        }
    
    
        /**
         * 当队列阻塞后设置阻塞等待时间,超过等待时间后,抛弃数据
         * @throws InterruptedException
         */
        public static void blockQueueDemo04() throws InterruptedException {
    
            ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
            System.out.println(queue.offer("a",5, TimeUnit.SECONDS));
            System.out.println(queue.offer("b"));
            System.out.println(queue.offer("c"));
            System.out.println(queue.offer("d",5, TimeUnit.SECONDS));
    
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
    
        }
    
        /**
         * 队列阻塞后一直等待,直到能成功把数据插入即可
         *保留一个问题:
         * 试了好多种方法,好像队列阻塞后,就没有方法让它唤醒嘛?使用remove移除数据或者clean清除数据都不好使??
         */
        public static void blockQueueDemo03() throws InterruptedException {
    
            ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
            queue.put("a");
            queue.put("b");
            queue.put("c");
            queue.put("d");
    
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
    
        }
    
        /**
         * 队列阻塞后不抛出异常,丢弃数据,返回false状态
         *
         */
        public static void blockQueueDemo02(){
    
            ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
            System.out.println(queue.offer("a"));
            System.out.println(queue.offer("b"));
            System.out.println(queue.offer("c"));
            System.out.println(queue.offer("d"));
    
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
    
        }
    
        /**
         * 队列阻塞后抛出异常
         * Exception in thread "main" java.lang.IllegalStateException: Queue full
         */
        public static void blockQueueDemo01(){
    
            ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
            queue.add("a");
            queue.add("b");
            queue.add("c");
            queue.add("d");
    
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
            System.out.println(queue.poll());
    
        }
        /**
         * 普通队列实现,普通队列的存储都是可扩充的。
         * 看源码学习扩容机制
         */
        public static void commonQueueDemo(){
    
            ArrayDeque<String> deque = new ArrayDeque<>(3);
            deque.add("a");
            deque.add("b");
            deque.add("c");
            deque.add("d");
    
            System.out.println(deque.poll());
            System.out.println(deque.poll());
            System.out.println(deque.poll());
            System.out.println(deque.poll());
        }
    }
    
    
    package com.zxw.jucproject.module.queue;
    
    
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * SynchronousQueue只有一个元素队列,每一个插入操作都得等待另一个线程的删除操作。
     * 注意:
     *     1、当队列中已经存在一个元素即队列已满时,再插入数据后队列会阻塞
     *     2、当队列不存在数据时,去读取数据,队列不会阻塞,读取null
     */
    public class SynchronousQueueDemo {
    
        public static void main(String[] args) {
    
            SynchronousQueue<Object> queue = new SynchronousQueue<>();
    
            new Thread(()->{
                try {
                    System.out.println("放入a");
                    queue.put("a");
                    System.out.println("放入b");
                    queue.put("b");
                    System.out.println("放入c");
                    queue.put("c");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(()->{
                try {
    
                TimeUnit.SECONDS.sleep(3);
                System.out.println("取出"+queue.poll());
    
                TimeUnit.SECONDS.sleep(3);
                System.out.println("取出"+queue.poll());
    
                TimeUnit.SECONDS.sleep(3);
                System.out.println("取出"+queue.poll());
                System.out.println("取出"+queue.poll());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
        }
    }
    
    

    10、线程池

    池技术广泛应用、譬如线程池、连接池、内存池等,池技术主要是为了充分利用资源,避免资源浪费,以及减少资源连接和关闭时间消耗,提高程序性能

    线程池10个问题:https://www.cnblogs.com/konck/p/9473681.html

    1、线程池模型介绍

    image-20200305213739805.png

    2、Executors弊端

    //单例线程池,只有一个线程
    ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
    
    /* 源码
    *  new LinkedBlockingQueue<Runnable>()创建Integer.MAX_VALUE大的队列空间
    *  若出现大量请求,则堆积的大量请求队列会耗费非常大的内存,甚至OOM
    **/
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    //根据需要变化的线程池
    ExecutorService threadPool2 = Executors.newCachedThreadPool();
    
    /**
    * 源码
    * new SynchronousQueue<Runnable>()是容量只有1个的阻塞队列,所以不会堆积大量请求,但是
    * 最大线程数(maximumPoolSize) = Integer.MAX_VALUE ,若出现大量请求,会创造大量的线程,导致OOM
    */
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    //自定义线程数线程池
    ExecutorService threadPool3 = Executors.newFixedThreadPool(5);
    //造成OOM的原因和Executors.newSingleThreadExcutor()一样
    

    3、ThreadPoolExecutor源码

     
     /**
         * 创建线程池初始方法
         *
         * @param corePoolSize 核心线程池数量
         * @param maximumPoolSize 最大线程池数量-(cpu核数)
         * @param keepAliveTime 超时等待时长
         * @param unit 超时等待时长单位
         * @param workQueue 阻塞队列
         * @param threadFactory 线程工厂
         * @param handler 拒绝策略
         */
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    4、应用ThreadPoolExecutor

    package com.zxw.jucproject.module.pool;
    import java.util.concurrent.*;
    
    /**
     * 线程池介绍
     *
     */
    public class ThreadPoolDemo {
    
    
        public static void main(String[] args) {
    
            //核心线程3个,每一个线程执行大约100ms,客户最多等待3s,那么阻塞队列数量可这么设定=3s/3/100ms=10个
    
            //new ThreadPoolExecutor.AbortPolicy() 拒绝策略,当总线程数和阻塞队列都满后,会抛出异常
            //ThreadPoolExecutor thread = new ThreadPoolExecutor(3,4,3L, TimeUnit.SECONDS,new LinkedBlockingDeque<>(10),Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    
            //线程阻塞后使用当前方法的进程来调用即main方法
           // ThreadPoolExecutor thread = new ThreadPoolExecutor(3,4,3L, TimeUnit.SECONDS,new LinkedBlockingDeque<>(10),Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    
            //new ThreadPoolExecutor.DiscardOldestPolicy() 丢弃最旧的数据,执行刚拒绝的
           //ThreadPoolExecutor thread = new ThreadPoolExecutor(3,4,3L, TimeUnit.SECONDS,new LinkedBlockingDeque<>(10),Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
    
            //阻塞的直接丢弃
            ThreadPoolExecutor thread = new ThreadPoolExecutor(3,4,3L, TimeUnit.SECONDS,new LinkedBlockingDeque<>(10),Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
    
            try {
                for (int i = 0; i < 100; i++){
                    final int tmp = i;
                    thread.execute(()->{
                        System.out.println(Thread.currentThread().getName()+":"+tmp);
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                thread.shutdown();
            }
    
        }
    
        public static void main01(String[] args) {
    
            /**
             * 不推荐使用Excutors创建线程池,会出现内存泄漏情况OOM
             */
            //单线程池
            ExecutorService threadPool1 = Executors.newSingleThreadExecutor();
            //根据需要变化的线程池
            ExecutorService threadPool2 = Executors.newCachedThreadPool();
            //自定义线程数线程池
            ExecutorService threadPool3 = Executors.newFixedThreadPool(5);
    
            try {
    //        for (int i = 0; i < 10; i++){
    //            threadPool1.execute(()->{
    //                System.out.println(Thread.currentThread().getName());
    //            });
    //        }
    
    
    //        for (int i = 0; i < 10; i++){
    //            threadPool2.execute(()->{
    //                System.out.println(Thread.currentThread().getName());
    //            });
    //        }
    
    
                for (int i = 0; i < 10; i++){
                    threadPool3.execute(()->{
                        System.out.println(Thread.currentThread().getName());
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //使用结束后需要关闭
                threadPool3.shutdown();
            }
    
    
        }
    
    
    }
    

    5、拒绝策略源码剖析

    1、拒绝任务后抛出异常

        /**
         * 任务拒绝后抛出异常,数据也丢失
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    2、直接丢弃拒绝任务

        /**
         * 直接丢弃拒绝任务
         */
        public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
    

    3、丢弃在队列中等待最久的数据

      /**
         * 丢弃等待最久的数据
         */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    
    

    4、使用当前方法的线程运行

      /**
         * 使用当前方法的线程运行拒绝任务,通常为main线程
         */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    

    6、最大线程池如何设定

    • CPU密集型: cpu核数就是最大的线程数
    • IO密集型: 譬如50个IO都是常用的IO资源,要设置大于常用的IO线程数

    11、四大函数式接口

    4大函数式接口,通常可以用lambda表达式或者方法的引用赋值对象

    lambda表达式更简单,使用lambda代替方法的引用赋值对象

    1、Function-apply

    package com.zxw.jucproject.module.function;
    
    
    import java.util.function.Function;
    
    /**
     * function要求两个参数,一个输入参数,一个输出参数 apply()
     */
    public class FunctionDemo {
    
        public static void main(String[] args) {
    
            //使用传统的引用对象赋值
            Function function = new Function<String,Integer>() {
                @Override
                public Integer apply(String str) {
                    return str.length();
                }
            };
            System.out.println(function.apply("hello world"));
    
            //函数式接口使用lambda表达式
            Function<String,Integer> function1 = (s)->{
                return s.length();
            };
            System.out.println(function1.apply("hello world"));
    
        }
    }
    
    

    2、Consumer-accept

    package com.zxw.jucproject.module.function;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;
    
    /**
     * Consumer 要求一个参数,输入参数,没有返回值 方法:accept()
     */
    public class ConsumeDemo {
    
        public static void main(String[] args) {
    
            List<String> list = new ArrayList<>();
    
            //使用传统的引用对象赋值
            Consumer consumer = new Consumer<String>() {
                @Override
                public void accept(String o) {
                    list.add(o);
                }
            };
            consumer.accept("hello");
            System.out.println(list);
    
            //使用lambda表达式赋值
            Consumer<String> consumer1 = (s) -> {list.add(s);};
            consumer1.accept("world");
            System.out.println(list);
        }
    }
    
    

    3、Predicate-test

    package com.zxw.jucproject.module.function;
    
    import java.util.function.Predicate;
    
    /**
     * Predicate 要求一个参数,输入参数,返回Boolean类型 test()
     */
    public class PredicateDemo {
    
        public static void main(String[] args) {
    
            //使用传统的引用赋值
            Predicate predicate = new Predicate<String>() {
                @Override
                public boolean test(String o) {
                    return o.length()==5;
                }
            };
            System.out.println(predicate.test("hello"));
    
            //使用lambda表达式赋值
            Predicate<String> predicate1 = (s)->{return s.length() == 5;};
            System.out.println(predicate1.test("world"));
        }
    }
    
    

    4、Supplier-get

    package com.zxw.jucproject.module.function;
    
    import java.util.function.Supplier;
    
    /**
     * Supplier 不需要参数,有返回值 get()
     */
    public class SupplierDemo {
    
        public static void main(String[] args) {
    
            //使用传统的引用赋值
            Supplier supplier = new Supplier() {
                @Override
                public Object get() {
                    return "hello";
                }
            };
    
            System.out.println(supplier.get());
    
            //使用lambda表达式
            Supplier supplier1 = ()->{return "world";};
            System.out.println(supplier1.get());
    
        }
    }
    
    

    12、Stream流式计算

    注意看源码,尤其是参数和返回值

    package com.zxw.jucproject.module.stream;
    
    import java.util.*;
    import java.util.stream.Collectors;
    
    public class StreamDemo {
    
        /**
         *
         * @param args
         */
        public static void main(String[] args) {
    
            /**
             * 给出一个任务,筛选出
             *  成绩大于等于60分并且
             *  年龄小于16岁并且
             *  性别是女孩
             *  的同学,并且按成绩从高到低排名
             *
             *  用一行表达式解决
             */
            Student s1 = new Student("张三",15,"abc",60,"man");
            Student s2 = new Student("里斯",18,"del",80,"women");
            Student s3 = new Student("小屋",11,"ef",67,"women");
            Student s4 = new Student("老刘",16,"xa",45,"man");
            Student s5 = new Student("小巴",13,"axk",50,"women");
            Student s6 = new Student("老旧",9,"sej",92,"women");
    
            List<Student> students = Arrays.asList(s1, s2, s3, s4, s5, s6);
            /**
             * 学习:
             * Student::getScore 是lambda的缩写   等价于 (s)->{return s.getSocre();}
             * filter : 筛选出满足条件的对象,返回仍是Stream<Student>
             * sorted: 排序。Comparator.comparing(Student::getScore)根据分数正向排序,返回Stream<Student>
             * map: 对指定属性进行操作,返回指定属性的类型:.map(s->{return s.getCourse().toUpperCase();})将返回Stream<String> 看源码可知
             * 
             */
            List<Student> list = students.stream().filter(s -> (s.getScore() >= 60 && s.getSex().equals("women") && s.getAge() < 16)).sorted(Comparator.comparing(Student::getScore).reversed()).collect(Collectors.toList());
            list.forEach(s->{System.out.println(s.toString());});
    
    
        }
    }
    
    

    13、分支合并ForkJoin

    ForkJoin实现原理递归,大数据量下计算比普通方法效率高,但不如流计算

    package com.zxw.jucproject.module.forkjoin;
    
    import java.util.concurrent.RecursiveTask;
    
    public class ForkJoinDemo extends RecursiveTask<Long> {
    
        private Long start;
        private Long end;
        private final static Long tempFlag = 10000L;
    
        ForkJoinDemo(Long start,Long end){
            this.start = start;
            this.end  = end;
        }
    
        @Override
        protected Long compute() {
            //临界值
            if ((end-start)<tempFlag){
                long sum = 0L;
                for (Long i = start; i <= end; i++){
                    sum+=i;
                }
                return sum;
            }else{
                Long middle = (end+start)/2;
                ForkJoinDemo left = new ForkJoinDemo(start, middle);
                //压入队列
                left.fork();
                ForkJoinDemo right = new ForkJoinDemo(middle + 1, end);
                //压入队列
                right.fork();
                return left.join()+right.join();
            }
        }
    }
    
    

    普通方法、forkjoin方法、流计算方法测试比较性能

    package com.zxw.jucproject.module.forkjoin;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.LongStream;
    
    public class MyTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            test1();
            test2();
            test3();
           // test4();
        }
    
        //普通方法计算
        private static void test3() {
    
            Long sum = 0L;
            Long start = System.currentTimeMillis();
            //1_0000_0000是语法糖,便于识别
            for (Long i = 0L; i <= 1_0000_0000L;i++){
                sum+=i;
            }
            Long end = System.currentTimeMillis();
            System.out.println("耗时--->"+(end-start)+",结果--->"+sum);
        }
    
        //forkjoin递归
        private static void test2() throws ExecutionException, InterruptedException {
    
            Long start = System.currentTimeMillis();
    
            ForkJoinDemo test = new ForkJoinDemo(0L, 1_0000_0000L);
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinTask<Long> submit = forkJoinPool.submit(test);
            Long sum = submit.get();
            Long end = System.currentTimeMillis();
            System.out.println("耗时--->"+(end-start)+",结果--->"+sum);
        }
    
        //流计算
        private static void test1() {
    
            Long start = System.currentTimeMillis();
            long sum = LongStream.rangeClosed(0L, 1_0000_0000L).sum();
            Long end = System.currentTimeMillis();
            System.out.println("耗时--->"+(end-start)+",结果--->"+sum);
    
        }
    
        // forkjoin  6000
        private static void test4() throws ExecutionException, InterruptedException {
    
            long start = System.currentTimeMillis();
    
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            ForkJoinWork forkJoinWork = new ForkJoinWork(0L, 1_0000_0000L);
            ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinWork);
            Long sum = submit.get();
    
            long end = System.currentTimeMillis();
    
            System.out.println("times:"+(end-start)+" r=>"+sum);
        }
    }
    
    

    14、异步回调

    前端有异步请求(ajax),两步走:1、异步请求方法、2、返回方法值

    那为什么会有异步回调的需求呢?

    image-20200308151913619.png

    异步回调跟多线程类似,就是多开一个线程执行任务,但是有独特功能

    package com.zxw.jucproject.module.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureDemo {
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            CompletableFuture completableFuture = new CompletableFuture();
    
    //        //没有回调结果的
    //        CompletableFuture<Void> voidCompletableFuture = CompletableFuture.runAsync(() -> {
    //
    //            try {
    //                TimeUnit.SECONDS.sleep(2);
    //                System.out.println(Thread.currentThread().getName() + "没有返回值");
    //            } catch (InterruptedException e) {
    //                e.printStackTrace();
    //            }
    //        });
    //        System.out.println("证明上面任务是异步执行的!");
    //        //注意get()方法是获取异步任务结束后的结果,它会等待异步任务执行完毕
    //        System.out.println(voidCompletableFuture.get());
    
            //有回调结果的,且没有异常抛出
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                return 12;
            });
            //当不知道参数是做什么时,把它打印出来就完事
            //当编译完成时
            System.out.println(future.whenComplete((u, v) -> {
                System.out.println("u:------------>" + u);      //u=12返回结果
                System.out.println("v:------------>" + v);      //v=null
            }).exceptionally((e) -> {
                System.out.println(e.getMessage());
                return 500;
            }).get());      //12
    
            System.out.println("-------------------------天马行空分割线-------------------");
            //有回调结果的,并且有异常抛出
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                int a = 10/0;
                return 12;
            });
            //当不知道参数是做什么时,把它打印出来就完事
            //当编译完成时
            System.out.println(future2.whenComplete((u, v) -> {
                System.out.println("u:------------>" + u);      //u=null
                System.out.println("v:------------>" + v);      //v=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
            }).exceptionally((e) -> {
                System.out.println(e.getMessage());
                return 500;
            }).get());  //500
    
    
        }
    }
    
    

    15、常用辅助类

    1、CountDownLatch

    package com.zxw.jucproject.module.utils;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     *CountDownLotch允许一个线程或多个线程等待直到其他线程完成的辅助工具
     * 需要注意这是一次性的现象-计数无法重置
     * 实际场景很多:
     *  我们使用多线程调用三方数据源,并且需要取到所有的数据后才进行其它操作。所有之后的线程都需要等待获取数据这个多线程执行完毕
     *
     */
    public class CountDownLatchDemo {
    
        public static void main(String[] args) throws InterruptedException {
    
            CountDownLatch countDownLatch = new CountDownLatch(3);
    
            for (int i =0; i < 3; i++){
                final int tmp = i;
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName());
                    countDownLatch.countDown();//只要执行完一个线程,就减1
                },String.valueOf(tmp)).start();
            }
            //只有CountDownLatch设定的所有线程执行完毕才会自动解锁,不然一直阻塞
            countDownLatch.await();
            //该线程得等到上述线程执行完毕后才能执行
            System.out.println(Thread.currentThread().getName());
        }
    }
    
    

    2、CyclicBarrier

    package com.zxw.jucproject.module.utils;
    
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    /**
     * 允许一组线程全部等待彼此达到共同屏障点的同步辅助。
     * 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。
     * 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。
     */
    public class CyclicBarrierDemo {
    
        public static void main(String[] args) {
    
            //当给定数量的线程(线程)等待时,它将跳闸,当屏障跳闸时执行给定的屏障动作,由最后一个进入屏障的线程执行。
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
                System.out.println(Thread.currentThread().getName()+"所有线程都执行完毕了我再执行");
            });
    
            for (int i = 0; i < 7; i++){
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName());
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }
    
    

    3、semaphore

    主要用在两种地方:多线程共享资源互斥!并发线程的控制!

    package com.zxw.jucproject.module.utils;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     *  计数信号量
     *  作用:固定线程数
     *  实际场景:
     *  总共有三个车位,7辆车进来如何停
     */
    public class SemaphoreDemo {
    
        public static void main(String[] args) {
    
            //模拟三个车位
            Semaphore semaphore = new Semaphore(3);
    
            //模拟7辆车
            for(int i = 0; i < 7;i++){
                final int tmp = i;
                new Thread(()->{
                    try {
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"抢到了车位");
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName()+"离开了车位");
    
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        semaphore.release();
                    }
                },String.valueOf(tmp)).start();
            }
        }
    }
    
    

    16、 JMM(Java内存模型)

    JAVA虚拟机中规范了Java内存模型(java memory model JMM),用来屏蔽掉各种硬件以及不同操作系统内存访问的差异,保证Java在不同平台能达到并发一致性。

    JVM内存模型是JVM内存分区,JMM内存模型是一种虚拟机规范

    Java内存模型和硬件内存架构联系图:

    image-20200314085554621.png

    从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:

    • 线程之间的共享变量存储在主内存(Main Memory)中
    • 每个线程都有一个私有的本地内存(Local Memory),本地内存是JMM的一个抽象概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。本地内存中存储了该线程以读/写共享变量的拷贝副本。
    • 从更低的层次来说,主内存就是硬件的内存,而为了获取更好的运行速度,虚拟机及硬件系统可能会让工作内存优先存储于寄存器和高速缓存中。
    • Java内存模型中的线程的工作内存(working memory)是cpu的寄存器和高速缓存的抽象描述。而JVM的静态内存储模型(JVM内存模型)只是一种对内存的物理划分而已,它只局限在内存,而且只局限在JVM的内存。

    关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成:

    • lock(锁定):作用于主内存的变量,把一个变量标识为一条线程独占状态。
    • unlock(解锁):作用于主内存变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定。
    • read(读取):作用于主内存变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
    • load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工作内存的变量副本中。
    • use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时将会执行这个操作。
    • assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋值给工作内存的变量,每当虚拟机遇到一个给变量赋值的字节码指令时执行这个操作。
    • store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作。
    • write(写入):作用于主内存的变量,它把store操作从工作内存中一个变量的值传送到主内存的变量中。

    Java内存模型还规定了在执行上述八种基本操作时,必须满足如下规则:

    • 如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行read和load操作, 如果把变量从工作内存中同步回主内存中,就要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行。
    • 不允许read和load、store和write操作之一单独出现
    • 不允许一个线程丢弃它的最近assign的操作,即变量在工作内存中改变了之后必须同步到主内存中。
    • 不允许一个线程无原因地(没有发生过任何assign操作)把数据从工作内存同步回主内存中。
    • 一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load或assign)的变量。即就是对一个变量实施use和store操作之前,必须先执行过了assign和load操作。
    • 一个变量在同一时刻只允许一条线程对其进行lock操作,但lock操作可以被同一条线程重复执行多次,多次执行lock后,只有执行相同次数的unlock操作,变量才会被解锁。lock和unlock必须成对出现
    • 如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值
    • 如果一个变量事先没有被lock操作锁定,则不允许对它执行unlock操作;也不允许去unlock一个被其他线程锁定的变量。
    • 对一个变量执行unlock操作之前,必须先把此变量同步到主内存中(执行store和write操作)。

    多线程读同步与可见性

    可见性(共享对象可见性):线程对共享变量修改的可见性。当一个线程修改了共享变量的值,其他线程能够立刻得知这个修改

    线程缓存导致的可见性问题:

    如果两个或者更多的线程在没有正确的使用volatile声明或者同步的情况下共享一个对象,一个线程更新这个共享对象可能对其它线程来说是不可见的:共享对象被初始化在主存中。跑在CPU上的一个线程将这个共享对象读到CPU缓存中,然后修改了这个对象。只要CPU缓存没有被刷新会主存,对象修改后的版本对跑在其它CPU上的线程都是不可见的。这种方式可能导致每个线程拥有这个共享对象的私有拷贝,每个拷贝停留在不同的CPU缓存中。

    下图示意了这种情形。跑在左边CPU的线程拷贝这个共享对象到它的CPU缓存中,然后将count变量的值修改为2。这个修改对跑在右边CPU上的其它线程是不可见的,因为修改后的count的值还没有被刷新回主存中去。

    img

    解决这个内存可见性问题你可以使用:

    • Java中的volatile关键字:volatile关键字可以保证直接从主存中读取一个变量,如果这个变量被修改后,总是会被写回到主存中去。Java内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值这种依赖主内存作为传递媒介的方式来实现可见性的,无论是普通变量还是volatile变量都是如此,普通变量与volatile变量的区别是:volatile的特殊规则保证了新值能立即同步到主内存,以及每个线程在每次使用volatile变量前都立即从主内存刷新。因此我们可以说volatile保证了多线程操作时变量的可见性,而普通变量则不能保证这一点。
    • Java中的synchronized关键字:同步快的可见性是由“如果对一个变量执行lock操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行load或assign操作初始化变量的值”、“对一个变量执行unlock操作之前,必须先把此变量同步回主内存中(执行store和write操作)”这两条规则获得的。
    • Java中的final关键字:final关键字的可见性是指,被final修饰的字段在构造器中一旦被初始化完成,并且构造器没有把“this”的引用传递出去(this引用逃逸是一件很危险的事情,其他线程有可能通过这个引用访问到“初始化了一半”的对象),那么在其他线程就能看见final字段的值(无须同步)

    17、Volatile

    谈谈你对volatile的理解

    1、保证内存可见性(保证每个线程在读取主内存中的变量都是最新的,即这个变量一旦被修改后,都会在其他线程读取之前同步到主内存中)

    2、不保证原子性

    3、禁止指令重排

    package com.zxw.jucproject.module.jmm;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * volatile特点:
     * 1、保证内存可见性
     * 2、不保证原子性
     * 3、禁止指令重排
     * 
     * 该例子证明volatile可以保证内存可见性
     */
    public class VolatileDemo {
    
        private volatile static int num = 0;
        
        public static void main(String[] args) throws InterruptedException {
    
            new Thread(()->{
                while (num == 0){
    
                }
            },"A").start();
    
            TimeUnit.SECONDS.sleep(1L);
            //此时确保线程A一直在循环内
            num = 1;
            System.out.println(num);
    
        }
    }
    
    
    package com.zxw.jucproject.module.jmm;
    
    /**
     * 该例子证明volatile不能保证原子性
     */
    public class VolatileDemo2 {
    
        private volatile static int num = 0;
    
        public static void add(){
            num++;
        }
    
        public static void main(String[] args) {
    
    //        创建10个线程,若能保证原子性,则结果为1000
            for (int i = 0; i < 10; i++){
                new Thread(()->{
                    for (int j = 0; j < 100; j++){
                        add();
                    }
                }).start();
            }
    
            //一个程序运行最少存在两个线程:main线程和gc线程
            if (Thread.activeCount() > 2){
                //yeild线程让步,让其他线程有限执行,自己稍后执行
                Thread.yield();
            }
            System.out.println(Thread.currentThread().getName()+num);
        }
    
        /**
         * 该结果为main0
         * 原因:因为main线程比A线程早执行,可以在add方法中打印输出
         * @param args
         */
        public static void main01(String[] args){
            
            new Thread(()->add(),"A").start();
            System.out.println(Thread.currentThread().getName()+num);//输出:main0
        }
    }
    

    那么如何才能保证原子性,答:使用锁,synchronize或者lock

    单纯这个例子而言,不能保证原子性的操作根本原因在于num++语法不是原子性的,底层实现是先获取num值,再+1再保存值。那么有没有什么整数类型能保证原子性吗?

    答:查API文档java.util.concurrent.atomic包

     package com.zxw.jucproject.module.jmm;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * AtomicInteger整型操作保证原子性
     */
    public class AtomicIntegerDemo {
    
        private static AtomicInteger num = new AtomicInteger(0);
    
        public static  void add(){
            num.addAndGet(1);
        }
    
        public static void main(String[] args) {
    
            for (int i = 0; i < 10; i++){
                new Thread(()->{
                    for (int j = 0; j < 100; j++){
                        add();
                    }
                }).start();
            }
    
            if (Thread.activeCount() > 2){
                Thread.yield();
            }
            System.out.println(num);
        }
    
    }
    
    

    指令重排: 编译器可以安排程序的语句执行顺序

    例子:

    int x = 1,y=2;//1
    x = x+1;//2
    y = y+2;//3
    int z = x*y//4
    执行顺序可以是:1234、1324
    

    编译器进行指令重排时会考虑指令数据之间的依赖性,所以在单线程下指令重排一定是安全的

    但是多线程无法保证

    例子:

    int x,y,a,b=0;
    
    //线程A                                                               //线程B
    x = a;                                                              y=b;
    b = 1;                                                              a=1;
    
    
    
    //正确的结果是x=0,y=0
    //但若进行指令重排(线程A和线程B指令之间不存在依赖性,所以可以进行指令重排)
    //线程A                                                               //线程B
    b = 1;                                                              a=1;
    x = a;                                                              y=b;
    //可能结果就是x=1,y=1
    

    volatile能禁止指令重排,他能给各个指令提供内存屏障memory barrier,确保指令按顺序执行

    18、单例模式

    单例模式核心思想就是构造方法私有化

    1、恶汉模式

    package com.zxw.jucproject.module.single;
    
    /**
     * 单例模式: 构造方法私有
     * 恶汉式
     */
    public class HungrySingleDemo {
    
        //占用内存空间大,若在类未加载情况下已经加载,浪费内存空间
        private byte[] data = new byte[1024*1024];
    
        private HungrySingleDemo(){}
    
        //final,static 类在未加载之前就已经初始化分配内存了,恶汉模式浪费内存
        public final static HungrySingleDemo hungrySingle = new HungrySingleDemo();
    
    
        public static  HungrySingleDemo getInstance(){
    
            return hungrySingle;
        }
    
    
    }
    

    2、懒汉模式

    package com.zxw.jucproject.module.single;
    
    /**
     * 单例模式:
     * 懒汉式
     * 在多线程下失效,因为lazySingle = new LazySingleDemo();不是原子性操作
     */
    public class LazySingleDemo {
    
        private String data;
    
        private LazySingleDemo(){
            System.out.println(Thread.currentThread().getName()+"init");
        }
    
        private static  LazySingleDemo lazySingle;
    
        public static LazySingleDemo getInstance(){
            if (lazySingle == null){
                lazySingle = new LazySingleDemo();
            }
            return lazySingle;
        }
    
        public static void main(String[] args) {
    
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                   LazySingleDemo lazySingleDemo = LazySingleDemo.getInstance();
                },String.valueOf(i)).start();
            }
        }
    }
    
    

    3、DCL懒汉式

    DCL:double check lock,双重检查锁

    package com.zxw.jucproject.module.single;
    
    /**
     * 单例模式
     * DCL懒汉式
     * DCL:double check lock 双重检查锁
     *
     */
    public class DclLazySingleDemo {
    
        private String data;
    
        private DclLazySingleDemo(){
            System.out.println(Thread.currentThread().getName()+"init");
        }
    
        private static  DclLazySingleDemo dclLazySingle;
    
        public static DclLazySingleDemo getInstance(){
            if (dclLazySingle == null) {
                synchronized (DclLazySingleDemo.class) {
                    if (dclLazySingle == null) {
                        dclLazySingle = new DclLazySingleDemo();
                    }
                }
            }
            return dclLazySingle;
        }
    
        public static void main(String[] args) {
    
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    DclLazySingleDemo dclLazySingle = DclLazySingleDemo.getInstance();
                },String.valueOf(i)).start();
            }
        }
    
    }
    

    4、反射破坏单例

    package com.zxw.jucproject.module.single;
    
    import java.lang.reflect.Constructor;
    import java.lang.reflect.InvocationTargetException;
    
    /**
     * 单例模式
     * DCL懒汉式
     * DCL:double check lock 双重检查锁
     *
     */
    public class DclLazySingleDemo {
    
        private String data;
    
        private DclLazySingleDemo(){
            System.out.println(Thread.currentThread().getName()+"init");
        }
    
        private static  DclLazySingleDemo dclLazySingle;
    
        public static DclLazySingleDemo getInstance(){
            if (dclLazySingle == null) {
                synchronized (DclLazySingleDemo.class) {
                    if (dclLazySingle == null) {
                        dclLazySingle = new DclLazySingleDemo();
                    }
                }
            }
            return dclLazySingle;
        }
    
        /**
         * 反射可破坏DCL懒汉式单例模式
         * @param args
         * @throws NoSuchMethodException
         * @throws IllegalAccessException
         * @throws InvocationTargetException
         * @throws InstantiationException
         */
        public static void main(String[] args) throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
    
            for (int  i = 0; i < 5; i++) {
                new Thread(()->{
                    Constructor<DclLazySingleDemo> declaredConstructor = null;
                    try {
                        declaredConstructor = DclLazySingleDemo.class.getDeclaredConstructor();
                        DclLazySingleDemo dclLazySingleDemo1 = declaredConstructor.newInstance();
                    } catch (InstantiationException e) {
                        e.printStackTrace();
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    } catch (InvocationTargetException e) {
                        e.printStackTrace();
                    } catch (NoSuchMethodException e) {
                        e.printStackTrace();
                    }
                }).start();
    
            }
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:JUC并发编程引导学习(超长篇)

          本文链接:https://www.haomeiwen.com/subject/ayytphtx.html