并行执行任务的Fork/Join框架

作者: 小草莓子桑 | 来源:发表于2017-06-22 10:49 被阅读0次

    背书中(引用书上的话):Java7中提供了用于并行执行任务的Fork/Join框架, 可以把任务分成若干个分任务,最终汇总每个分任务的结果得到总任务的结果。这篇我们来看看Fork/Join框架。

    先举个栗子

    一个字符串数组,需要把每个元素中的*字符的索引返回,并求和(自己编了个栗子,没有撒实际意义),用Fork/Join框架来实现,可以定义一个处理字符串数组的总任务,然后把总任务拆分,把数组中每个字符串交给子任务去处理,然后等待子任务执行完毕,汇总结果,并返回:

    package thread.ForkJoin;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    /**
     * @Description: .
     * @Author: ZhaoWeiNan .
     * @CreatedTime: 2017/6/21 .
     * @Version: 1.0 .
     */
    public class StringTask extends RecursiveTask<Integer>{
        //要处理的字符串
        private String dest;
    
        public StringTask(String dest) {
            this.dest = dest;
        }
        //父类RecursiveTask是一个抽象类,所以需要实现compute方法
        @Override
        protected Integer compute() {
            if (dest == null || "".equals(dest))
                return 0;
            //判断字符串中 * 的索引,并返回
            return dest.indexOf("*");
        }
    }
    
    class ArrayTask extends RecursiveTask<Integer>{
        //需要处理的字符串数组
        private String[] array;
    
        public ArrayTask(String[] array) {
            this.array = array;
        }
    
        @Override
        protected Integer compute() {
            if (array == null || array.length < 1)
                return 0;
    
            //申明一个StringTask变量,作为子任务
            StringTask stringTask;
            //定义一个子任务队列,用于任务执行完毕后,获取子任务的执行结果
            List<StringTask> list = new ArrayList<>();
            int sum = 0;
            //把字符串数组的中每一个字符串分给多个StringTask子任务去处理
            for (String s : array){
                //创建一个变量,作为子任务去处理字符串
                stringTask = new StringTask(s);
                //执行子任务
                stringTask.fork();
                //加入子任务队列
                list.add(stringTask);
            }
    
            for (StringTask task : list){
                //等子任务执行完毕,获取子任务执行的结果,并累加
                sum += task.join();
            }
    
            return sum;
        }
    }
    
    class Demo{
    
        public static void main(String[] args){
            //初始化字符串数组
            String[] array = new String[]{"#####*####","##*########","###*#######","#*############"};
            //创建一个总任务,处理字符串数组
            ArrayTask arrayTask = new ArrayTask(array);
            //创建执行任务的线程池ForkJoinPool对象
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            //执行总任务
            ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(arrayTask);
    
            //返回任务的结果
            try {
                System.out.println(forkJoinTask.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    

    代码放到了开源中国:http://git.oschina.net/zhaoweinan/forkjoin,有兴趣的小伙伴可以拿去
    总的来说,Fork/Join框架就是一个用来并行执行任务的框架,可以把一个大任务,分成若干个子任务,等各个子任务执行完毕,可以把他们的执行结果获取到,并汇聚,起到了并行执行任务作用。

    Fork/Join框架的构成

    1.ForkJoinPool

    ForkJoinPool类图

    ForkJoinPool继承了AbstractExecutorService抽象类,AbstractExecutorService实现了ExecutorService接口,由此看来ForkJoinPool也是线程池家族的一员,


    过滤了下方法,只显示了公共方法,并截取了一下

    ForkJoinPool使用invoke、execute、submit用来执行任务。

    2.ForkJoinTask

    ForkJoinTask类图

    ForkJoinTask是Fork/Join框架使用的任务类,实现了Future接口,我们一般使用它的两个子类RecursiveTask和RecursiveAction,


    RecursiveAction类图
    public abstract class RecursiveAction extends ForkJoinTask<Void> {
        private static final long serialVersionUID = 5232453952276485070L;
    

    RecursiveAction适用于没有返回结果的任务,

    RecursiveTask类图
    public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
        private static final long serialVersionUID = 5232453952276485270L;
    

    RecursiveTask适用于有返回值的任务。

    Work-Stealing (工作窃取)

    粗略说一下Work-Stealing,ForkJoinPool具有 Work-Stealing (工作窃取)的能力,什么意思呢?就拿文章开头的栗子来说,把处理字符串数组的大任务,分成了若干个处理字符串的子任务,这些子任务线程执行完毕后,不会闲着,回去执行别的子任务,通俗的来说,Work-Stealing (工作窃取)就是线程从其他队列里面获取任务来执行。

    Work-Stealing的优点

    充分利用了线程,提高了线程并行执行任务的效率,并减少了线程间竞争带来的系统开销。

    Work-Stealing的缺点

    存在竞争的情况,而且占用了更多的系统资源。

    Fork/Join框架原理

    ForkJoinPool分析

    贴一张ForkJoinPool的类图


    大小不好调整,就截取一般吧

    注意箭头所指的两个属性,
    ForkJoinTask<?>数组submissionQueue,存放程序加到ForkJoinPool的任务

        private ForkJoinTask<?>[] submissionQueue;
    

    ForkJoinWorkerThread类继承了Thread,是一个线程类, ForkJoinWorkerThread[] workers就是一个线程数组,负责去执行submissionQueue中的任务

        ForkJoinWorkerThread[] workers;
    
        .....
        public class ForkJoinWorkerThread extends Thread
    

    ForkJoinTask分析

    fork方法

    获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行ForkJoinTask任务

       public final ForkJoinTask<V> fork() {
            //获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行任务
            ((ForkJoinWorkerThread) Thread.currentThread())
                    .pushTask(this);
            return this;
        }
    

    再来看看ForkJoinWorkerThread的pushTask方法:

    final void pushTask(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] q; int s, m;
            if ((q = queue) != null) {    // ignore if queue removed
                long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
                UNSAFE.putOrderedObject(q, u, t);
                queueTop = s + 1;         // or use putOrderedInt
                if ((s -= queueBase) <= 2)
                    //调用线程池ForkJoinPool的signalWork方法
                    pool.signalWork();
                else if (s == m)
                    growQueue();
            }
        }
    

    ForkJoinWorkerThread的pushTask方法把任务ForkJoinTask加到了ForkJoinTask[]任务数组中,并调用了ForkJoinPool线程池的signalWork方法唤醒线程或者创建一个线程去执行任务,粗略的贴一下signalWork的关键代码:

    private void addWorker() {
            Throwable ex = null;
            ForkJoinWorkerThread t = null;
            try {
                t = factory.newThread(this);
            } catch (Throwable e) {
                ex = e;
            }
            if (t == null) {  // null or exceptional factory return
                long c;       // adjust counts
                do {} while (!UNSAFE.compareAndSwapLong
                             (this, ctlOffset, c = ctl,
                              (((c - AC_UNIT) & AC_MASK) |
                               ((c - TC_UNIT) & TC_MASK) |
                               (c & ~(AC_MASK|TC_MASK)))));
                // Propagate exception if originating from an external caller
                if (!tryTerminate(false) && ex != null &&
                    !(Thread.currentThread() instanceof ForkJoinWorkerThread))
                    UNSAFE.throwException(ex);
            }
            else
                t.start();
        }
    

    最终调用到了这里,来执行任务。

    join方法

    从文章开头的栗子来看,join方法会阻塞当前线程,等待获取任务执行的结果

        //百度了这四种状态的含义
        private static final int NORMAL      = -1;   //NORMAL已完成
        private static final int CANCELLED   = -2;  //CANCELLED已取消
        private static final int EXCEPTIONAL = -3;  //EXCEPTIONAL出现异常
        private static final int SIGNAL      =  1;  //SIGNAL信号
    
         public final V join() {
            //先调用doJoin方法判断上面定义的四个状态
            if (doJoin() != NORMAL)
                return reportResult();
            else
                return getRawResult();
        }
    

    join方法先调用doJoin方法判断任务的状态,看看doJoin方法,

       private int doJoin() {
            Thread t; ForkJoinWorkerThread w; int s; boolean completed;
            //获取当前ForkJoinWorkerThread线程
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
                //如果状态是小于0 也就是 -1,-2,-3 分别代表已完成、已取消、出现异常
                //直接返回状态
                if ((s = status) < 0)
                    return s;
                if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                    //如果状态为1|SIGNAL|信号
                    //执行任务
                    try {
                        completed = exec();
                    } catch (Throwable rex) {
                        //出现异常,把状态改为-3|EXCEPTIONAL|出现异常,返回
                        return setExceptionalCompletion(rex);
                    }
                    if (completed)
                        //执行成功,把状态改为-1|NORMAL|已完成,返回
                        return setCompletion(NORMAL);
                }
                return w.joinTask(this);
            }
            else
                return externalAwaitDone();
        }
    

    doJoin查看任务的状态,如果状态是-1|NORMAL|已完成,-2|CANCELLED|已取消,-3|EXCEPTIONAL|出现异常,证明任务已经执行完毕,返回状态位,如果状态是 1|SIGNAL|信号,则去执行任务,如果执行成功返回-1|NORMAL|已完成,出现异常返回-3|EXCEPTIONAL|出现异常。
    再来看看返回结果的reportResult方法和getRawResult方法:

    private V reportResult() {
            int s; Throwable ex;
            //如果状态为-2|CANCELLED|已取消,抛出一个CancellationException异常
            if ((s = status) == CANCELLED)
                throw new CancellationException();
            if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
                UNSAFE.throwException(ex);
            //调用getRawResult方法返回结果
            return getRawResult();
        }
    

    reportResult方法,先会判断状态,如果状态为-2|CANCELLED|已取消,则抛出一个CancellationException异常,否则调用getRawResult方法返回结果:

        public abstract V getRawResult();
    

    getRawResult方法在ForkJoinTask类是抽象方法,具体实现在他的两子类中。
    RecursiveAction子类:

        public final Void getRawResult() { return null; }
    

    所以说RecursiveAction子类使用于没有返回值的任务。
    RecursiveTask子类:

    public final V getRawResult() {
            return result;
        }
    

    RecursiveTask子类适用于有返回值的任务。

    并行执行任务的Fork/Join框架是说完了。
    欢迎大家来交流,指出文中一些说错的地方,让我加深认识。
    谢谢大家!

    相关文章

      网友评论

        本文标题:并行执行任务的Fork/Join框架

        本文链接:https://www.haomeiwen.com/subject/hrytcxtx.html