美文网首页
java线程池(七):ForkJoinPool源码分析之三(Fo

java线程池(七):ForkJoinPool源码分析之三(Fo

作者: 冬天里的懒喵 | 来源:发表于2020-10-21 20:10 被阅读0次

    [toc]

    1.类结构及其成员变量

    1.1 类结构和注释

    类前面的注释部分如下:
    ForkJoinTask是在ForkJoinPool中运行task的基础抽象类,ForkJoinTask是类似于线程的实体,其权重比普通线程要轻得多。大量的task或者task的子类可能由ForkJoinPool中实际的线程来托管,但以某些使用限制为代价。
    一个main的ForkJoinTask被提交给ForkJoinPool的时候,如果尚未参与ForkJoin计算,则通过ForkJoinPool#commonPool()中fork或者invoke方法开始。一旦启动,通过将依次启动其他子任务。如此类的名称所示,许多使用了ForkJoinTask的程序仅采用fork或者诸如jivokeAll。但是,此类还提供了许多其他可以在高级方法中使用的方法,以及允许支持xin形式的fork/join处理的扩展机制。
    ForkJoinTask是Future的轻量级形式,ForkJoinTask的效率源于一组限制条件,这些限制只能部分静态的强制执行,反映出它们的主要用途是作为计算纯函数或对纯函数隔离的对象进行的操作的计算任务。主要协调机制是fork,用于安排异步执行和join,在计算任务结果之前不会执行。理想情况下,计算应避免使用sync方法块,并应用除加入其他任务或使用被宣传为fork/join的调度配合使用的诸如Phasers之类的同步器之外的其他最小化同步阻塞。可细分的任务也不应执行阻塞的I/O,并且理想情况下应访问与其他正在运行的任务访问的变量完全独立的变量。不允许抛出诸如IOExeption之类的检查异常。从而松散的实现了这些准则,但是,计算可能任会遇到未经检查的异常,这些异常会被尝试加入它的调用者重新抛出。这些异常可能还包括源自内部资源耗尽,例如无法分配任务队列 RejectedExecutionException。重新引发的异常的行为与常规异常相同,但是在可能的情况下,包含启动计算的线程以及实际遇到的线程的堆栈跟踪(例如,使用ex.printStackTrace()显示)异常;最少只有后者。
    可以定义和使用可能阻塞的ForkJoinTasks,但是这样还需要三点考虑:
    1.如果有other个任务,则应该完成少数几个依赖于在外部同步或者I/O,从未加入的事件样例的异常任务,例如,子类为CountedCompleter的哪些子任务通常属于此类。
    2.为了最大程度的减少资源的影响,任务应该很小。理想情况下,仅执行组织操作。
    3.除非使用ForkJoinPoolManagedBlocker API,或者已知可能被阻止的任务数小于pool的ForkJoinPool的getParallelism级别,否则pool无法保证有足够的线程可用来确保进度的良好表现。
    等待完成和提取任务结果的主要方法是join,但是有几种变体,get方法支持中断或定时等待完成,并使用Future约定,方法invoke在语义上等效于fork+join,当时始终尝试在当前线程中开始执行,这些方法的quiet形式不会提取结果或报告异常,当执行一组任务的时候,这些选项可能有用,并且你需要将结果或异常的处理延时到所有任务为止。方法invokeAll有多个版本,执行并调用的最常见的形式:分派一组任务将它们全部加入。
    在最典型的用法中,fork-join对的作用类似于调用fork,并从并行递归中返回join,与其他形式的递归调用一样,返回应从最里面开始执行。例如:

    a.fork();
    b.fork();
    b.join();
    a.join();
    

    可能比在b之前加入a的效率更高。
    可以从多个详细级别查询任务的执行状态,如任务以任何方式完成,则isDone为true。包括取消任务而未执行的情况。如果没有完成任务,则isCompletedNormally返回true。
    如果任务已取消,isCancelled返回true。这种情况下getException返回java.util.concurrent.CancellationException和isCompletedAbnormally。如果任务是已取消的或者遇到异常,在这种情况下,getException将返回所遇到的异常或者java.util.concurrent.CancellationException。
    ForkJoinTask类通常不直接子类化,相反,你可以将一个支持特定样势的fork/join处理的抽象类做为子类,对于大多数不返回结果的计算,通常使用RecursiveAction。对于哪些返回结果的计算,则通常使用RecursiveTask。并使用CountedCompleter 。其中已完成的操作会触发其他操作,通常一个具体的ForkJoinTask子类申明在构造函数中建立的包含其参数的字段,然后定义了一个compute方法。该方法以某种方式使用此基类提供的控制方法。
    方法join及其变体仅在完成依赖项是非循环的时候才适用,也就是说,并行计算可以描述为有向无环图DAG。否则,执行可能会遇到死锁,因为任务的周期性的互相等待。但是,此框架支持其他方法和技术,如Phaser的helpQuiesce和complete来构造针对非自定义问题的自定义子类,静态结构为DAG,为了支持此方法,可以适用setForkJoinTaskTag或者compareAndSetForkJoinTaskTag或者compareAndSetForkJoinTaskTag。适用short值自动标记ForkJoinTask并使用getForkJoinTaskTag进行检查。ForkJoinTask实现不出任何目的的使用这些protected的方法或标记,但是可以在构造函数中专门的子类中使用它们,例如,并行图遍历可以使用提供的方法来避免重访问已处理的节点/任务。用于标记的方法名称很大一部分是为了鼓励定义反映其使用方式的方法。
    大多数基本的支持的方法都是final,以防止覆盖与底层轻量级任务计划框架固有的联系的实现。创新的fork/join处理基本样式的开发人员应最小的实现protected的exec方法、setRawResult和getRawResult方法。同时还引入了一种抽象的计算方法,该方法可以在其子类中实现,可能依赖于此类提供的其他protected方法。
    ForkJoinTasks应该执行相对少量的计算,通常应该通过递归分解将大型任务分解为较小的子类,做为一个非常粗略的经验法则,任务执行100个以上且少于10000个基本计算步骤,并应避免无限循环。如果任务太大,则并行度无法提高吞吐量,如果太小,则内存和内部任务维护开销可能会使处理不堪重负。
    此类为Runnable和Callable提供了Adapt方法。这些方法将在ForkJoinTasks执行与其他类型的任务混合使用的时候可能会有用。当所有任务都具有这种形式的时候,请考虑使用asyncMode 中构造的池。
    ForkJoinTasks是可序列化的,使得它们可以在远程执行的框架中扩展使用,仅在执行前或者之后而不是之前期间序列化任务是明智的。执行本身不依赖于序列化。

    类结构如下:


    类结构
    public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
    
        
    }
    

    在代码中,任有不少注释:
    有关通用的实现概述,请参见类ForkJoinPool的内部文档。在对ForkJoinWorkerThread和ForkJoinPool中的方法进行中继的过程中,ForkJoinTasks主要负责维护其“状态”字段。
    此类的大致方法分为:

    • 1.基于状态维护
    • 2.执行和等待完成
    • 3.用户级方法(另外报告结果)

    有时很难看到,因为此文件以在Javadocs中良好流动的方式对导出的方法进行排序。
    状态字段将运行控制状态位打包为单个int,以最大程度的减少占用空间并确保原子性,(通过cas)。Status最初的状态为0,并采用非负值,直到完成为止。此后,与DONE_MASK一起,状态的值保持为NORMAL,CANCELLED或者EXCEPTIONAL,其他线程正在等待阻塞任务,将SIGNAL位置设置为1。设置了SIGNAL的被窃取的任务完成后,将通过notifyAll唤醒任何等待的线程。 即使出于某些目的不是最优的,我们还是使用基本的内置的wait/notify来利用JVM中的Monitor膨胀。否则我们将需要模拟JVM以避免增加每个任务的记录开销。我们希望这些Monitor是fat的,即不需要使用偏向锁和细粒度的锁,因此要使用一些奇怪的编码习惯来避免它们,主要是安排每个同步块执行一个wait,notifyAll,或者两者都执行。
    这些控制位仅占用状态字段的上半部分中的一部分,低位用于用户定义的标签。

    1.2 变量及常量

    在ForkJoinTask中,主要的变量就一个,为status。

    /** The run status of this task */
    volatile int status; // accessed directly by pool
    

    这是通过colatile修饰的int类型,用以标识每个task的运行状态。
    这些状态主要是一下这些常量。

     and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags
    

    各常量用二进制表述如下图:


    image.png

    2.实现方法

    2.1 抽象方法

    ForkJoinTask是ForkJoinPool中提交任务的主要实现类,需要注意的是,这个类是个抽象类。可扩展的方法如下:

    
    /**
     * Returns the result that would be returned by {@link #join}, even
     * if this task completed abnormally, or {@code null} if this task
     * is not known to have been completed.  This method is designed
     * to aid debugging, as well as to support extensions. Its use in
     * any other context is discouraged.
     *
     * @return the result, or {@code null} if not completed
     */
    public abstract V getRawResult();
    
    /**
     * Forces the given value to be returned as a result.  This method
     * is designed to support extensions, and should not in general be
     * called otherwise.
     *
     * @param value the value
     */
    protected abstract void setRawResult(V value);
    
    /**
     * Immediately performs the base action of this task and returns
     * true if, upon return from this method, this task is guaranteed
     * to have completed normally. This method may return false
     * otherwise, to indicate that this task is not necessarily
     * complete (or is not known to be complete), for example in
     * asynchronous actions that require explicit invocations of
     * completion methods. This method may also throw an (unchecked)
     * exception to indicate abnormal exit. This method is designed to
     * support extensions, and should not in general be called
     * otherwise.
     *
     * @return {@code true} if this task is known to have completed normally
     */
    protected abstract boolean exec();
    

    这些抽象方法,总结如下表:
    |方法|说明|
    |getRawResult()|此方法将返回join的返回结果,即使此任务出现异常,或者返回null(再该任务尚未完成的情况下返回),此方法旨在帮助调试以及支持扩展,不建议在其他任何上下文中使用|
    |setRawResult()|强制将给定值做为结果返回,此方法旨在支持扩展,通常不应以其他的方式调用。|
    |exec()|立即执行此任务的基本操作,如果从该方法返回后,保证此任务已正常完成,则返回true,否则,此方法可能返回false,以表示此任务不一定完成,或不知道是完成的,例如在需要显示调用完成方法的异步操作中。此方法还可能引发未经检查的异常以指示异常退出。此方法旨在支持扩展,通常不应以其他方式调用。|

    实际上,这些抽象方法都是提供给子类进行实现的。如RecursiveTask和RecursiveAction就实现了这些方法,之后这两个类将compute方法提供给子类继续扩展。而上述的抽象方法,则不会提供给子类扩展。我们在使用的过程中,只需要直到这三个方法的大致含义即可。我们使用RecursiveTask和RecursiveAction这两个类只需要再实现compute方法。

    2.2 实现方法

    ForkJoinTask是Future接口的实现类,因此,实现Future的方法,实际上就是当外部将ForkJoinTask当作Future的时候需要调用的方法。

    2.2.1 get()

    get方法,如果使用这个方法的话,其意义就是等待计算完成,然后返回结果。

    public final V get() throws InterruptedException, ExecutionException {
        //判断当前线程是否是工作线程
        int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
           //如果是工作线程,执行doJoin方法
            doJoin() :
            //反之,则调用外部中断等待方法,将外部调用的线程wait。
            externalInterruptibleAwaitDone();
        Throwable ex;
        //如果返回的s结果,为取消状态,抛出异常
        if ((s &= DONE_MASK) == CANCELLED)
            throw new CancellationException();
        //如果s为异常状态,则调用getThrowableException从异常表中获取一个异常,再new一个ExecutionException包裹之后抛出。 
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            throw new ExecutionException(ex);
        //默认返回结果是getRawResult。
        return getRawResult();
    }
    

    从这个方法中我们可以看到,实际上,外部线程如果调用了get方法,则会调用externalInterruptibleAwaitDone。这个方法会根据当前状态,来将当前线程wait,或者notifyAll。后面将详细介绍。

    2.2.2 get(long timeout, TimeUnit unit)

    根据传入的等待时间进行等待,之后再获取结果。

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread is not a
     * member of a ForkJoinPool and was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    public final V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        int s;
        //时间通过unit的单位转换之后按纳秒获取
        long nanos = unit.toNanos(timeout);
        //如果线程被中断,则返回中断异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //如果status大于0,且纳秒数也大于0
        if ((s = status) >= 0 && nanos > 0L) {
            //计算时间d和deadline
            long d = System.nanoTime() + nanos;
            long deadline = (d == 0L) ? 1L : d; // avoid 0
            Thread t = Thread.currentThread();
            //如果当前线程是工作线程,调用pool提供的awaitJoin方法。
            if (t instanceof ForkJoinWorkerThread) {
                ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
                s = wt.pool.awaitJoin(wt.workQueue, this, deadline);
            }
            //反之,再判断当前task是否是子类CountedCompleter,则采用externalHelpComplete
            else if ((s = ((this instanceof CountedCompleter) ?
                           ForkJoinPool.common.externalHelpComplete(
                               (CountedCompleter<?>)this, 0) :
                          //tryExternalUnpush方法 ForkJoinPool.common.tryExternalUnpush(this) ?
                           doExec() : 0)) >= 0) {
                long ns, ms; // measure in nanosecs, but wait in millisecs
                //死循环的方式确保wait成功
                while ((s = status) >= 0 &&
                       (ns = deadline - System.nanoTime()) > 0L) {
                    if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) > 0L &&
                        U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                        synchronized (this) {
                           //如果status大于0则wait,反之则botifyAll
                            if (status >= 0)
                                wait(ms); // OK to throw InterruptedException
                            else
                                notifyAll();
                        }
                    }
                }
            }
        }
        //如果状态大于0 s更新status的值
        if (s >= 0)
            s = status;
        //如果s超出范围,则抛出异常,进行异常处理
        if ((s &= DONE_MASK) != NORMAL) {
            Throwable ex;
            if (s == CANCELLED)
                throw new CancellationException();
            if (s != EXCEPTIONAL)
                throw new TimeoutException();
            if ((ex = getThrowableException()) != null)
                throw new ExecutionException(ex);
        }
        return getRawResult();
    }
    

    2.2.3 externalInterruptibleAwaitDone

    此方法的目的是将非工作线程阻塞,直至执行完毕或者被打断。期间有可能会触发窃取操作。

    /**
     * Blocks a non-worker-thread until completion or interruption.
     */
    private int externalInterruptibleAwaitDone() throws InterruptedException {
        int s; 
        //如果线程中断,则抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //如果状态大于0且s为CountedCompleter的实现
        if ((s = status) >= 0 &&
            (s = ((this instanceof CountedCompleter) ?
                 //此时将采用pool的externalHelpComplete方法,此方法会触发窃取操作 ForkJoinPool.common.externalHelpComplete(
                      (CountedCompleter<?>)this, 0) :
                  ForkJoinPool.common.tryExternalUnpush(this) ? doExec() :
                  0)) >= 0) {
            while ((s = status) >= 0) {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                 //加锁,等待,注意此时wait的参数为0。那么这个线程只能等待被唤醒
                    synchronized (this) {
                        if (status >= 0)
                            wait(0L);
                        else
                            notifyAll();
                    }
                }
            }
        }
        return s;
    }
    

    2.2.4 其他实现方法

    /**
     * Attempts to cancel execution of this task. This attempt will
     * fail if the task has already completed or could not be
     * cancelled for some other reason. If successful, and this task
     * has not started when {@code cancel} is called, execution of
     * this task is suppressed. After this method returns
     * successfully, unless there is an intervening call to {@link
     * #reinitialize}, subsequent calls to {@link #isCancelled},
     * {@link #isDone}, and {@code cancel} will return {@code true}
     * and calls to {@link #join} and related methods will result in
     * {@code CancellationException}.
     *
     * <p>This method may be overridden in subclasses, but if so, must
     * still ensure that these properties hold. In particular, the
     * {@code cancel} method itself must not throw exceptions.
     *
     * <p>This method is designed to be invoked by <em>other</em>
     * tasks. To terminate the current task, you can just return or
     * throw an unchecked exception from its computation method, or
     * invoke {@link #completeExceptionally(Throwable)}.
     *
     * @param mayInterruptIfRunning this value has no effect in the
     * default implementation because interrupts are not used to
     * control cancellation.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED;
    }
    
    public final boolean isDone() {
        return status < 0;
    }
    
    public final boolean isCancelled() {
        return (status & DONE_MASK) == CANCELLED;
    }
    

    实际上这些方法都是对status状态进行判断或者修改操作。当task执行完毕之后status小于0。cancel参照实际上是set status的状态为CANCELLED。

    2.3 外部调用方法

    2.3.1 fork

    在当前任务正在执行的pool中异步执行此任务,如果不是在ForkJoinPool中执行,则使用ForkJoinPool的commonPool。尽管不一定要强制执行,但是如果任务已完成,并重新初始化,则多次fork任务是错误的。除非调用join或者相关方法。或者调用isDone,否则,执行该任务的状态或执行该操作的任何数据的后续修改不一定可由执行该任务的线程以外的任何线程一致地观察到。

    /**
     * Arranges to asynchronously execute this task in the pool the
     * current task is running in, if applicable, or using the {@link
     * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
     * it is not necessarily enforced, it is a usage error to fork a
     * task more than once unless it has completed and been
     * reinitialized.  Subsequent modifications to the state of this
     * task or any data it operates on are not necessarily
     * consistently observable by any thread other than the one
     * executing it unless preceded by a call to {@link #join} or
     * related methods, or a call to {@link #isDone} returning {@code
     * true}.
     *
     * @return {@code this}, to simplify usage
     */
    public final ForkJoinTask<V> fork() {
        Thread t;
        //判断是否为工作线程
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            //加入工作队列
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            //外部执行push操作
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    

    该方法执行的时候,判断是否为工作线程,如果为工作线程,则将任务添加到其工作队列。反之则调用externalPush方法。

    2.3.2 join

    返回isDone的时候的计算结果,此方法与get的不同之处在于,异常完成会导致RuntimeException。或者Error,而不是ExecutionException。且调用线程的中断不会导致方法通过抛出InterruptedException而突然返回。

    /**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    

    join的主要逻辑在doJoin里面:

    /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        //如果status小于0 则直接返回s 
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof
            //判断是否是工作队列
            ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            //等待join
            wt.pool.awaitJoin(w, this, 0L) :
            //外部等待执行完毕
            externalAwaitDone();
    }
    

    2.3.3 invoke

    开始执行此任务,在必要的时候等待完成,然后返回其结果。如果基础计算执行了此操作,则抛出RuntimeException或者Error。

    /**
     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.
     *
     * @return the computed result
     */
    public final V invoke() {
        int s;
        //执行doInvoke 并判断其状态
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    

    主要执行逻辑都在doInvoke中.

    /**
     * Implementation for invoke, quietlyInvoke.
     *
     * @return status upon completion
     */
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        //执行doExec方法,并返回,此时就是当前线程,有可能不是工作线程
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            //如果不是工作线程,通过pool的awaitJoin方法。
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            //执行外部等待执行完成的方法externalAwaitDone
            externalAwaitDone();
    }
    
    

    外部执行完成的方法代码如下:
    这个方法会阻塞非工作线程,直至执行完毕。

    /**
     * Blocks a non-worker-thread until completion.
     * @return status upon completion
     */
    private int externalAwaitDone() {
        //判断s是否是CountedCompleter实例
        int s = ((this instanceof CountedCompleter) ? // try helping
                 //如果是则执行comm的外部协助窃取方法
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        //如果s大于0 
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            //循环 采用cas的方式
            do {
                //cas方式修改状态
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                   //同步加锁
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                               //将当前线程置于wait状态。
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                        //反之 唤醒全部线程
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }
    

    2.3.4 invokeAll

    分派给定的任务,在每个任务都保留isDone或者遇到未检查的异常的时候返回。在这种情况下,异常被重新抛出,如果一个以上的任务遇到异常,则此方法中将引发这些异常中的任何一个。如果任何任务遇到异常,但其他任务可能会被取消,但是,无法保证在异常返回的时候单个任务的执行状态,可以使用getException和相关方向来获取每个任务的状态,以检查它们是否已被取消,正常、或者未处理。

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, the
     * other may be cancelled. However, the execution status of
     * individual tasks is not guaranteed upon exceptional return. The
     * status of each task may be obtained using {@link
     * #getException()} and related methods to check if they have been
     * cancelled, completed normally or exceptionally, or left
     * unprocessed.
     *
     * @param t1 the first task
     * @param t2 the second task
     * @throws NullPointerException if any task is null
     */
    public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        //第二个任务调用fork,用工作线程执行
        t2.fork();
        //第一个任务调用doInvoke,当前线程执行
        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
            t1.reportException(s1);
        //之后用异步执行的第二个任务的结果阻塞
        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
            t2.reportException(s2);
        }
    

    invokeAll还有一个变体,就是可以支持2个以上的任务。

    /**
     * Forks the given tasks, returning when {@code isDone} holds for
     * each task or an (unchecked) exception is encountered, in which
     * case the exception is rethrown. If more than one task
     * encounters an exception, then this method throws any one of
     * these exceptions. If any task encounters an exception, others
     * may be cancelled. However, the execution status of individual
     * tasks is not guaranteed upon exceptional return. The status of
     * each task may be obtained using {@link #getException()} and
     * related methods to check if they have been cancelled, completed
     * normally or exceptionally, or left unprocessed.
     *
     * @param tasks the tasks
     * @throws NullPointerException if any task is null
     */
    public static void invokeAll(ForkJoinTask<?>... tasks) {
        Throwable ex = null;
        int last = tasks.length - 1;
        //除第0个之外的任务都会调用fork进行处理,而第0个会用当前线程进行处理
        for (int i = last; i >= 0; --i) {
            ForkJoinTask<?> t = tasks[i];
            if (t == null) {
                if (ex == null)
                    ex = new NullPointerException();
            }
            else if (i != 0)
                t.fork();
            else if (t.doInvoke() < NORMAL && ex == null)
                ex = t.getException();
        }
        //遍历,除第0个任务之外,所有任务都调用doJoin,也就是说当前线程必须等待其他任务都执行完成。
        for (int i = 1; i <= last; ++i) {
            ForkJoinTask<?> t = tasks[i];
            if (t != null) {
                if (ex != null)
                    t.cancel(false);
                else if (t.doJoin() < NORMAL)
                    ex = t.getException();
            }
        }
        if (ex != null)
            rethrow(ex);
    }
    

    可以看到invoke方法会比单纯使用fork+join会节省一个线程。即当前的调用线程也会参与计算过程。如果不用invoke,只用fork和join,这样会造成当前线程浪费。

    3.异常机制

    3.1 异常table

    ForkJoinTask维护了一个ExceptionTable,其相关常量如下:

    // Exception table support
    
    //执行任务的时候抛出的异常表,使调用者能够进行报告,由于异常很少见,因此我们不直接将其保存在任务对象中,而是使用弱引用表。请注意,取消异常未出现在表中,而是记录状态值。注意,这些静态变量在下面的静态块中初始化。
    private static final ExceptionNode[] exceptionTable;
    private static final ReentrantLock exceptionTableLock;
    private static final ReferenceQueue<Object> exceptionTableRefQueue;
    
    //固定长度为32
    private static final int EXCEPTION_MAP_CAPACITY = 32;
    

    可以发现,维护了一个static的异常ExceptionNode的数组。这个数组长度为32位,其中内容是弱引用的ExceptionNode。也就是说,全局只会有一个异常表。

    3.2 ExceptionNode

    ExceptionNode是异常表的链接节点,链接hash表使用身份比较、完全锁定和key的弱引用。该表具有固定容量,因为该表仅将任务异常维护的时间足够长,以使联接者可以访问它们,因此该表在持续时间内永远不会变得很大。但是,由于我们不知道最后一个连接器何时完成,因此必须使用弱引用并将其删除。我们对每个操作都执行此操作(因此完全锁定)。另外,任何ForkJoinPool中的某个线程在其池变为isQuiescent时都会调用helpExpungeStaleExceptions

    static final class ExceptionNode extends WeakReference<ForkJoinTask<?>> {
        final Throwable ex;
        //链表结构
        ExceptionNode next;
        final long thrower;  // use id not ref to avoid weak cycles
        //hashCode
        final int hashCode;  // store task hashCode before weak ref disappears
        ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
           //初始化赋值
            super(task, exceptionTableRefQueue);
            this.ex = ex;
            //下一个记录
            this.next = next;
            //记录线程ID
            this.thrower =
            Thread.currentThread().getId();
            //赋值hashCode
            this.hashCode = System.identityHashCode(task);
        }
    }
    

    3.3 异常的调用方法

    3.3.1 recordExceptionalCompletion

    该方法负责记录异常,并设置状态

    /**
     * Records exception and sets status.
     *
     * @return status on exit
     */
    final int recordExceptionalCompletion(Throwable ex) {
        int s;
        //如果s的状态大于0 说明task没有执行完
        if ((s = status) >= 0) {
           //获取当前task的hashCode
            int h = System.identityHashCode(this);
            //定义锁
            final ReentrantLock lock = exceptionTableLock;
            //加锁
            lock.lock();
            try {
               //调用expungeStaleExceptions
                expungeStaleExceptions();
                //获取异常表引用
                ExceptionNode[] t = exceptionTable;
                int i = h & (t.length - 1);
                //循环遍历
                for (ExceptionNode e = t[i]; ; e = e.next) {
                    //在为空的位置插入当前异常
                    if (e == null) {
                        t[i] = new ExceptionNode(this, ex, t[i]);
                        break;
                    }
                    if (e.get() == this) // already present
                        break;
                }
            } finally {
                //解锁
                lock.unlock();
            }
            s = setCompletion(EXCEPTIONAL);
        }
        return s;
    }
    

    3.3.2 clearExceptionalCompletion

    删除异常节点并清理状态。

    private void clearExceptionalCompletion() {
        //识别hashCode
        int h = System.identityHashCode(this);
        //获得锁
        final ReentrantLock lock = exceptionTableLock;
        //加锁
        lock.lock();
        try {
            //拿到异常表
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            ExceptionNode e = t[i];
            ExceptionNode pred = null;
            //循环
            while (e != null) {
                ExceptionNode next = e.next;
                //将当前null清理
                if (e.get() == this) {
                    if (pred == null)
                        t[i] = next;
                    else
                        pred.next = next;
                    break;
                }
                pred = e;
                e = next;
            }
            expungeStaleExceptions();
            status = 0;
        } finally {
            lock.unlock();
        }
    }
    

    3.3.3 expungeStaleExceptions

    //取出并删除过时的引用,只在锁定的时候执行

    private static void expungeStaleExceptions() {
       //循环
        for (Object x; (x = exceptionTableRefQueue.poll()) != null;) {
            //判断 如果x是弱引用的异常节点
            if (x instanceof ExceptionNode) {
                //取得hashCode
                int hashCode = ((ExceptionNode)x).hashCode;
                //获得异常表
                ExceptionNode[] t = exceptionTable;
                int i = hashCode & (t.length - 1);
                ExceptionNode e = t[i];
                ExceptionNode pred = null;
                //循环 遍历清除
                while (e != null) {
                    ExceptionNode next = e.next;
                    if (e == x) {
                        if (pred == null)
                            t[i] = next;
                        else
                            pred.next = next;
                        break;
                    }
                    pred = e;
                    e = next;
                }
            }
        }
    }
    

    3.3.4 getThrowableException

    返回给定任务的可重试异常,为了提供准确的堆栈线索,如果异常不是又当前线程引起的,我们将尝试创建一个与引起异常类型相同的异常,但是记录的时候只会做为原因,如果没有这样的构造函数,我们会尝试使用一个无参的构造函数。后面跟着initCause,达到同样的效果。如果这些都不适用,或者由于其他异常而失败,我们返回记录的异常,它仍然是正确的,尽管它可能包含误导性的堆栈跟踪。

    private Throwable getThrowableException() {
        if ((status & DONE_MASK) != EXCEPTIONAL)
            return null;
        //拿到当前task的hashCode
        int h = System.identityHashCode(this);
        ExceptionNode e;
        //加锁
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            //清理无效的引用
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            e = t[h & (t.length - 1)];
            while (e != null && e.get() != this)
                e = e.next;
        } finally {
            //解锁
            lock.unlock();
        }
        Throwable ex;
        if (e == null || (ex = e.ex) == null)
            return null;
        //如果异常不是当前线程引起的
        if (e.thrower != Thread.currentThread().getId()) {
            Class<? extends Throwable> ec = ex.getClass();
            try {
                Constructor<?> noArgCtor = null;
                Constructor<?>[] cs = ec.getConstructors();// public ctors only
                //循环
                for (int i = 0; i < cs.length; ++i) {
                    Constructor<?> c = cs[i];
                    Class<?>[] ps = c.getParameterTypes();
                    if (ps.length == 0)
                        noArgCtor = c;
                    else if (ps.length == 1 && ps[0] == Throwable.class) {
                       //返回一个新的异常实例
                        Throwable wx = (Throwable)c.newInstance(ex);
                        return (wx == null) ? ex : wx;
                    }
                }
                //如果noArgCtor不为空
                if (noArgCtor != null) {
                    Throwable wx = (Throwable)(noArgCtor.newInstance());
                    if (wx != null) {
                        //初始化原因
                        wx.initCause(ex);
                        return wx;
                    }
                }
            } catch (Exception ignore) {
            }
        }
        return ex;
    }
    

    4.适配其他类型task

    此外,ForkJoinTask还提供了适配其他类型任务的适配器。如下:

    4.1 AdaptedRunnable

    定义了一个适配器类,来适配Runnable
    实际上就是定义一个类将Runnable进行包裹,参考RunnableFuture的实现思路,这个适配器需要返回结果,结果通过泛型来定义。

    /**
     * Adaptor for Runnables. This implements RunnableFuture
     * to be compliant with AbstractExecutorService constraints
     * when used in ForkJoinPool.
     */
    static final class AdaptedRunnable<T> extends ForkJoinTask<T>
        implements RunnableFuture<T> {
        //需要适配的runnable
        final Runnable runnable;
        T result;
        AdaptedRunnable(Runnable runnable, T result) {
            if (runnable == null) throw new NullPointerException();
            this.runnable = runnable;
            this.result = result; // OK to set this even before completion
        }
        public final T getRawResult() { return result; }
        public final void setRawResult(T v) { result = v; }
        public final boolean exec() { runnable.run(); return true; }
        //调用invoke方法
        public final void run() { invoke(); }
        private static final long serialVersionUID = 5232453952276885070L;
    }
    

    4.2 AdaptedRunnableAction

    这个适配器不需要返回结果。与AdapteRunnable类似

    static final class AdaptedRunnableAction extends ForkJoinTask<Void>
        implements RunnableFuture<Void> {
        final Runnable runnable;
        AdaptedRunnableAction(Runnable runnable) {
            if (runnable == null) throw new NullPointerException();
            this.runnable = runnable;
        }
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final boolean exec() { runnable.run(); return true; }
        public final void run() { invoke(); }
        private static final long serialVersionUID = 5232453952276885070L;
    }
    

    4.3 RunnableExecuteAction

    这是另外一种Runnable适配器,支持异常。

    static final class RunnableExecuteAction extends ForkJoinTask<Void> {
        final Runnable runnable;
        RunnableExecuteAction(Runnable runnable) {
            if (runnable == null) throw new NullPointerException();
            this.runnable = runnable;
        }
        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) { }
        public final boolean exec() { runnable.run(); return true; }
        void internalPropagateException(Throwable ex) {
            rethrow(ex); // rethrow outside exec() catches.
        }
        private static final long serialVersionUID = 5232453952276885070L;
    }
    

    4.4 AdaptedCallable

    适配Callables。有返回值。

    static final class AdaptedCallable<T> extends ForkJoinTask<T>
        implements RunnableFuture<T> {
        final Callable<? extends T> callable;
        T result;
        AdaptedCallable(Callable<? extends T> callable) {
            if (callable == null) throw new NullPointerException();
            this.callable = callable;
        }
        public final T getRawResult() { return result; }
        public final void setRawResult(T v) { result = v; }
        public final boolean exec() {
            try {
                result = callable.call();
                return true;
            } catch (Error err) {
                throw err;
            } catch (RuntimeException rex) {
                throw rex;
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
        public final void run() { invoke(); }
        private static final long serialVersionUID = 2838392045355241008L;
    }
    

    4.5 adapt方法

    上面的适配器类通过适配器方法来调用:

    public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
        return new AdaptedCallable<T>(callable);
    }
    
    public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
        return new AdaptedRunnable<T>(runnable, result);
    }
    
    public static ForkJoinTask<?> adapt(Runnable runnable) {
        return new AdaptedRunnableAction(runnable);
    }
    

    分别对应三种适配器。

    5.总结

    ForkJoinTask是ForkJoinPool的基本执行单位。这个类的设计并不复杂,做为理解ForkJoinPool的补充。我们需要直到fork、join、invoke、invokeAll的每个方法的用法。invoke方法会比使用fork节约资源。
    另外我们可以借鉴其异常处理的模式,采用了弱引用。
    适配器模式也得到了很好的应用。在我们写代码的过程中,值得借鉴。

    相关文章

      网友评论

          本文标题:java线程池(七):ForkJoinPool源码分析之三(Fo

          本文链接:https://www.haomeiwen.com/subject/eynemktx.html