美文网首页
线程的并发工具类 --- Fork-Join

线程的并发工具类 --- Fork-Join

作者: 咕噜咕噜_f443 | 来源:发表于2020-03-14 17:48 被阅读0次

    Fork-Join 


    java下多线程的开发可以我们自己启用多线程,线程池,还可以使用forkjoin, forkjoin 可以让我们不去了解诸如 Thread,Runnable 等相关的知识,只要遵循 forkjoin 的开发模式,就可以写出很好的多线程并发程序,

    分而治之同时 forkjoin 在处理某一类问题时非常的有用,哪一类问题?

    分而治之的问题。

    十大计算机经典算法:快速排序、堆排序、归并排序、二分查找、线性查找、 深度优先、广度优先、Dijkstra、动态规划、朴素贝叶斯分类,

    有几个属于分而治之?

    3 个,快速排序、归并排序、二分查找,还有大数据中 M/R 都是。 分治法的设计思想是:将一个难以直接解决的大问题,分割成一些规模较小 的相同问题,以便各个击破,分而治之。

    分治策略是:对于一个规模为 n 的问题,若该问题可以容易地解决(比如说 规模 n 较小)则直接解决,否则将其分解为 k 个规模较小的子问题,这些子问题 互相独立且与原问题形式相同(子问题相互之间有联系就会变为动态规范算法), 递归地解这些子问题,然后将各子问题的解合并得到原问题的解。这种算法设计 策略叫做分治法。

    归并排序( 降 序 )示例

    1.先将数组划分为左右两个子表

    2.然后继续左右两个子表拆分

    3.对最后的拆分的子表,两两进行排序

    4.对有序的子表进行排序和比较合并

    5.对合并后的子表继续比较合并

    Fork-Join原理

    工作密取

    即当前线程的 Task 已经全被执行完毕,则自动取到其他线程的 Task 池中取 出 Task 继续执行。

     ForkJoinPool 中维护着多个线程(一般为 CPU 核数)在不断地执行 Task,每 个线程除了执行自己职务内的 Task 之外,还会根据自己工作线程的闲置情况去 获取其他繁忙的工作线程的 Task,如此一来就能能够减少线程阻塞或是闲置的时 间,提高 CPU 利用率。

    Fork-Join实战

    Fork-Join使用的标准范式

    我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务 中执行 fork 和 join 的操作机制,通常我们不直接继承 ForkjoinTask 类,只需要直 接继承其子类。 

    1.RecursiveAction,用于没有返回结果的任务 

    2.RecursiveTask,用于有返回值的任务 

    task 要通过 ForkJoinPool 来执行,使用 submit 或 invoke/execute  提交,两者的区 别是:invoke 是同步执行,调用之后需要等待任务完成,才能执行后面的代码; submit/execute 是异步执行。 

    join()和 get 方法当任务完成的时候返回计算结果。

    join()与get()区别?

    方法join()与get()虽然都能取得计算后的结果值,但它们之间还是在出现异常时有处理上的区别

    使用get()方法执行任务时,当子任务出现异常时可以在main主线程中进行捕获。方法join()遇到异常直接抛出

    在我们自己实现的 compute 方法里,首先需要判断任务是否足够小,如果 足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用 invokeAll 方法时,又会进入 compute 方法,看看当前子任务是否需要继 续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用 join 方法会等待子任务执行完并得到其结果。

    Fork-Join的同步用法和异步用法

    public class MakeArray {

    //数组长度

        public static final int ARRAY_LENGTH =40000000;

    public final static int THRESHOLD =47;

    public static int[] makeArray() {

    //new一个随机数发生器

            Random r =new Random();

    int[] result =new int[ARRAY_LENGTH];

    for (int i =0; i

    //用随机数填充数组

                result[i] = r.nextInt(ARRAY_LENGTH *3);

    }

    return result;

    }

    }

    ForkJoin执行累加

    public class SumArray {

    private static class SumTaskextends RecursiveTask{

    /*阈值*/

            private final static int THRESHOLD = MakeArray.ARRAY_LENGTH/10;

    private int[]src;

    private int fromIndex;

    private int toIndex;

    public SumTask(int[] src,int fromIndex,int toIndex) {

    this.src = src;

    this.fromIndex = fromIndex;

    this.toIndex = toIndex;

    }

    @Override

            protected Integer compute() {

    /*任务的大小是否合适*/

                if (toIndex -fromIndex

    //                System.out.println(" from index = "+fromIndex

    //                        +" toIndex="+toIndex);

                    int count =0;

    for(int i=fromIndex;i<=toIndex;i++){

    SleepTools.ms(1);

    count = count +src[i];

    }

    return count;

    }else{

    //fromIndex....mid.....toIndex

                    int mid = (fromIndex+toIndex)/2;

    SumTask left =new SumTask(src,fromIndex,mid);

    SumTask right =new SumTask(src,mid+1,toIndex);

    invokeAll(left,right);

    return left.join()+right.join();

    }

    }

    }

    public static void main(String[] args) {

    int[] src = MakeArray.makeArray();

    /*new出池的实例*/

            ForkJoinPool pool =new ForkJoinPool();

    /*new出Task的实例*/

            SumTask innerFind =new SumTask(src,0,src.length-1);

    long start = System.currentTimeMillis();

    pool.invoke(innerFind);

    //System.out.println("Task is Running.....");

            System.out.println("The count is "+innerFind.join()

    +" spend time:"+(System.currentTimeMillis()-start)+"ms");

    }

    }

    遍历指定目录(含子目录)找寻指定类型文件

    public class FindDirsFilesextends RecursiveAction {

    private Filepath;

    public FindDirsFiles(File path) {

    this.path = path;

    }

    @Override

        protected void compute() {

    List subTasks =new ArrayList<>();

    File[] files =path.listFiles();

    if (files!=null){

    for (File file : files) {

    if (file.isDirectory()) {

    // 对每个子目录都新建一个子任务。

                        subTasks.add(new FindDirsFiles(file));

    }else {

    // 遇到文件,检查。

                        if (file.getAbsolutePath().endsWith("txt")){

    System.out.println("文件:" + file.getAbsolutePath());

    }

    }

    }

    if (!subTasks.isEmpty()) {

    // 在当前的 ForkJoinPool 上调度所有的子任务。

                    for (FindDirsFiles subTask :invokeAll(subTasks)) {

    subTask.join();

    }

    }

    }

    }

    public static void main(String [] args){

    try {

    // 用一个 ForkJoinPool 实例调度总任务

                ForkJoinPool pool =new ForkJoinPool();

    FindDirsFiles task =new FindDirsFiles(new File("F:/"));

    /*异步提交*/

                pool.execute(task);

    /*主线程做自己的业务工作*/

                System.out.println("Task is Running......");

    Thread.sleep(1);

    int otherWork =0;

    for(int i=0;i<100;i++){

    otherWork = otherWork+i;

    }

    System.out.println("Main Thread done sth......,otherWork="

                        +otherWork);

    task.join();//阻塞方法

                System.out.println("Task end");

    }catch (Exception e) {

    // TODO Auto-generated catch block

                e.printStackTrace();

    }

    }

    }

    以上内容仅代表个人学习的见解,希望可以为大家提供一个学习参考

    相关文章

      网友评论

          本文标题:线程的并发工具类 --- Fork-Join

          本文链接:https://www.haomeiwen.com/subject/tuyvshtx.html