美文网首页
Fork/Join框架任务分割

Fork/Join框架任务分割

作者: 离别刀 | 来源:发表于2018-06-08 12:51 被阅读0次
141314.png

1.概念

Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。

Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。(我个人理解的工作窃取大意就是:由于线程池中的每个线程都有一个队列,而且线程间互不影响。那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样)。

Fork/Join框架的核心是继承了AbstractExecutorService的ForkJoinPool类,它保证了工作窃取算法和ForkJoinTask的正常工作。

Oracle官方定义原文:
The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.

2.基本用法

1).Fork/Join就是要讲一个大的任务分割成若干小的任务,所以第一步是要做任务的分割。实现FrokJoinTask需要一个继承了RecursiveTask或RecursiveAction的基类,并根据自身业务情况将上面的代码放入基类的coupute方法中。RecursiveTask和RecursiveAction都继承了FrokJoinTask,它俩的区别就是RecursiveTask有返回值而RecursiveAction没有。主要重写compute方法,大致如下:

if (这个任务足够小){
执行要做的任务
} else {
将任务分割成两小部分
执行两小部分并等待执行结果
}

2).做好了基类就可以开始调用了,调用时首先我们需要Fork/Join线程池ForkJoinPool,然后向线程池中提交一个ForkJoinTask并得到结果。ForkJoinPool的submit方法的入参是一个ForkJoinTask,返回值也是一个ForkJoinTask,它提供一个get方法可以获取到执行结果。
大致如下:

ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Object> future = pool.submit(forkJoinService);
Object result= future.get();
pool.shutdown();

3.示例代码

import java.util.Date;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTask extends RecursiveTask<List<Object>> {
    
    //定义最小拆分任务数量
    private static final int HOLD= 2;
    private List<UserInfo> userInfoList;
    private UserService userService;

    ForkJoinTask(List<UserInfo> userInfoList,UserService userService){
        this.userInfoList= userInfoList;
        this.userService= userService;
    }

    //任务拆分算法
    @Override
    protected List<Object> compute() {
        List<Object> results= new ArrayList<>();
        boolean canNotCompute= userInfoList.size()<=HOLD;
        if(canNotCompute){
            for(UserInfo userInfo:userInfoList){
                results.add(userService.getUserInfoByProvince(userInfo.province));
            }
            return results;
        }else {
            int middle = userInfoList.size() / 2;
            List<UserInfo> subList1= userInfoList.subList(0,middle);
            List<UserInfo> subList2= userInfoList.subList(middle,userInfoList.size());
            ForkJoinTask left= new ForkJoinTask(subList1,userService);
            ForkJoinTask right= new ForkJoinTask(subList2,userService);
            left.fork();
            right.fork();
            List<Object> join=left.join();
            join.addAll(right.join());
            return join;
        }
    }

    //测试
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<UserInfo> userInfos= parseUserInfoList();
        UserService userService= new UserService();
        //1.使用forkjoin执行任务
        Date date= new Date();
        ForkJoinPool pool = new ForkJoinPool(20);
        ForkJoinTask task= new ForkJoinTask(userInfos,userService);
        Future<List<Object>> future= pool.submit(task);
        List<Object> list= future.get();
        System.out.println(list);
        System.out.println("fork join spend date: "+(new Date().getTime()-date.getTime()));
        //2.使用同步执行任务
        date= new Date();
        List<Object> results= new ArrayList<>();
        for(UserInfo userInfo:userInfos){
            results.add(userService.getUserInfoByProvince(userInfo.province));
        }
        System.out.println(results);
        System.out.println("synchronize spend date: "+(new Date().getTime()-date.getTime()));
    }

    //模拟数据请求,Sleep100毫秒
    private static class  UserService{
        public String getUserInfoByProvince(String id){
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return id;
        }
    }

    //构造要分组的数据
    private static List<UserInfo> parseUserInfoList(){
        List<UserInfo> list= new ArrayList<>();
        int i=0;
        while (i<100){
            list.add(new UserInfo("ligang"+i,10+i,"0000"+i));
            i++;
        }
        return list;
    }


    private static class UserInfo{
        private String name;
        private int age;
        private String province;

        UserInfo(String name,int age,String province){
            this.age= age;
            this.name= name;
            this.province= province;
        }
    }
}

运行结果:
[00000, 00001, 00002, 00003, 00004, 00005, 00006, 00007, 00008, 00009, 000010, 000011, 000012, 000013, 000014, 000015, 000016, 000017, 000018, 000019, 000020, 000021, 000022, 000023, 000024, 000025, 000026, 000027, 000028, 000029, 000030, 000031, 000032, 000033, 000034, 000035, 000036, 000037, 000038, 000039, 000040, 000041, 000042, 000043, 000044, 000045, 000046, 000047, 000048, 000049, 000050, 000051, 000052, 000053, 000054, 000055, 000056, 000057, 000058, 000059, 000060, 000061, 000062, 000063, 000064, 000065, 000066, 000067, 000068, 000069, 000070, 000071, 000072, 000073, 000074, 000075, 000076, 000077, 000078, 000079, 000080, 000081, 000082, 000083, 000084, 000085, 000086, 000087, 000088, 000089, 000090, 000091, 000092, 000093, 000094, 000095, 000096, 000097, 000098, 000099]
fork join spend date: 785
[00000, 00001, 00002, 00003, 00004, 00005, 00006, 00007, 00008, 00009, 000010, 000011, 000012, 000013, 000014, 000015, 000016, 000017, 000018, 000019, 000020, 000021, 000022, 000023, 000024, 000025, 000026, 000027, 000028, 000029, 000030, 000031, 000032, 000033, 000034, 000035, 000036, 000037, 000038, 000039, 000040, 000041, 000042, 000043, 000044, 000045, 000046, 000047, 000048, 000049, 000050, 000051, 000052, 000053, 000054, 000055, 000056, 000057, 000058, 000059, 000060, 000061, 000062, 000063, 000064, 000065, 000066, 000067, 000068, 000069, 000070, 000071, 000072, 000073, 000074, 000075, 000076, 000077, 000078, 000079, 000080, 000081, 000082, 000083, 000084, 000085, 000086, 000087, 000088, 000089, 000090, 000091, 000092, 000093, 000094, 000095, 000096, 000097, 000098, 000099]
synchronize spend date: 10471

可见使用fork join在性能上提升十几倍。

相关文章

  • Java7新特性8-fork/join框架

    Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割...

  • ForkjoinPool -3

    任务分割与等待:fork和join fork和join是ForkJoinTask的方法,也是整个框架的设计灵魂:f...

  • 并发框架之Fork/Join框架

    今天来介绍一下java并发框架之Fork/Join 初探Fork/Join: 分割任务 使用一个fork类来把大任...

  • Fork/Join框架任务分割

    1.概念 Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务...

  • Java常用并发包

    一、Fork/Join Java7提供了Fork/Join用于并行执行任务的框架, 可以把一个大任务分割成若干个小...

  • Fork/Join框架

    Fork/Join框架首先要考虑的是分割任务,当任务计算过大时分割成两个子任务分别计算 ForkJoinTask需...

  • Fork/Join框架

    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干 个小任务(Fork...

  • 带你熟悉Java并行任务框架Fork/Join

    Fork/Join是什么? Fork意思是分叉,Join为合并。Fork/Join是一个将任务分割并行运行,然后将...

  • Java7任务并行执行神器:Fork&Join框架

    Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,...

  • Fork/Join框架解析

    Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,...

网友评论

      本文标题:Fork/Join框架任务分割

      本文链接:https://www.haomeiwen.com/subject/cioosftx.html