美文网首页
Java并发工具类

Java并发工具类

作者: 一生逍遥一生 | 来源:发表于2020-08-05 09:16 被阅读0次

    Lock和Condition

    Java SDK并发包通过Lock和Condition两个接口来实现管程,其中Lock用于解决互斥问题,Condition用于解决同步问题。

    如何设计一个互斥锁:能够响应中断、支持超时、非阻塞地获取。

    调用方是否需要等待结果,如果需要等待结果,就是同步;如果不需要等待结果,就是异步。

    ReadWriteLock实现一个缓存

    针对读多写少的场景,Java SDK提供了读写锁--ReadWriteLock。

    读写锁需要遵守三条基本原则:

    • 允许多个线程同时读共享变量
    • 只允许一个线程写共享变量
    • 如果一个写线程正在执行写操作,此时禁止读线程读共享变量

    读写锁与互斥锁的一个重要区别是读写锁允许多个线程同时读共享变量,而互斥锁不允许,这是读写锁在读多邪少的场景下优于互斥锁的关键。

    StampedLock

    ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。StampedLock支持三种模式:写锁、悲观读锁、乐观锁。

    CountDownLatch和CyclicBarrier让多线程步调一致

    CountDownLatch主要用来解决一个线程等待多个线程的场景;CyclicBarrier的计数器是可以循环利用,
    CyclicBarrier是一组线程之间互相等待。

    并发容器

    List

    CopyOnWrite就是写的时候会将共享变量新复制一份出来,这样做的好处就是读操作完全无锁。
    CopyOnWriteArrayList内部维护了一个数组,成员变量array就指向这个内部数组,所有的读操作都是基于array进行的。
    在遍历的时候,增加元素:CopyOnWriteArrayList会将array复制一份,然后在新复制处理的数组上执行增加元素的操作,执行完之后在将
    array指向这个新的数组(写操作基于新的array)。
    CopyOnWriteArrayList仅适用于写操作非常少的场景,CopyOnWriteArrayList迭代器只读的,不支持增删改,
    迭代器的遍历仅仅是一个快照。

    Map

    ConcurrentHashMap的key是无序的,而ConcurrentSkipListMap的key是有序的。他们的key和value都不能为空,因为并发时不知道这个key是否油🈯️。

    集合类 key value 是否线程安全
    HashMap 允许为null 允许为null
    TreeMap 不允许为null 允许为null
    HashTable 不允许为null 不允许为null
    ConcurrentHashMap 不允许为null 不允许为null
    ConcurrentSkipListMap 不允许为null 不允许为null

    原子类

    原子话的基本数据类型、原子化的对象应用类型、原子化数组、原子化数组、原子对象属性更新器和原子化的累加器。

    原子化的对象应用类型

    利用AtomicStampedReference、AtomicMarkableReference可以实现对象引用的原子化更新,增加一个版本号维度就可以了(解决ABA的问题)。

    原子化数组

    AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray利用原子类,可以原子化地更新数组里面的每一个元素。

    原子化对象属性更新器

    利用AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater可以原子化地更新对象的属性,
    这三个方法都是利用反射机制实现的,对象属性必须是volatile类型的,只有这样才能保证可见性。

    原子化的累加器

    DoubleAccumulator、DoubleAddr、LongAccumulator、LongAddr仅仅用来执行累加操作,相比原子化的基本数据类型,速度更快,但是不支持compareAndSet()方法。

    Executor与线程池:如何创建正确的线程池

    设置最大线程数、有界阻塞队列、线程命名方式。

    ThreadPoolExecutor submit方法之间的区别

    1.提交Runnable任务submit(Runnable task):Runnable接口的run方法是没有返回值的,返回的Future仅可以用来断言任务已经结束了,类似于Thread.join()。

    2.提交Callable任务submit(Callable<T> task):调用call方法,并且这个方法有返回值,通过get来获取。

    CompletableFuture

    异步化:利用多线程优化性能。
    CompletableFuture对象的方法:runAsync(Runnable r)、supplyAsync(Supplier<U> supplier),它们之间的区别是:
    Runnable接口的run方法没有返回值,Supplier接口的get方法是有返回值。
    CompletableFuture默认使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU的核数(或者通过
    JVM option-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置线程池的线程数)。
    根据不同的业务类型创建不同的线程池,以避免互相干扰。
    对比 Future<V>,CompletableFuture 优点在于:不需要手工分配线程,JDK 自动分配;代码语义清晰;异步任务链式调用;支持编排异步任务。

    CompletionStage

    CompletionStage可以清晰的描述任务之间的时序关系。

    参数总结

    函数式接口名称 方法名称 参数 返回值
    Runnable run 无参数 无返回值
    Function apply 1个参数 有返回值
    Consume accept 1个参数 无返回值
    Supplier get 没有参数 有返回值
    BiConsumer accept 2个参数 无返回值

    划分

    在以上三类横向划分方法的基础上,又可以按照以下的规则对这些接口方法进行纵向的划分:

    1.多阶段的依赖:一个阶段的执行可以由一个阶段的完成触发,或者两个阶段的同时完成,或者两个阶段中的任何一个完成。

    方法前缀为then的方法安排了对单个阶段的依赖。
    那些由完成两个阶段而触发的,可以结合他们的结果或产生的影响,这一类方法带有combine或者both字样。
    那些由两个阶段中任意一个完成触发的,不能保证哪个的结果或效果用于相关阶段的计算,这类方法带有either字样。

    2.按执行的方式:
    阶段之间的依赖关系控制计算的触发,但不保证任何特定的顺序。因为一个阶段的执行可以采用以下三种方式之一安排:
    默认的执行方式。所有方法名没有以async后缀的方法都按这种默认执行方式执行。
    默认的异步执行。所有方法名以async为后缀,但没有Executor参数的方法都属于此类。
    自定义执行方式。所有方法名以async为后缀,并且具有Executor参数的方法都属于此类。
    默认的执行方式(包括默认的异步执行)的执行属性由CompletionStage的实现类指定例如CompletableFuture,
    而自定义的执行方式的执行属性由传入的Executor指定,这可能具有任意的执行属性,甚至可能不支持并发执行,但还是被安排异步执行。

    3.按上一个阶段的完成状态:无论触发阶段是正常完成还是异常完成,都有两种形式的方法支持处理。

    不论上一个阶段是正常还是异常完成:
    whenComplete方法可以在上一个阶段不论以何种方式完成的处理,但它是一个消费型接口,即不对整个阶段的结果产生影响。
    handle前缀的方法也可以在上一个阶段不论以何种方式完成的处理,它是一个产出型(或函数型)接口,既可以由上一个阶段的异常产出新结果,
    也可以其正常结果产出新结果,使该结果可以由其他相关阶段继续进一步处理。上一个阶段是异常完成的时候执行:
    exceptionally方法可以在上一个阶段以异常完成时进行处理,它可以根据上一个阶段的异常产出新的结果,
    使该结果可以由其他相关阶段继续进一步处理。

    根据阶段正常完成结果的产出型(或者叫函数型)

    thenApply:依赖单个阶段(必须要保证前面的任务完成);thenCombine:依赖两个阶段都完成(前面两个任务都必须要完成);
    applyToEither:依赖两个阶段中的任何一个完成(前面的任务有一个完成就可以)。参数为Function
    有参数,有返回值

    根据阶段正常完成结果的消费型

    thenAccept:依赖单个阶段(必须要保证前面的任务完成);thenAcceptBoth:依赖两个阶段都完成(前面两个任务都必须要完成);
    acceptEither:依赖两个阶段中的任何一个完成(前面的任务有一个完成就可以);参数为Consumer
    有参数,没有返回值。

    只要求依赖的阶段正常完成的不消耗也不产出型:

    thenRun:依赖单个阶段(必须要保证前面的任务完成);runAfterBoth:依赖两个阶段都完成(前面两个任务都必须要完成);
    runAfterEither:依赖两个阶段中的任何一个完成(前面的任务有一个完成就可以);参数为Runnable。
    无参数,无返回值。

    根据正常完成的阶段本身而不是其结果的产出型

    thenCompose参数为Function,与thenCombine类似。

    不论阶段正常还是异常完成的消耗型

    whenComplete参数为BiConsume,whenComplete则不论依赖的上一个阶段是正常完成还是异常完成都不会影响它的执行,
    但它是一个消耗型接口,即不会对阶段的原来结果产生影响。
    thenCombine与whenComplete组合使用。
    因为thenCombine同时依赖两个阶段的正常完成,此时第一个阶段中抛出了异常,所以不会执行thenCombine指定的函数。

    不论阶段正常还是异常完成的产出型

    whenComplete是对不论依赖阶段正常完成还是异常完成时的消耗或者消费,即不会改变阶段的现状,而handle前缀的方法则是对应的产出型方法,
    即可以对正常完成的结果进行转换,也可以对异常完成的进行补偿一个结果,即可以改变阶段的现状。

    异常完成的产出型

    CompletionStage还提供了一个仅当上一个阶段异常完成时的处理,并且可以修改阶段的结果。

    实现该接口不同实现之间互操作的类型转换方法

    返回一个与此阶段保持相同完成属性的CompletableFuture实例。如果此阶段已经是一个CompletableFuture,那么直接返回该阶段本身,
    否则此方法的调用可能等效于thenApply(x -> x),但返回一个类型为CompletableFuture的实例。
    不选择实现该互操作性的CompletionStage实现,可能会抛出UnsupportedOperationException异常。

    CompletionService:批量执行异步任务

    CompletionService适用于知道任务个数,按照任务完成的顺序获取结果。

    public class TestCompletionService {
        public static void main(String[] args) {
            ExecutorService EXECUTOR = Executors.newFixedThreadPool(5);
            try {
                int taskCount = 10;
                // 结果集
                List<Integer> list = new ArrayList<Integer>();
                List<Future<Integer>> futureList = new ArrayList<Future<Integer>>();
                // 1.定义CompletionService
                CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(EXECUTOR, new ArrayBlockingQueue<Future<Integer>>(200));
                // 2.添加任务
                for (int i = 0; i < taskCount; i++) {
                    Future<Integer> future = completionService.submit(new Task(i + 1));
                    futureList.add(future);
                }
                // 3.获取结果
                for (int i = 0; i < taskCount; i++) {
                    Integer result = completionService.take().get();
                    list.add(result);
                }
                System.out.println("list=" + list);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                EXECUTOR.shutdown();
            }
        }
        static class Task implements Callable<Integer> {
            Integer i;
            public Task(Integer i) {
                super();
                this.i = i;
            }
            @Override
            public Integer call() throws Exception {
                if (i == 5) {
                    Thread.sleep(5000);
                } else {
                    Thread.sleep(1000);
                }
                System.out.println("Thread:" + Thread.currentThread().getName() + "task=" + i + ",task finish!");
                return i;
            }
        }
    }
    

    Fork/Join

    Fork操作将会启动一个新的并行fork/join子任务。Join操作会一直等待直到所有的子任务都结束。Fork/Join算法,
    如同其他分治算法一样,总是会递归的、反复的划分子任务,直到这些子任务可以用足够简单的、短小的顺序方法来执行。
    RecursiveAction与RecursiveTask区别在与RecursiveTask是有返回结果而RecursiveAction是没有返回结果。

    使用Fork/Join需要注意的问题:

    • 多线程带来的数据共享问题。使用JDK自带的开发容器。
    • 考虑OOM、stackoverflow的问题。
    • 如果数据量小的时候,不要使用fork/join。

    参考文献

    搞定 CompletableFuture,并发异步编程和编写串行程序还有什么区别?你们要的多图长文
    Java并发包之阶段执行之CompletionStage接口
    ForkJoin分析

    相关文章

      网友评论

          本文标题:Java并发工具类

          本文链接:https://www.haomeiwen.com/subject/neknrktx.html