美文网首页并发编程
Fork/Join框架运行原理

Fork/Join框架运行原理

作者: xiaolyuh | 来源:发表于2019-08-05 20:13 被阅读47次

    Fork/Join框架的入门可以参考Fork/Join框架。Fork/Join框架的核心类有两个一个是ForkJoinPool,它主要负责执行任务;一个是ForkJoinTask,主要负责任务的拆分和结果的合并;

    ForkJoinPool

    它和ThreadPoolExecutor一样也是一个线程池的实现,并且同样实现了Executor和ExecutorServiceie接口,类图如下:


    ForkJoinPool.png

    核心内部类 WorkQueue

    static final class WorkQueue {
    
        // 队列初始容量
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
    
        // 队列最大容量
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
    
        volatile int qlock;        // 1: locked, < 0: terminate; else 0
        // 下一次出队的索引位
        volatile int base;         // index of next slot for poll
        // 下一次入队索引位
        int top;                   // index of next slot for push
        // 存放任务的容器
        ForkJoinTask<?>[] array;   // the elements (initially unallocated)
        final ForkJoinPool pool;   // the containing pool (may be null)
        // 执行当前队列任务的线程
        final ForkJoinWorkerThread owner; // owning thread or null if shared
        volatile Thread parker;    // == owner during call to park; else null
    
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }
    
        // 入队
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; ForkJoinPool p;
            int b = base, s = top, n;
            if ((a = array) != null) {    // ignore if queue removed
                int m = a.length - 1;     // fenced write for task visibility
                U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                U.putOrderedInt(this, QTOP, s + 1);
                if ((n = s - b) <= 1) {
                    if ((p = pool) != null)
                        p.signalWork(p.workQueues, this);
                }
                else if (n >= m)
                    growArray();
            }
        }
    
        // 初始化扩展扩容ForkJoinTask<?>[]
        final ForkJoinTask<?>[] growArray() {
            ForkJoinTask<?>[] oldA = array;
            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
            if (size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    ForkJoinTask<?> x;
                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                    int j    = ((b &    mask) << ASHIFT) + ABASE;
                    x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
                    if (x != null &&
                        U.compareAndSwapObject(oldA, oldj, x, null))
                        U.putObjectVolatile(a, j, x);
                } while (++b != t);
            }
            return a;
        }
    
    
        // 出队
        final ForkJoinTask<?> poll() {
            ForkJoinTask<?>[] a; int b; ForkJoinTask<?> t;
            while ((b = base) - top < 0 && (a = array) != null) {
                int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
                t = (ForkJoinTask<?>)U.getObjectVolatile(a, j);
                if (base == b) {
                    if (t != null) {
                        if (U.compareAndSwapObject(a, j, t, null)) {
                            base = b + 1;
                            return t;
                        }
                    }
                    else if (b + 1 == top) // now empty
                        break;
                }
            }
            return null;
        }
    }
    

    WorkQueue主要作用是接受外部提交的任务,并且支持工作窃取。

    • 每一个WorkQueue对应一个ForkJoinWorkerThread来执行队列里面的任务;
    • 使用ForkJoinTask<?>[]数组来存储任务,这个数组初始化长度是INITIAL_QUEUE_CAPACITY = 1 << 13 = 8192,最大长度为MAXIMUM_QUEUE_CAPACITY = 1 << 26 = 67108864ForkJoinTask<?>[]是在第一次提交任务的时候初始化;
    • 通过basetop分别来记录下次出队的索引位和下一次入队的索引位。

    核心属性

    // Instance fields
    // 线程池控制位
    volatile long ctl;                   // main pool control
    // 线程池状态
    volatile int runState;               // lockable status
    // 工作队列数组
    volatile WorkQueue[] workQueues;     // main registry
    

    构造函数

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
    
    • int parallelism:并发度,默认是CPU核心数
    • ForkJoinWorkerThreadFactory factory:创建工作线程的工厂类
    • UncaughtExceptionHandler handler:异常处理的策略
    • int mode:队列算法标记,0:后进先出,65536:先进先出
    • String workerNamePrefix:工作线程名前缀

    核心方法

    execute()

    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }
    

    只是判断了一下任务是否为null,然后就调用了externalPush方法。

    externalPush()

    final void externalPush(ForkJoinTask<?> task) {
        // ws工作队列数组;q:存放当前任务的工作队列;m:工作队列数组最后一个索引位
        WorkQueue[] ws; WorkQueue q; int m;
        // 获取一个随机数
        int r = ThreadLocalRandom.getProbe();
        int rs = runState;
        if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
            // 找到存放当前任务的工作队列q
            (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
            // 获取q队列上的锁
            U.compareAndSwapInt(q, QLOCK, 0, 1)) {
            // a:q队列中存放任务的数组;am是数组长度;n:数组中的任务数;q:下一个入队的索引位
            ForkJoinTask<?>[] a; int am, n, s;
            if ((a = q.array) != null &&
                // 判断队列是否满了
                (am = a.length - 1) > (n = (s = q.top) - q.base)) {
                // 计算存放任务的索引位
                int j = ((am & s) << ASHIFT) + ABASE;
                // 存放任务
                U.putOrderedObject(a, j, task);
                // 更新top指针(索引位)
                U.putOrderedInt(q, QTOP, s + 1);
                // 解锁
                U.putIntVolatile(q, QLOCK, 0);
                if (n <= 1)
                    // 尝试创建或者激活线程
                    signalWork(ws, q);
                return;
            }
            U.compareAndSwapInt(q, QLOCK, 1, 0);
        }
        // 完整版的push,处理一些不常见的情况,比如初始化工作队列数组workQueues,和新的工作队列workQueues[i]
        externalSubmit(task);
    }
    
    1. 先判断工作队列数组workQueues数组是否为NULL,如果是直接走下面完整版的push方法
    2. 根据随机数,找到当前任务所需要放入的工作队列p=workQueues[i]
    3. 如果p为NULL直接走下面完整版的push方法
    4. 获取q队列上的锁
    5. 判断队列是否满了,如果是直接走下面完整版的push方法
    6. 任务入队
    7. 解锁

    externalSubmit(task)整版的push方法,处理一些不常见的情况,比如初始化和扩容工作队列数组workQueuesworkQueues;新创建工作队列workQueues[i]

    createWorker()

    externalSubmit(task)方法中我们创建了工作队列后,我们需要将工作队列里面的工作线程启动起来,然后处理工作队列里面的任务,最终externalSubmit(task)方法会调用createWorker()方法创建工作线程,并启动线程。

    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            // 创建工作线程,并将调用registerWorker方法将工作线程和工作队列做绑定
            if (fac != null && (wt = fac.newThread(this)) != null) {
                // 启动工程线程
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        // 最后解绑工作线程和工作队列的关系
        deregisterWorker(wt, ex);
        return false;
    }
    

    runWorker()

    ForkJoinWorkerThread.run()最终会调用ForkJoinPool.runWorker()方法,来循环的执行队列里面的任务。

    final void runWorker(WorkQueue w) {
        // 分配队列中存储任务的数据
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        // 自旋,执行队列中的任务
        for (ForkJoinTask<?> t;;) {
            // 获取任务
            if ((t = scan(w, r)) != null)
                // 执行任务
                w.runTask(t);
            // 等待任务
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }
    

    这个方法最终回去调用到ForkJoinTask.exec()方法,进而调用到子类RecursiveTaskRecursiveActioncompute()方法,任务执行。

    ForkJoinTask

    fork()

    当我们调用ForkJoinTask的fork方法时,程序会将任务放到队列里面去,然后异步地执行这个任务。代码如下:

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    

    push方法把当前任务存放到工作队列中的ForkJoinTask数组中,代码如下:

    final void push(ForkJoinTask<?> task) {
        ForkJoinTask<?>[] a; ForkJoinPool p;
        int b = base, s = top, n;
        if ((a = array) != null) {    // ignore if queue removed
            int m = a.length - 1;     // fenced write for task visibility
            U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
            U.putOrderedInt(this, QTOP, s + 1);
            if ((n = s - b) <= 1) {
                if ((p = pool) != null)
                    p.signalWork(p.workQueues, this);
            }
            else if (n >= m)
                growArray();
        }
    }
    

    join()

    Join方法的主要作用是阻塞当前线程并等待获取结果,代码如下:

    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    private void reportException(int s) {
        if (s == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL)
            rethrow(getThrowableException());
    }
    

    首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结 果,任务状态有4种:已完成(NORMAL)、被取消(CANCELLED)、信号(SIGNAL)和出现异常 (EXCEPTIONAL)。

    • 如果任务状态是已完成,则直接返回任务结果。
    • 如果任务状态是被取消,则直接抛出CancellationException。
    • 如果任务状态是抛出异常,则直接抛出对应的异常。

    让我们再来分析一下doJoin()方法的实现代码。

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                                // 执行任务
                                tryUnpush(this) && (s = doExec()) < 0 ? s :
                                wt.pool.awaitJoin(w, this, 0L) :
                        // 阻塞非工作线程,直到工作线程执行完毕
                        externalAwaitDone();
    }
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
    

    在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成, 则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。

    参考

    《java并发编程的艺术》

    源码

    https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

    spring-boot-student-concurrent 工程

    相关文章

      网友评论

        本文标题:Fork/Join框架运行原理

        本文链接:https://www.haomeiwen.com/subject/wrjmdctx.html