美文网首页
Java--8--新特性--串并行流与ForkJoin框架

Java--8--新特性--串并行流与ForkJoin框架

作者: IT人故事会 | 来源:发表于2021-03-08 11:46 被阅读0次

    之前说的AQS,其实AQS的设计很难达到的高度,开发中常用的工具想出来,知道大家有这个需求,开发人员需求可以通过这种方式降低代码量,软件开发思维很重要,抽象的模板方法,模板方法的经典实现(AbstractOwnableSynchronizer),看一些源码,实现原理,还需要了解内部的原理,原理无非就是用到了park,unpark,lock,sync,逻辑其实不复杂,仔细看都可以看懂,关键抽象的思维真的很难理解透,看它的源码就是要理解,原来可以这么取抽象,大家中存在大量的重复的逻辑,这时候需要考虑能否将重复代码进行抽象,重复的代码逻辑,AQS身上类似的逻辑,是否可以抽象成,一个模板的方法,设计模式的提现。增删改查在很多框架里面也进行了抽象,逻辑是固定的,都可以进行抽象的。做成地图的方式学习他的思维很重要。

    (一)ForkJoin

    • ① 介绍

    从JDK1.7开始,Java提供Fork/Join框架用于并行执行任务,它的思想就是讲一个大任务分割成若干小任务,最终汇总每个小任务的结果得到这个大任务的结果。将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。

    • ② 场景

    ForkJoinPool 是 ExecutorService接口的实现,它专为可以递归分解成为小块的工作而设计,for/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。

    使用fork join 的第一步是编写执行一部分工作的代码。将代码包装在ForkJoinTask子类中,通常是RecursiveTask(可以返回结果) 或 RecursiveAction。

    • ③ 实现思路
    1. 每个worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列。
    2. 任务队列是双向队列,这样可以同时实现LIFO和FIFO(First in, First out.先进先出。Last in, First out.后进先出)
    3. 子任务会被加入到原先任务所在worker线程的任务队列。
    4. Worker线程用LIFO的方法取出任务,后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)
    5. 当任务队列为空,会随机从其他的worker的队列中拿走一个任务执行(工作窃取:steal work)
    6. 如果一个worker线程遇到了join操作,而这个时候正在处理其他任务,会等到这个任务结束。否则直接返回。
    7. 如果一个worker线程窃取任务失败,它会用yield或者sleep之类的方法休息一会,再尝试(如果所有线程都是空闲状态,即没有任务运行,那么该线程也会进入阻塞状态等新任务的到来)
    • ⑤ 适用

    适用尽可能少的线程池 - 在大多数情况下,最好的决定是为了每个应用程序或系统使用一个线程池,如果不需要特定调整,请使用默认的公共线程池,使用合理的阈值将ForkJoinTask拆分为子任务,避免ForkJoinTask中出现任何阻塞(调用接口,调用数据库)。

    • ⑥ 源码
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    // 分而治之的理念
    public class ForkJoinDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 默认情况下,并行线程数量等于可用处理器的数量
            // ForkJoinPool与其他类型的ExecutorService的区别主要在于它使用了工作窃取:
            // 池中的所有线程都试图查找和执行提交给池的任务和/或其他活动任务创建的任务
            // (如果不存在工作,则最终阻塞等待工作)。
            ForkJoinPool forkJoinPool = new ForkJoinPool();
    
            RecursiveTask<String> recursiveTask = new RecursiveTask<String>() {
                @Override
                protected String compute() {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(toString() + "" + Thread.currentThread());
                    ForkJoinTask<String> newTask = this.fork();
                    newTask.join();
                    System.out.println("执行结束");
                    return "";
                }
            };
    
            ForkJoinTask<String> submit = forkJoinPool.submit(recursiveTask);
            ForkJoinTask<String> submitx = forkJoinPool.submit(recursiveTask);
            System.out.println(submit.get());
    
            recursiveTask.join();
        }
    }
    
    

    PS:工作窃取带来的性能提升偏理论,API的复杂性较高,实际研发中可控性来说不如其他API。一般使用最多的就是做数据处理。接口和数据库尽量不要使用,线程如何堵塞了就尴尬了。吐槽下,从JDK1.8以后,JDK的源码越来越难度了,变量都是一个字母。

    相关文章

      网友评论

          本文标题:Java--8--新特性--串并行流与ForkJoin框架

          本文链接:https://www.haomeiwen.com/subject/gvdibctx.html