美文网首页JavaJavaConcurrent
ThreadPoolExecutor(3) —— 干活的人 Wo

ThreadPoolExecutor(3) —— 干活的人 Wo

作者: 若琳丶 | 来源:发表于2019-11-30 23:41 被阅读0次

    一、前言

    前一篇文章,大体说明了一下线程池如何添加一个新的Worker去执行任务。本篇来详细分析 Worker 本身。

    二、Worker 的结构

    2.1 Worker 整体结构

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Worker所绑定的执行任务的线程. */
            final Thread thread;
            /** 初始化时需要执行的任务,有可能为空 */
            Runnable firstTask;
            /** 完成任务数 */
            volatile long completedTasks;
    
            /**
             * 通过给定的任务(有可能为空)来创建初始化,初始化时会创建一条线程进行绑定
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** 实现Runnable接口的run方法  */
            public void run() {
                runWorker(this);
            }
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    可以看到 Worker 继承了AbstractQueuedSynchronizer,并实现了Runnable。

    • 继承了 AQS,说明 Worker 本身是个锁,而且在tryAcquire以及其他对AQS方法的实现,都说明了它不支持重入。因为参数都写死为1,如果是重入功能的锁的话,会支持累加(此处可能说的不详细,如果不明白可以参考 AQS 系列文章 ReentrantLock(一) —— AQS简介)。
    • 实现 Runnable,说明 Worker 本身是个可执行的任务类,它与它自身的thread 属性相互绑定(this.thread = getThreadFactory().newThread(this))

    2.2 runWorker 方法

    runWorker 方法是ThreadPoolExecutor的方法。

    /**
     * 执行 Worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 调用unlock()是为了让外部可以中断
        w.unlock(); // allow interrupts
        // 这个变量用于判断是否进入过自旋(while循环)
        boolean completedAbruptly = true;
        try {
            // 这儿是自旋
            // 1. 如果firstTask不为null,则执行firstTask;
            // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
            // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
            while (task != null || (task = getTask()) != null) {
                // 这儿对worker进行加锁,是为了达到下面的目的
                // 1. 降低锁范围,提升性能
                // 2. 保证每个worker执行的任务是串行的
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果线程池正在停止,则对当前线程进行中断操作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
                // 这两个方法在当前类里面为空实现。
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 帮助gc
                    task = null;
                    // 已完成任务数加一 
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 自旋操作被退出,说明线程池正在结束
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    总结一下runWorker方法的执行过程:

    1. while循环中,不断地通过getTask()方法从workerQueue中获取任务
    2. 如果线程池正在停止,则中断线程。否则调用
    3. 调用task.run()执行任务;
    4. 如果task为null则跳出循环,执行processWorkerExit()方法,销毁线程workers.remove(w);

    这个流程图非常经典:


    Worker 的执行

    2.3 processWorkerExit 方法

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 这个变量用于表示是否进入过自旋。
        // 1. 如果没有进入过,该值为false
        // 2. 进入过,该值为true
        // 只有进入过自旋,worker的数量才需要减一
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        // 通过全局锁的方式移除worker
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 尝试终止线程池
        tryTerminate();
    
        int c = ctl.get();
        // 如果线程池状态为`SHUTDOWN`或`RUNNING`,
        // 则通过调用`addWorker()`来创建线程,辅助完成对阻塞队列中任务的处理。
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }
    

    2.4 Worker 是如何被启动的

    在上一篇 ThreadPoolExecutor(二) —— 线程池源码分析 中,ThreadPoolExecutor 成功将 Worker 添加到集合中后,调用的是 Worker 中 thread 的 start 方法(t.start())。我们知道 Thread 的 start 方法是会启动一个内存中的线程单元,并执行 run 方法:

    public
    class Thread implements Runnable {
        //此处的 target 将会是 Worker 实例
        public Thread(ThreadGroup group, Runnable target, String name,
                      long stackSize) {
            init(group, target, name, stackSize);
        }
    
        @Override
        public void run() {
            if (target != null) {
                target.run();
            }
        }
    }
    

    而在 Thread 执行 run 方法时,实际上调用的是它自身的 target 的run方法,此处的 target 就是与 Thread 绑定的 Worker 实例。我们再看一下两者绑定的过程。
    在 Worker 初始化时,会通过 ThreadFactory 创建一个 Thread 实例:

       Worker(Runnable firstTask) {
           setState(-1); // inhibit interrupts until runWorker
           this.firstTask = firstTask;
           // 在创建 Thread 实例并赋值时,Worker 将自己作为参数传入线程工厂的方法内
           this.thread = getThreadFactory().newThread(this);
       }
    

    此处 getThreadFactory 方法,返回的实际实例时 DefaultThreadFactory:

        /**
         * The default thread factory
         */
        static class DefaultThreadFactory implements ThreadFactory {
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            private final ThreadGroup group;
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix;
    
            DefaultThreadFactory() {
                SecurityManager s = System.getSecurityManager();
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                                      0);
                if (t.isDaemon())
                    t.setDaemon(false);
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    

    此处执行了它的 newThread 方法,其中 Runnable 对象 r,就是 Worker 对象,在此处将 Worker 对象传入 Thread 的构造方法中,与 Thread 完成绑定。

    所以,在线程池直接调用 Thread 的 start 方法,可以直接启动 Worker,执行 Worker 的 run 方法。

    三、线程是如何在线程池中运作的

    1、在线程池成功的 addWorker ,并且成功启动了 Worker 对应的 Thread以后,这个Thread就开始运作,运作的第一个任务,是 Worker 对象中的firstTask。
    2、当 firstTask运作结束,会通过 getTask() 方法从队列中获取任务。在这里获取到的 task,无需与当前的 Thread 对象有什么绑定关系,只需要在当前 Thread 中执行这个 task 的 run 方法即可。
    3、getTask 是从队列中获取 task 的核心逻辑,其中包含对线程数的判断以及是否允许核心线程数超时的判断。这些判断会影响从队列中获取task等待的时长,当然还有些比较细的内容需要额外的去说。
    4、当当前的 Thread 从 getTask 方法中获取的 task 为空时,就说明这个线程已经没用了,就会消亡

    三、有关 Worker 的一些疑问

    3.1 为什么 Worker 要继承 AQS

    • 上面已提到,ThreadPoolExecutor 需要的是不能重入的锁
    • runWorker 方法是在 ThreadPoolExecutor 中的,它的参数是需要执行 Worker。而runWorker 方法是支持多线程,只是不支持同一条线程(也就是同一个Worker)出现并发执行的情况,所以让 Worker 自己来上自己的锁。当然 Worker 也可以不用自己去实现 AQS,可以自己有一个 lock 的属性,初始化时创建一个 lock对象。此处集成 AQS 可能也是为了更简洁,更优雅。

    3.2 为什么 Worker 要实现 Runnable

    • Worker 实现Runnable 接口,是为了可以作为 Runnable 类型的参数,与 Thread 进行绑定,在 Thread 启动时,会启动 Worker 的 run 方法。

    3.3 为什么一定要与 Thread 绑定?为何在 ThreadPoolExecutor 启动 Worker 执行任务要调用 Worker 的 Thread,而不是 Worker 本身呢?

    • 因为只有调用 Thread 的 start 方法,才会在内存中启动一条新的线程单元,如果直接执行 Worker 的 run 方法,那仅仅是主线程执行了 run 方法,并没有启动一条新线程

    3.4 为什么不能启动 Worker 方法的run,然后 Worker 中 run 的内容,是启动 Worker 对象持有的 Thread 对象的 start 方法呢?这样做也可以启动一条新的线程单元啊?

    • 如此问题所描述的那样,如果 ThreadPoolExecutor 直接启动 Worker,并且 Worker 将 ThreadPoolExecutor 和 Worker 中持有的 Thread 对象隔离,想想其实也没啥问题,也有可能我没有想到问题的关键所在。如果有哪位大佬知道这么写的原因,还请下方留言~
    • 我能想到的,可能是类结构,或者代码写法上的偏好。当然也可能作者在写的时候,把 Worker 类就看做一个可执行的任务类,它的存在仅仅是对 Thread 的一层包装。这样想的话,Worker 确实有必要去实现 Runnable 接口。

    3.5 在 runWorker 方法中,只要当前 Worker 完成了所有任务,就跳出了 while 循环,并执行 finally 中的移除过程,那核心的线程也会被移除吗?

    • 此处有待更新

    3.6 Worker为什么不使用ReentrantLock来实现呢?

    • tryAcquire方法它是不允许重入的,而ReentrantLock是允许重入的。对于线程来说,如果线程正在执行是不允许其它锁重入进来的。线程只需要两个状态,一个是独占锁,表明正在执行任务;一个是不加锁,表明是空闲状态。

    四、总结

    本篇文章介绍了 ThreadPoolExecutor 中真正去执行任务的对象 —— Worker,Worker 与 Thread 之间的关系,以及 ThreadPoolExecutor 是如何去启动 Worker 的。再加上个人愚见,如果有理解错误或者缺失的地方,还请下方留言,大家一起交流,一起学习,一起成长。

    相关文章

      网友评论

        本文标题:ThreadPoolExecutor(3) —— 干活的人 Wo

        本文链接:https://www.haomeiwen.com/subject/lzbawctx.html