Java 线程池系列(下)之 ThreadPoolExecuto

作者: 爱打乒乓的程序员 | 来源:发表于2020-04-10 00:37 被阅读0次

    通过上一篇文章Java 线程池系列(上)之线程池是什么东东?,简单的讲解了一下线程池的作用,使用场景以及ThreadPoolExecutor的一个Demo。那ThreadPoolExecutor是什么怎么创建线程和管理线程,有什么设计思想值得我们学习的呢?那带着疑问,开始看源码吧!

    类继承结构

    在了解ThreadPoolExecutor之前,先来看看其类继承结构

    Executor接口只定义了一个execute方法,当我们提交任务的时候就是调用此方法;ExecutorService接口继承Executor接口,定义了任务的执行和管理相关功能。

    public interface Executor {
        // 执行任务的方法
        void execute(Runnable command);
    }
    
    public interface ExecutorService extends Executor {
        // 关闭线程池,不再接受新的任务提交,但不会中断正在执行的任务,如果线程池已经关闭,再调用shutdown方法没有效果
        void shutdown();
    
        // 关闭线程池,不再接受新的任务提交,同时会中断当前正在运行的线程,并返回尚未执行的task的list 
        List<Runnable> shutdownNow();
    
        // 查看 executor 是否已经关闭了,返回值 true 表示已关闭
        boolean isShutdown();
    
        // 所有的任务是否都已经终止,是的话,返回 true
        boolean isTerminated();
    
        // 判断线程池是否已经关闭。根据设定的超时时间,如果达到超时时间后,线程池关闭或在超时之前线程池关闭返回true,否则返回false
        boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
        // 提交有返回值的任务,返回封装了异步计算结果的Future
        <T> Future<T> submit(Callable<T> task);
    
        // 提交没有返回值的任务,并指定在调用Future的get方法时返回的result对象
        <T> Future<T> submit(Runnable task, T result);
    
        // 提交没有返回值的任务,返回封装了异步计算结果的Future
        Future<?> submit(Runnable task);
    
        // 给定任务集合,返回已经执行完成的 Future 集合
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    
        // 设定超时时间,根据给定的任务集合,返回已经执行完成的 Future 集合,如果超时则终止集合中还没执行的线程
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    
        // 给定任务中有一个执行成功就返回,如果抛异常,其余未完成的任务将被取消
        <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    
        // 设定超时时间,根据给定任务中有一个执行成功就返回,如果超时则不会再执行集合中的任务
        <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    可以看出,ExecutorService接口定义了很多方法丰富的方法。而AbstractExecutorService抽象类实现了ExecutorService接口,封装好了许多Executor通用的方法如:submit、newTaskFor等方法

    public abstract class AbstractExecutorService implements ExecutorService {
        // 把Runnable对象转化成RunnableFuture对象
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        // 把Callable对象转化成RunnableFuture对象
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        // 提交无返回值的任务
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        // 提交任务,调用Future的get方法时返回的result对象
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        // 提交有返回值的任务
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        
        // 为省篇幅,其余方法源码暂时省略
    }
    

    ThreadPoolExecutor类注释

    ThreadPoolExecutor类原文注释十分长,信息量极大,有兴趣的朋友自行查看。以下是我看了注释后总结的几点:

    1. 线程池解决两个问题:A:通过减少任务间的调度开销,提高大量任务时的执行性能;B:提供了一种方式来管理线程和消费,维护一些基本数据统计等工作,比如统计完成的任务数量;
    2. 可根据实际场景调整线程池参数和使用可扩展的钩子方法;
    3. 可以使用更加便捷Executors工厂类创建线程池
    4. 当新的任务提交的时候,如果当前线程池工作线程数量小于coreSize,即使有工作线程空闲都会创建一个新的线程执行新任务;如果线程池工作线程数等于coreSize,若阻塞队列未满则将新任务添加到队列,如果队列不能再容纳新任务的时候,线程池会在maximumPoolSize范围内创建线程执行新任务
    5. 可以使用setCorePoolSize和setMaximumPoolSize动态设置coreSize和maximumPoolSize大小
    6. 默认情况下,线程池只有接收到新任务才会创建core线程。可以使用prestartCoreThread或prestartAllCoreThreads方法提前创建一个core线程或创建所有core线程
    7. 线程池默认通过ThreadFactory创建新线程,新建线程的线程组都是一样,优先级都是NORM_PRIORITY,都是非守护线程
    8. KeepAliveTime 参数的作用是当前线程池中超过 coreSize 的线程,且线程空闲的时间超过 keepAliveTime,当前线程就会被回收,可避免线程没有被使用时造成的资源浪费。程序中可调用setKeepAliveTime方法动态设置keepAliveTime;当设置allowCoreThreadTimeOut 为 ture 的时候,核心线程数的线程空闲时间超过keepAliveTime也会被回收
    9. 可根据实际情况设置线程池的阻塞队列,如:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue
    10. 当线程池关闭或达到线程池最大线程数和最大队列都满的情况下,可以使用四种处理策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy
    11. 提供钩子方法可以在任务执行之前或之后执行特定的业务逻辑

    ThreadPoolExecutor 重要的成员变量

        // 高3位为线程池的状态,低29位为工作线程的数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        private static final int COUNT_BITS = Integer.SIZE - 3;
        //容量 000 11111111111111111111111111111
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // 线程池状态
        //运行中 111 00000000000000000000000000000
        private static final int RUNNING    = -1 << COUNT_BITS;
        //关闭 000 00000000000000000000000000000
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        //停止 001 00000000000000000000000000000
        private static final int STOP       =  1 << COUNT_BITS;
        //整理 010 00000000000000000000000000000
        private static final int TIDYING    =  2 << COUNT_BITS;
        //终止 011 00000000000000000000000000000
        private static final int TERMINATED =  3 << COUNT_BITS;
        
        // 阻塞队列,用于阻塞任务
        private final BlockingQueue<Runnable> workQueue;
    
        // 控制工作线程(worker)访问
        private final ReentrantLock mainLock = new ReentrantLock();
        private final Condition termination = mainLock.newCondition();
        
        // 线程池中所有的工作线程的集合
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        // 线程池最大容量
        private int largestPoolSize;
    
        // 已经完成的任务数
        private long completedTaskCount;
    
        // 用于创建新线程的线程工厂
        private volatile ThreadFactory threadFactory;
    
        // 饱和或者运行中拒绝任务的 handler 处理类
        private volatile RejectedExecutionHandler handler;
    
        // 工作线程存活时间
        private volatile long keepAliveTime;
    
        // 如果设置为true,当核心线程数线程空闲时间超过keepAliveTime就会被回收
        private volatile boolean allowCoreThreadTimeOut;
    
        // 核心线程数
        private volatile int corePoolSize;
    
        // 最大线程数量
        private volatile int maximumPoolSize;
        
        // 默认的拒绝策略
        private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();    
    
        // 对于调用线程池的shutdown(),shutdownNow()方法权限认证
        private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
    

    线程池的静态方法

        //获取线程池的状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        //获取线程池中工作线程数量
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        //根据线程池的状态和工作线程数得到ctl
        private static int ctlOf(int rs, int wc) { return rs | wc; }
        // 判断线程池状态是否为可运行
        private static boolean isRunning(int c) { return c < SHUTDOWN; }
    

    ThreadPoolExecutor构造函数

        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {
            if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    不难看出,ThreadPoolExecutor的构造函数实际上都是调用同一个方法,接下来着重分析构造方法中参数含义以及其相关的知识点,这些是高频的面试题!熟悉这些参数变量,能够反映出对线程池的了解程度!

    • corePoolSize:核心线程数
    • maximumPoolSize:最大线程数
    • keepAliveTime:线程保持活跃时间
    • unit:keepAliveTime参数的单位
    • workQueue:保存任务的阻塞队列
    • threadFactory:线程工厂
    • handler:饱和策略

    corePoolSize、maximumPoolSize和keepAliveTime这三个参数决定了什么时候线程、什么时候回收线程。为了便于理解,画张图结合理解吧!


    workQueue参数的阻塞队列有多种选择,除了可以选择J.U.C包下的阻塞队列,还可以自定义适合特殊业务场景的阻塞队列。一般来说,不必要重复造轮子,大部分业务场景下J.U.C包下的阻塞队列已经够用。常见的ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等等,具体区别与使用可参看我另外一个系列拙作:J.U.C 阻塞队列源码剖析系列(一)之 BlockingQueue 接口

    threadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

    RejectedExecutionHandler(饱和策略)指的是当阻塞队列满了之后,线程数量也达到最大值,无法再接受新任务的时候,可以根据饱和策略对新任务作出相应的处理。原生JDK线程池提供了4种饱和策略:

    1. AbortPolicy:直接抛出异常。
    2. CallerRunsPolicy:只用调用者所在线程来运行任务。
    3. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    4. DiscardPolicy:不处理,丢弃掉

    除此之外,我们还可以自定义饱和策略满足业务场景的需求,比如:

    public class LogPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (!executor.isShutdown()) {
                // 持久化不能处理的任务
                insertToDB(r);
            }
        }
    }
    

    以上是ThreadPoolExecutor构造函数的参数详细解析和作用。接下来回到源码继续看一下线程池中的任务是“长”什么样的。

    线程池中的任务——Worker对象

    在线程池中,任务运行的最小单位为Worker对象,我们可以认为Worker对象是Runnable任务的代理。由下面源码可发现,Worker 类实现了Runnable接口,实现了run方法,在Worker对象初始化的时候,使用线程工厂(ThreadFactory)创建线程,所以Worker对象也具有执行任务的能力。

    // Worker 实现 Runnable,本身是一个可执行的任务
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
        // 任务运行的线程
        final Thread thread;
        // 需要执行的任务
        Runnable firstTask;
        //记录完成任务的数量
        volatile long completedTasks;
    
        // 初始化任务并调用线程工厂创建执行任务的线程
        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        public void run() {
            //调用ThreadPoolExecutor类的runWorker(Worker)方法
            runWorker(this);
        }
    
        /**
         * 检测是否是否获取到锁
         * state=0表示未获取到锁
         * state=1表示已获取到锁
         */
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        //使用AQS设置线程状态
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        //尝试释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        // 下面方法调用Reentrantlock的相关方法
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        // 如果线程正在运行,中断线程
        void interruptIfStarted() {
            Thread t;
            // state>=0 && t!=null && 且t没有被中断。如果都成立则中断线程
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    提交任务的时序图

    接下来就是分析线程池中,任务提交之后发生的事情,其中调用了几个核心方法,为方便理解,附上一张时序图:


    提交任务——submit与execute方法

    public abstract class AbstractExecutorService implements ExecutorService {
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        // 把 Runnable 和 Callable 都转化成FutureTask,然后把FutureTask对象交给ThreadPoolExecutor类的execute方法执行
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    }
    

    submit方法在 AbstractExecutorService 抽象类定义的,其主要就是将Runnable和Callable对象转化为FutureTask对象,然后将FutureTask对象传给ThreadPoolExecutor类中的execute方法执行任务。

    ps.关于FutureTask类的使用和源码,可参考我另外一篇拙作:多线程之 FutureTask 源码剖析

    所以线程池的核心方法其实是executor方法,请继续往下看:

        /**
         * 要执行的任务可能会在线程池中创建新的线程或在线程池中已有的线程中执行;
         * 如果任务不能提交执行,有可能因为线程池已经关闭或者线程池的队列容量、线程池中线程数量达到阀值,然后任务由饱和策略处理
         */
        public void execute(Runnable command) {
            // 空任务跑异常
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 工作线程数小于核心线程数,调用addWorker方法创建新的线程并执行任务,成功返回,失败不抛异常
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    // 新建线程成功,跳出方法
                    return;
                // 新建线程失败,获取线程池状态
                c = ctl.get();
            }
            // 执行到这一步,证明工作线程数大于等于核心线程数,或者新建线程失败
            // isRunning(c):判断线程池状态是否正常
            // workQueue.offer(command):任务入队到阻塞队列
            if (isRunning(c) && workQueue.offer(command)) {
                // 获取线程池状态
                int recheck = ctl.get();
                // 如果线程池处于不可运行的状态,将任务从阻塞队列中移除
                if (! isRunning(recheck) && remove(command))
                    // 拒绝任务
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    //这里其实算是一个比较极端的情况,因为当线程池内的线程都被回收才会执行到这一步。
                    //如果阻塞队列内有任务等待线程执行,但线程池内工作线程居然都没了,那就需要新建线程执行任务
                    // 新增线程
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))// 执行到这一步,证明线程池已经不可用或阻塞队列不能再入队
                reject(command);
        }
    

    创建新线程执行任务——addWorker

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                // 获取线程池状态
                int rs = runStateOf(c);
    
                // 检查线程池状态是否已经关闭,如果线程池状态为关闭且线程池队列为空则返回false
                if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                    return false;
                // 自旋
                for (;;) {
                    // 线程池中工作线程的数量
                    int wc = workerCountOf(c);
                    // 工作线程的数量大于线程池的容量或根据core判断工作线程数是否大于核心线程数或最大线程数
                    if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // CAS操作工作线程数加1
                    if (compareAndIncrementWorkerCount(c))
                        // 结束for循环
                        break retry;
                    // 执行到这一步证明有其它的任务优先新增了线程,重新读取线程池的状态
                    c = ctl.get();  // Re-read ctl
                    // 如果线程池的状态改变
                    if (runStateOf(c) != rs)
                        // 重新从retry的位置循环
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 执行到这里,证明工作线程数已经加1,那么接下来就新增线程和执行任务
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 将任务封装成worker对象,并新建一个线程
                w = new Worker(firstTask);
                // 通过ThreadFactory创建的线程
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    // 加锁
                    mainLock.lock();
                    try {
                        // 再次检验线程池的状态
                        int rs = runStateOf(ctl.get());
    
                        // rs < SHUTDOWN:说明线程池的状态为Runnable
                        // rs == SHUTDOWN && firstTask == null:说明线程池已关闭且firstTask == null
                        // 为什么线程池关闭了还可以往下执行新线程呢?因为firstTask == null可能是由于阻塞队列已经满了,
                        // 需要新建工作线程执行任务,虽然此时线程池已经关闭,但还是要将阻塞队列中的任务先完成
                        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
                            // 预检测线程是否可用,当返回true代表线程已经启动,则抛出异常
                            if (t.isAlive())
                                throw new IllegalThreadStateException();
    
                            // 往线程集合中添加任务线程
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                // 更新线程池最大容量
                                largestPoolSize = s;
                            // 此处为一个标记,代表任务线程添加成功
                            workerAdded = true;
                        }
                    } finally {
                        // 释放锁
                        mainLock.unlock();
                    }
                    // 如果添加任务成功则启动线程
                    if (workerAdded) {
                        // 启动线程,实际上去执行 Worker.run 方法
                        t.start();
                        // 标记,代表线程启动成功
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果启动不超过,执行addWorkerFailed移除集合中的任务
                if (!workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    线程池复用线程的奥秘——runWorker方法 + getTask方法

    runWorker方法中,通过while方法,不断从getTask方法中取出阻塞队列的任务,然后将任务赋值给Worker对象的task线程对象并调用run方法执行线程。如果getTask执行过程中,阻塞队列中没任务,线程池中的线程将会一直阻塞直到有新任务才会被唤醒;如果allowCoreThreadTimeOut==false的情况下,线程池中会保留核心线程数量的线程,如果除了核心线程数量之外的线程空闲时间超过keepAliveTime都会回收线程;如果allowCoreThreadTimeOut==true的情况下,只要线程池中的线程空闲时间超过keepAliveTime,无论线程池中线程数是否小于核心线程数,都会将线程回收。

        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            // 便于GC回收
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 循环获取任务
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
        // 从阻塞队列获取任务
        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                // 线程池状态
                int rs = runStateOf(c);
    
                // 如果线程池状态为关闭或停止,阻塞队列为空,则减少工作线程数量
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
                // 获取工作线程的数量
                int wc = workerCountOf(c);
    
                // allowCoreThreadTimeOut设置为true,即使工作线程数小于等于核心线程,只要工作线程空闲时间超过keepAliveTime都会被回收
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // timedOut 为 true说明下面 workQueue.poll 方法执行返回的是 null,阻塞等待keepAliveTime后依然没有数据,这时就应该减少线程的数量
                // 如果工作线程数大于线程池最大值或获取任务超时,只要存在工作线程或阻塞队列为空就会CAS减少工作线程的数量并返回null
                if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    // 从阻塞队列中获取task
                    // 如果需要超时控制,则调用poll(),否则调用take()
                    Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    再回顾一下,当我们调用submit方法的时候,实际上是将提交的Runnable或Callable对象通过newTaskFor方法转换为FutureTask对象,然后封装成为Worker对象的task属性。因此runWorker方法中task.run()方法实际上是调用FutureTask的run方法,那咱们打开FutureTask.run()的源码

        public void run() {
            // 如果当前 FutureTask 对象的状态不是 NEW 或者执行 CAS 操作赋值给 runnerOffset 失败直接跳出 run 方法
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        // 执行任务
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // 设置 runner 为 null ,利于 GC
                runner = null;
                int s = state;
                // 如果有其它线程在中断任务,会调用 handlePossibleCancellationInterrupt 方法处理
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    关闭线程池——shutdown和shutdownNow方法

    相对于线程池执行任务来说,关闭线程池的逻辑就简单很多了。以下是关闭线程池的核心源码,我们在使用线程池的时候,必须要清晰的知道,shutdown和shutdownNow方法有什么区别。

    shutdown:将线程池状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。
    shutdownNow:将线程池的状态设置成STOP状态,然后中断所有任务(包括正在执行的)的线程,并返回等待执行任务的列表。

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 检查shutdown权限
                checkShutdownAccess();
                // 设置线程池控制状态为SHUTDOWN
                advanceRunState(SHUTDOWN);
                // 中断空闲worker
                interruptIdleWorkers();
                // 调用shutdown钩子函数
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            // 尝试终止
            tryTerminate();
        }
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 检查shutdown权限
                checkShutdownAccess();
                // 设置线程池控制状态为STOP
                advanceRunState(STOP);
                // 中断所有的worker
                interruptWorkers();
                // 清空任务队列
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            // 尝试终止
            tryTerminate();
            return tasks;
        }
    

    总结

    以上是ThreadPoolExecutor类的核心方法,通过多张辅助图结合源码分析,理解ThreadPoolExecutor整体就不难了。文章的开头就从ThreadPoolExecutor类的构造方法分析各个参数的作用,结合示例图片,我们清晰知道线程池中什么时候创建线程,什么时候会回收线程等机制。然后再从源码的角度,分析了ThreadPoolExecutor底层源码,提交任务后,将任务封装为Worker对象,然后通过Worker对象的线程简单粗暴的运行while循环从阻塞队列取出任务重复执行,当阻塞队列没有任务就会阻塞线程,直到有新任务唤醒线程继续执行。

    如果觉得文章不错的话,麻烦点个赞哈,你的鼓励就是我的动力!对于文章有哪里不清楚或者有误的地方,欢迎在评论区留言~

    参考资料:

    面试官系统精讲Java源码及大厂真题——37 ThreadPoolExecutor 源码解析
    多线程之 FutureTask 源码剖析
    《Java多线程编程实战指南-核心篇》

    相关文章

      网友评论

        本文标题:Java 线程池系列(下)之 ThreadPoolExecuto

        本文链接:https://www.haomeiwen.com/subject/qksbphtx.html