美文网首页
java线程池(八):ForkJoinPool源码分析之四(Fo

java线程池(八):ForkJoinPool源码分析之四(Fo

作者: 冬天里的懒喵 | 来源:发表于2020-10-22 14:43 被阅读0次

    [toc]

    1.类结构及其成员变量

    1.1 类结构和注释

    类结构代码如下:

    public class ForkJoinWorkerThread extends Thread {
        
    }
    
    image.png

    ForkJoinWorkerThread继承了Thread类,其注释大意如下:

    ForkJoinWorkerThread是由ForkJoinPool管理的线程,该线程执行ForkJoinTask。此类仅可做为扩展功能的需要而被集成,因为没有提供可以调度或者可重新的方法。但是,你可以覆盖主任务处理循环周围初始化和终止方法。如果确实创建了这个类的子类,还需要在ForkJoinPool中提供自定义的ForkJoinWorkerThreadFactory来使用。

    1.2 常量

    主要有两个:

    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
    

    注释大意:
    ForkJoinWorkerThreads由ForkJoinPools管理,并执行ForkJoinTasks。请参见ForkJoinPool的内部文档。此类仅仅维护了指向pool和WorkQueue的链接。pool字段在构造的时候直接设置。但是直到对registerWorker调用完成之后,才设置workQueue字段。这将导致可见性竞争,可以通过要求workQueue字段仅由其所属线程访问来规避这个问题。对于InnocuousForkJoinWorkerThread子类的支持,要求我们在此处和子类中破坏很多封装,通过Unsafe以访问和设置Thread字段。
    这是两个final修饰的常量,智能初始化一次。

    2.构造函数

    2.1 ForkJoinWorkerThread(ForkJoinPool pool)

    /**
     * Creates a ForkJoinWorkerThread operating in the given pool.
     *
     * @param pool the pool this thread works in
     * @throws NullPointerException if pool is null
     */
    protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
    

    其构造函数主要是创建一个在给定pool中的ForkJoinWorkerThread。
    构造函数中最主要的方法就是registerWorker。

    2.2 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc)

    ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup,
                         AccessControlContext acc) {
        super(threadGroup, null, "aForkJoinWorkerThread");
        U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc);
        eraseThreadLocals(); // clear before registering
        this.pool = pool;
        this.workQueue = pool.registerWorker(this);
    }
    

    这个方法需要注意的是,采用unSafe方法,在这个类的INHERITEDACCESSCONTROLCONTEXT位置处,设置传入的AccessControlContext对象。
    之后调用方法eraseThreadLocals将threadLocals清除。
    之后与同用的构造函数一致。
    擦除ThreadLocal也是采用UnSafe来完成。通过putObject将ThreadLocals的位置设置为null。

    /**
     * Erases ThreadLocals by nulling out Thread maps.
     */
    final void eraseThreadLocals() {
        U.putObject(this, THREADLOCALS, null);
        U.putObject(this, INHERITABLETHREADLOCALS, null);
    }
    

    实际上这个方法将会提供给InnocuousForkJoinWorkerThread继承的时候使用。

    3.重要的方法

    3.1 run

    做为Thread,最重要的就是run方法,我们来看看ForkJoinWorkerThread的实现:

    public void run() {
         //如果workQueue为空,则抛出异常
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
               //调用onStart方法
                onStart();
                //调用pool的runWorker方法,运行workQueue
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
               //操作完之后处理
                try {
                   //调用onTermination方法
                    onTermination(exception);
                } catch (Throwable ex) {
                   //如果出现异常,则进行异常处理
                    if (exception == null)
                        exception = ex;
                } finally {
                   //最终需要指向deregister方法
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }
    

    runWorker方法实际上是对workQueue结合随机魔数,选择一个workQueue进行遍历,调用scan方法,如果不为空则执行,反之则wait。
    其中registerWorker与deregisterWorker方法 ,我们可以参考前面的ForkJoinPool源码解读。

    3.2 定义的可扩展方法

    由于ForkJoinWorkerThread还支持继承扩展,因此在此定义了两个扩展的方法:

    /**
     * Initializes internal state after construction but before
     * processing any tasks. If you override this method, you must
     * invoke {@code super.onStart()} at the beginning of the method.
     * Initialization requires care: Most fields must have legal
     * default values, to ensure that attempted accesses from other
     * threads work correctly even before this thread starts
     * processing tasks.
     */
    protected void onStart() {
    }
    
    /**
     * Performs cleanup associated with termination of this worker
     * thread.  If you override this method, you must invoke
     * {@code super.onTermination} at the end of the overridden method.
     *
     * @param exception the exception causing this thread to abort due
     * to an unrecoverable error, or {@code null} if completed normally
     */
    protected void onTermination(Throwable exception) {
    }
    

    这两个方法用于执行之前和之后,onStart用于run实际执行之前,执行一些初始化操作。onTermination用于run实际执行之后,执行一些清理操作。

    4.内部类InnocuousForkJoinWorkerThread

    这个类就是继承了ForkJoinWorkerThread的一个实现类。此类定义了一个没有任何权限、也非用户定义的任何线程组的线程。这个线程在运行完每个top的task之后,会擦除所有的ThreadLocals。


    image.png

    源码如下:

    static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {
        /** The ThreadGroup for all InnocuousForkJoinWorkerThreads */
        private static final ThreadGroup innocuousThreadGroup =
            createThreadGroup();
    
        /** An AccessControlContext supporting no privileges */
        private static final AccessControlContext INNOCUOUS_ACC =
            new AccessControlContext(
                new ProtectionDomain[] {
                    new ProtectionDomain(null, null)
                });
    
        InnocuousForkJoinWorkerThread(ForkJoinPool pool) {
            super(pool, innocuousThreadGroup, INNOCUOUS_ACC);
        }
    
        @Override // to erase ThreadLocals
        void afterTopLevelExec() {
            eraseThreadLocals();
        }
    
        @Override // to always report system loader
        public ClassLoader getContextClassLoader() {
            return ClassLoader.getSystemClassLoader();
        }
    
        @Override // to silently fail
        public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { }
    
        @Override // paranoically
        public void setContextClassLoader(ClassLoader cl) {
            throw new SecurityException("setContextClassLoader");
        }
    
        /**
         * Returns a new group with the system ThreadGroup (the
         * topmost, parent-less group) as parent.  Uses Unsafe to
         * traverse Thread.group and ThreadGroup.parent fields.
         */
        private static ThreadGroup createThreadGroup() {
            try {
                sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe();
                Class<?> tk = Thread.class;
                Class<?> gk = ThreadGroup.class;
                long tg = u.objectFieldOffset(tk.getDeclaredField("group"));
                long gp = u.objectFieldOffset(gk.getDeclaredField("parent"));
                ThreadGroup group = (ThreadGroup)
                    u.getObject(Thread.currentThread(), tg);
                while (group != null) {
                    ThreadGroup parent = (ThreadGroup)u.getObject(group, gp);
                    if (parent == null)
                        return new ThreadGroup(group,
                                               "InnocuousForkJoinWorkerThreadGroup");
                    group = parent;
                }
            } catch (Exception e) {
                throw new Error(e);
            }
            // fall through if null as cannot-happen safeguard
            throw new Error("Cannot create ThreadGroup");
        }
    }
    
    

    AccessControlContext INNOCUOUS_ACC 定义了一个不支持任何特权访问的AccessControlContext。这个类会创建一个单独的threadGroup,以确保其不属于任何一个用户创建的ThreadGroup。

    5. ForkJoinPool中创建工作线程的过程

    此时再来结合ForkJoinPool中的ForkJoinWorkerThreadFactory,就能明白ForkJoinThread的创建意义了。ForkJoinPool根据访问权限的需要,定义了采用默认的创建方法,还是创建InnocuousForkJoinWorkerThread。

    5.1 makeCommonPool创建过程

    再ForkJoinPool重的makeCommPool,有如下代码:

    if (factory == null) {
        if (System.getSecurityManager() == null)
            factory = defaultForkJoinWorkerThreadFactory;
        else // use security-managed default
            factory = new InnocuousForkJoinWorkerThreadFactory();
    }
    

    这里也就是说,如果System.getSecurityManager()为null,则返回默认的ThreadFactory,而不为null,则说, 使用了默认的安全管理级别,因此将创建InnocuousForkJoinWorkerThreadFactory。

    5.2 createWorker创建过程

    ForkJoinWorkerThreadFactory fac = factory;
    try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
    

    也就是说,createWorker根据ForkJoinWorkerThreadFactory的实现类来创建。

    5.3 InnocuousForkJoinWorkerThreadFactory

    static final class InnocuousForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
    
        /**
         * An ACC to restrict permissions for the factory itself.
         * The constructed workers have no permissions set.
         */
        private static final AccessControlContext innocuousAcc;
        static {
            Permissions innocuousPerms = new Permissions();
            innocuousPerms.add(modifyThreadPermission);
            innocuousPerms.add(new RuntimePermission(
                                   "enableContextClassLoaderOverride"));
            innocuousPerms.add(new RuntimePermission(
                                   "modifyThreadGroup"));
            innocuousAcc = new AccessControlContext(new ProtectionDomain[] {
                    new ProtectionDomain(null, innocuousPerms)
                });
        }
    
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread)
                java.security.AccessController.doPrivileged(
                    new java.security.PrivilegedAction<ForkJoinWorkerThread>() {
                    public ForkJoinWorkerThread run() {
                        return new ForkJoinWorkerThread.
                            InnocuousForkJoinWorkerThread(pool);
                    }}, innocuousAcc);
        }
    }
    

    5.4 DefaultForkJoinWorkerThreadFactory

    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }
    

    6.总结

    ForkJoinWorkerThread实际上非常简单,就是结合ForkJoinPool,然后根据其需要,创建合适的线程的过程。这里面值得我们借鉴的是,如果需要创建无其他访问权限的线程,实际上这两种线程大部分内容都是相同的,因此可以通过继承来复用大部分代码。之后定义两个factory,让最终的用户根据需要选择factory。

    相关文章

      网友评论

          本文标题:java线程池(八):ForkJoinPool源码分析之四(Fo

          本文链接:https://www.haomeiwen.com/subject/ataumktx.html