美文网首页
JUC并发编程(二)

JUC并发编程(二)

作者: 勿念及时雨 | 来源:发表于2020-03-07 01:51 被阅读0次

    读写锁

    写锁:也叫独占锁,一次只能被一个线程占有。
    读锁:也叫共享锁,该锁可以被多个线程占有。
    ReadWriteLock,即读写锁,正如它的名字一样,它包含了读锁和写锁,一个用于只读操作,一个用于写入操作,我们先来看看JDK文档中对它的说明。

    读写锁
    创建一个读写锁对象:
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    

    加读锁和解读锁:

    readWriteLock.readLock().lock(); 
    readWriteLock.readLock().unlock();
    

    加写锁和解写锁:

    readWriteLock.writeLock().lock(); 
    readWriteLock.writeLock().unlock();
    

    数据读写时可以使用读写锁来保证线程安全,示例代码如下:

    package com.wunian.juc.rwlock;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /**
     * 独占锁(写锁)一次只能被一个线程占有
     * 共享锁(读锁)一个锁可以被多个线程占有
     */
    public class ReadWriteLockDemo {
        public static void main(String[] args) {
            MyCacheLock myCache=new MyCacheLock();
            //模拟线程
            //写
            for (int i=1;i<=5;i++){
                final int tempInt=i;
                new Thread(()->{
                    myCache.put(tempInt+"",tempInt+"");
                },String.valueOf(i)).start();
            }
            //读
            for (int i=1;i<=5;i++){
                final int tempInt=i;
                new Thread(()->{
                    myCache.get(tempInt+"");
                },String.valueOf(i)).start();
            }
        }
    }
    
    //加锁后的读写操作
    class MyCacheLock{
    
        private volatile Map<String,Object> map=new HashMap<>();
    
        //读写锁
        private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
    
        //读:可以被多个线程同时读
        public void get(String key){
            //锁一定要匹配,否则可能导致死锁
            readWriteLock.readLock().lock();//读锁,被多个线程同时持有
            try {
                System.out.println(Thread.currentThread().getName()+"读取"+key);
                Object o=map.get(key);
                System.out.println(Thread.currentThread().getName()+"读取结果"+o);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.readLock().unlock();//解锁
            }
        }
    
        //写:应该保证原子性,不应该被打扰,写线程写入的过程中如果不加锁,会被读线程打扰
        public void put(String key,Object value){
            readWriteLock.writeLock().lock();//写锁,只能被一个线程占有
            try {
                System.out.println(Thread.currentThread().getName()+"写入"+key);
                map.put(key,value);
                System.out.println(Thread.currentThread().getName()+"写入成功");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readWriteLock.writeLock().unlock();//解锁
            }
        }
    }
    

    阻塞队列

    队列:是一种先进先出的数据结构。
    栈:是一种后进先出的数据结构。
    阻塞队列是一种队列,我们先来看看JDK文档中对它的说明。

    阻塞队列
    阻塞队列在什么情况下一定会被阻塞?
    • 当队列是满的,如果还要往它里面添加元素就会被阻塞。
    • 当队列是空的,如果还要取它里面的元素就会被阻塞。

    什么时候使用阻塞队列?
    当编写多线程程序时,对于线程之间的通信,不需要关心唤醒的情况下可以使用阻塞队列。
    阻塞队列是新知识吗?
    List、Set这些集合类我们都学过,阻塞队列和它们是一样的,我们可以来看一张集合类的关系图。

    集合类的关系图
    由上图可知,BlockingQueue是Queue的子类,而Queue与List、Set一样都是Collection类的子类。
    ArrayBlockingQueue是BlockingQueue的子类,它含有四组对元素的插入和获取的API,我们可以用表格来对比一下。
    方法 会抛出异常 返回布尔值,不会抛出异常 延时等待 一直等待
    插入 add() offer(e) offer(e,time) put()
    取出 remove() poll() poll(time) take()
    检查 element() peek() - -

    四组API的示例代码如下:

    package com.wunian.juc.queue;
    
    import com.sun.scenario.effect.impl.sw.java.JSWBlend_SRC_OUTPeer;
    
    import java.sql.Time;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 阻塞队列
     */
    public class BlockingQueueDemo {
    
        public static void main(String[] args) throws InterruptedException {
    
            ArrayBlockingQueue blockingQueue=new ArrayBlockingQueue(3);
            //队列已满出现的四种情况:报错、抛弃不报错、一直等待、超时等待!
            //java.lang.IllegalStateException: Queue full
            blockingQueue.add("a");
            blockingQueue.add("b");
            blockingQueue.add("c");
            //blockingQueue.add("d");//会抛出队列已满的异常
            System.out.println(blockingQueue.element());//检测第一个元素,输出
    
    //        System.out.println(blockingQueue.remove());
    //        System.out.println(blockingQueue.remove());
    //        System.out.println(blockingQueue.remove());
    
            //返回布尔值,不抛出异常
    //        System.out.println(blockingQueue.offer("a"));//true
    //        System.out.println(blockingQueue.offer("b"));//true
    //        System.out.println(blockingQueue.offer("c"));//true
    //        //尝试等待三秒,三秒钟后会失败,返回false
    //        //System.out.println(blockingQueue.offer("d", 3L,TimeUnit.SECONDS));
    //        System.out.println(blockingQueue.peek());//检测第一个元素,输出
    //
    //        System.out.println("================================");
    //        System.out.println(blockingQueue.poll());
    //        System.out.println(blockingQueue.poll());
    //        System.out.println(blockingQueue.poll());
    //        //虽然是空的,还是会等待3秒,然后返回null
    //        System.out.println(blockingQueue.poll(3L, TimeUnit.SECONDS));
    
    //        blockingQueue.put("a");
    //        blockingQueue.put("b");
    //        blockingQueue.put("c");
    //        //System.out.println("准备放入第四个元素");
    //        //blockingQueue.put("d");//队列满了会一直等,并且会阻塞
    //
    //        System.out.println("===============================");
    //        System.out.println(blockingQueue.take());
    //        System.out.println(blockingQueue.take());
    //        System.out.println(blockingQueue.take());
    //        System.out.println(blockingQueue.take());//队列空了会一直等,并且阻塞
        }
    }
    

    同步队列

    SynchronousQueue,即同步队列,是一种特殊的阻塞队列,因为它只有一个容量,并且每进行一个put操作,就需要有一个take操作。
    示例代码如下:

    package com.wunian.juc.queue;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 同步队列
     * 只能存放一个值,一存一取(存一个必须取一个才能继续存)
     */
    public class SynchronuseQueueDemo {
    
        public static void main(String[] args) {
            //特殊的阻塞队列
            BlockingQueue<String> blockingQueue=new SynchronousQueue<>();
            //存
            new Thread(()->{
                try {
                    System.out.println(Thread.currentThread().getName()+" put  a");
                    blockingQueue.put("a");
                    System.out.println(Thread.currentThread().getName()+" put  b");
                    blockingQueue.put("b");
                    System.out.println(Thread.currentThread().getName()+" put  c");
                    blockingQueue.put("c");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            },"A").start();
            //取
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName()+" "+blockingQueue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            },"A").start();
        }
    }
    

    线程池

    线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。
    为什么要使用线程池?
    实现线程复用,线程复用可以提高线程的使用效率,保证内核的充分利用,防止过分调度。
    线程池的三大方法
    创建只有一个线程的线程池:

    ExecutorService pool= Executors.newSingleThreadExecutor();
    

    创建固定线程数的线程池:

    ExecutorService  pool=Executors.newFixedThreadPool(3);
    

    创建可伸缩的线程池:

    ExecutorService pool=Executors.newCachedThreadPool();
    

    线程池的七大参数
    我们先看看上面三大方法的源码:

    //可伸缩的线程池
    new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 约等于21亿
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    //固定线程数的线程池
    new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    //单线程的线程池
    new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>())
    

    由以上代码可以知道,三大方法的底层还是创建的ThreadPoolExecutor对象,这也就是为什么阿里巴巴开发手册要求不能使用三大方法,而必须直接通过创建ThreadPoolExecutor对象来创建线程池的原因。
    我们再来看看定义了七大参数的构造器源码:

    public ThreadPoolExecutor(int corePoolSize, // 核心池线程数大小 (常用)
                                  int maximumPoolSize,  // 最大的线程数大小 (常用)
                                  // 超时等待时间,超过一定时间会把核心线程数以外的闲置线程关闭(常用)
                                  long keepAliveTime, 
                                  TimeUnit unit, // 时间单位 (常用)
                                  BlockingQueue<Runnable> workQueue, // 阻塞队列(常用)
                                  ThreadFactory threadFactory, // 线程工厂
                                  RejectedExecutionHandler handler // 拒绝策略(常用)) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
            null :
        AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    由以上代码可知,七大参数为:

    • int corePoolSize:核心池线程数,即线程池固定开启的线程数量。
    • int maximumPoolSize:最大线程数,当核心池线程全部都在处理任务,阻塞队列中的任务也已经满了,一旦增加了新任务,线程池会开启一个新的线程来处理,开启的线程数量最多为最大线程数,超过这个数量,将执行拒绝策略。
    • long keepAliveTime:超时等待时间,超过这个时间会将闲置的线程关闭,最后只保留核心池线程数量的线程。
    • TimeUnit unit:keepAliveTime参数的时间单位。
    • BlockingQueue< Runnable > workQueue:阻塞队列,用来存放等待线程执行的任务。
    • ThreadFactory threadFactory:线程工厂,默认值:Executors.defaultThreadFactory()
    • RejectedExecutionHandler handler:拒绝策略,当线程池中执行的线程数量达到最大线程数,新增的任务将根据拒绝策略进行处理。

    除了ThreadFactory这个参数使用的是系统默认的线程工厂外,其它六个参数都是必须掌握的。
    四大拒绝策略
    七大参数中的RejectedExecutionHandler参数,指的就是拒绝策略,源码中为我们提供了四大拒绝策略。

    四大拒绝策略
    • ThreadPoolExecutor.AbortPolicy(): 抛出异常,丢弃任务。
    • ThreadPoolExecutor.DiscardPolicy():不抛出异常,丢弃任务。
    • ThreadPoolExecutor.DiscardOldestPolicy(): 尝试获取任务,不一定执行。
    • ThreadPoolExecutor.CallerRunsPolicy():哪来的去哪里找对应的线程执行。

    最大线程数应该如何设置?

    • CPU密集型:根据CPU的处理器数量来决定,这样能够保证最大效率。
      获取电脑CPU核数:Runtime.getRuntime().availableProcessors();
    • IO密集型:例如有50个线程都是进程操作大IO资源,比较耗时,这时就要考虑最大线程数一定要大于这个常用IO的任务数,即最大线程数要大于50。
      最终代码如下:
    package com.wunian.juc.threadpool;
    
    import java.util.concurrent.*;
    
    /**
     * 线程池
     */
    public class ThreadPoolDemo {
    
        public static void main(String[] args) {
            ExecutorService pool=new ThreadPoolExecutor(
                    2,//核心线程数大小(常用)
                    //最大线程数大小(常用)
                    Runtime.getRuntime().availableProcessors(),//获取电脑CPU核数
                    //超时等待时间(常用,超过一定时间会把核心线程数以外的闲置线程关闭)
                    3L,
                    //时间单位(常用)
                    TimeUnit.SECONDS,
                    //阻塞队列(常用)
                    new LinkedBlockingDeque<>(3),
                    //线程工厂
                    Executors.defaultThreadFactory(),
                    //拒绝策略(常用,4种)
                    new ThreadPoolExecutor.CallerRunsPolicy()
            );
    
            try {
                //线程池的使用方式
                for(int i=1;i<=100;i++){
                    pool.execute(()->{
                        System.out.println(Thread.currentThread().getName()+" ok");
    
                    });
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //使用完毕后需要关闭!
                pool.shutdown();
            }
        }
    }
    

    四个函数式接口

    函数式接口在java.util.function包下面,所有的函数式接口都可以用来简化编程模型,都可以使用lambda表达式简化。
    必须掌握的四个函数式接口

    • Function:有一个输入参数,有一个输出参数。
    • Consumer:有一个输入参数,没有输出参数。
    • Supplier:没有输入参数,只有输出参数。
    • Predicate:有一个输入参数,判断是否正确

    lambda表达式语法格式可以简单概括为:(参数)->{方法体},示例代码如下:

    package com.wunian.juc.function;
    
    import java.util.function.Consumer;
    import java.util.function.Function;
    import java.util.function.Predicate;
    import java.util.function.Supplier;
    
    /**
     * 函数式接口是现在必须掌握且精通的
     *所有的函数式接口都可以使用lambda表达式简化
     * lambda表达式是java8必须掌握的
     *
     */
    public class FunctionInterfaceDemo {
    
        public static void main(String[] args) {
    
            /*Function<String,Integer> function=new Function<String,Integer>(){
    
                @Override
                public Integer apply(String s) {
                    return s.length();
                }
            };*/
    
            //Function  lambda表达式格式 (参数)->{方法体}
            Function<String,Integer> function=(str)->{return str.length();};
            System.out.println(function.apply("1234565"));
    
            /*Predicate<String> predicate=new Predicate<String>() {
                @Override
                public boolean test(String s) {
                    return s.isEmpty();
                }
            };*/
    
            //Predicate lambda表达式格式 (参数)->{方法体}
            Predicate<String> predicate=str->{return str.isEmpty();};
            System.out.println(predicate.test("qqq"));
    
            /*Supplier<String> supplier=new Supplier<String>() {
                @Override
                public String get() {
                    return "hello word";
                }
            };*/
    
            //Supplier lambda表达式格式 (参数)->{方法体}
            Supplier<String> supplier=()->{return "hello juc";};
            System.out.println(supplier.get());
    
            /*Consumer<String> consumer=new Consumer<String>() {
                @Override
                public void accept(String s) {
    
                    System.out.println(s);
    
                }
            };*/
    
            //Consumer lambda表达式格式 (参数)->{方法体}
            Consumer<String> consumer=(s)->{
                System.out.println(s);
            };
            consumer.accept("hello consumer");
        }
    }
    

    Stream流式计算

    数据库和集合都是用来存数据的,我们可以把计算和处理数据交给Stream。先来看看JDK文档中对它的说明。


    Stream

    比如现在有一个集合,需要按条件用户筛选以下条件:
    1、id 为偶数
    2、年龄大于24
    3、用户名大写 映射
    4、用户名倒排序
    5、输出一个用户
    并且只能用一行代码完成!
    使用Stream流式计算,这个问题很好处理,代码如下:

    package com.wunian.juc.stream;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * 流式计算
     * 数据库、集合:存数据的
     *计算和处理数据交给Stream
     */
    public class StreamDemo {
        public static void main(String[] args) {
            User u1=new User(1,"a",23);
            User u2=new User(2,"b",20);
            User u3=new User(3,"c",26);
            User u4=new User(4,"d",30);
            User u5 =new User(5,"e",21);
            //存储
            List<User> users= Arrays.asList(u1,u2,u3,u4,u5);
            //计算等操作交给流
            //forEach(消费者类型接口)
            users.stream()
                    .filter(u->{return u.getId()%2==0;})
                    .filter(u->{return u.getAge()>24;})
                    .map(u->{return u.getName().toUpperCase();})
                    .sorted((o1,o2)->{return o2.compareTo(o1);})
                    .limit(1)
                    .forEach(System.out::println);
        }
    }
    

    ForkJoin分支合并

    ForkJoin采用了分治算法思想,在必要的情况下,将一个大任务,进行拆分(fork) 成若干个子任务(拆到不能再拆,这里就是指我们制定的拆分的临界值),再将一个个小任务的结果进行join汇总。
    MapReduce:input->split->map->reduce->output
    主要就是两步:
    1、任务拆分
    2、结果合并

    ForkJoin原理示意图
    前提
    使用ForkJoin的前提是在大数据量的情况下,如果数据量很小,ForkJoin的效率还不如不用来的快。
    ForkJoin的工作原理
    假如两个CPU上有不同的任务,这时候B已经执行完,A还有任务等待执行,这时候B就会将A队尾的任务偷过来,加入自己的队列中,这叫做工作窃取,ForkJoin的底层维护的是一个双端队列
    好处:处理效率高。
    坏处:可能产生资源争夺。
    ForkJoin的工作原理
    我们可以使用求和计算测试一下ForkJoin,这就需要用到RecursiveTask类了,先来看看它的JDK文档。
    RecursiveTask
    先来创建一个类继承RecursiveTask,重写其compute方法,代码如下:
    package com.wunian.juc.forkjoin;
    
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * 求和计算
     */
    public class ForkJoinDemo  extends RecursiveTask<Long> {
    
        private Long start;
        private Long end;
        private static final Long tempLong=10000L;//临界值,只要超过了这个值,ForkJoin效率就会更高
    
        public ForkJoinDemo(Long start, Long end) {
            this.start = start;
            this.end = end;
        }
        //计算方法
        @Override
        protected Long compute() {
            if((end-start)<=tempLong){
                Long sum=0L;
                for(Long i=start;i<=end;i++){
                    sum+=i;
                }
                return sum;
            }else{//超过临界值,用第二种方式
                long middle=(end+start)/2;
                //ForkJoin实际上是通过递归来实现
                ForkJoinDemo right=new ForkJoinDemo(start,middle);
                right.fork();//压入线程队列
                ForkJoinDemo left=new ForkJoinDemo(middle+1,end);
                left.fork();//压入线程队列
                //获得结果join,会阻塞等待结果
                return right.join()+left.join();
            }
        }
    }
    

    创建一个测试类,分别测试普通方法、ForkJoin方法、并行流计算方法的计算效率,代码如下:

    package com.wunian.juc.forkjoin;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.stream.LongStream;
    
    /**
     * 测试forkjoin
     */
    public class MyTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //test1();//6345
            //test2();//13476
            test3();//902
        }
    
        //普通方法
        public  static void test1() {
            long sum=0L;
            long start=System.currentTimeMillis();
            for(Long i=0L;i<=10_0000_0000L;i++){
                sum+=i;
            }
            long end=System.currentTimeMillis();
            System.out.println("times:"+(end-start)+" rs=>:"+sum);
        }
    
        //forkjoin(计算大数据量时才有效果,数据较小时可能还不如普通方法快)
        public  static void test2() throws ExecutionException, InterruptedException {
            long start=System.currentTimeMillis();
            ForkJoinPool forkJoinPool=new ForkJoinPool();
            ForkJoinDemo forkJoinWork=new ForkJoinDemo(0L,10_0000_0000L);
            ForkJoinTask<Long> submit=forkJoinPool.submit(forkJoinWork);
            Long sum=submit.get();
            long end=System.currentTimeMillis();
            System.out.println("times:"+(end-start)+" rs=>:"+sum);
        }
    
        //并行流计算
        public  static void test3() {
            long sum=0L;
            long start=System.currentTimeMillis();  
     sum=LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);//并行计算
    
            long end=System.currentTimeMillis();
            System.out.println("times:"+(end-start)+" rs=>:"+sum);
        }
    }
    

    最后的测试结果表明,在计算数据量较大时,并行流计算方法是最快的,ForkJoin方法次之,普通方法最慢。在计算数据量很小时,ForkJoin方法和并行流计算方法反而比普通方法慢,再次验证了使用ForkJoin的前提是在大数据量的情况下的这个结论。

    异步回调

    以往我们常常会使用callable来进行异步调用,但是callable没有返回值。与之相比,Future可以有返回值,也可以没有返回值。我们可以使用Future的子类completableFuture来测试一下,代码如下:

    package com.wunian.juc.future;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 异步回调  callable没有返回值,使用Future
     *
     */
    public class CompletableFutureDemo {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //没有返回值
           /* CompletableFuture<Void> completableFuture=CompletableFuture.runAsync(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName()+"没有返回值!");
            });
            System.out.println("111111");
            completableFuture.get();*/
    
           //有返回值
            CompletableFuture<Integer> completableFuture=CompletableFuture.supplyAsync(()->{
                System.out.println(Thread.currentThread().getName()+"=>supply Async!");
                int i=10/0;
                return  1024;
            });
            System.out.println(completableFuture.whenComplete((t,u)->{
                System.out.println("t=>"+t);//正确结果
                System.out.println("u=>"+u);//错误信息
            }).exceptionally(e->{//失败,如果错误就返回错误的结果
                System.out.println("e:"+e.getMessage());
                return 500;
            }).get());
        }
    }
    

    由以上代码我们可以知道,CompletableFuture不但有返回值,并且当异步调用过程中如果出现异常,连错误信息也会返回,这样就可以很方便的做一些异常处理,显然这点要比callable强大很多。

    相关文章

      网友评论

          本文标题:JUC并发编程(二)

          本文链接:https://www.haomeiwen.com/subject/wsfzrhtx.html