美文网首页Javassh
Java线程池源码分析

Java线程池源码分析

作者: _kkk | 来源:发表于2019-05-03 21:25 被阅读3次

    多线程是进行并发编程时运用到的最主要的技术,由于频繁的创建销毁线程会带来较大的开销,因此又引申出线程池的概念。在需要一个新的线程来执行任务时,从池子中找出一个空闲的线程来执行任务,减少了线程重复创建的次数,减少CPU等资源的消耗。

    经过JDK很好的封装,线程池使用起来非常的简单,只需要简单的参照一个示例代码,就能够使用上线程池,那么对于这种并发中最重要的技能,仅仅会用是不够的,必须深入理解其实现原理,那么一起从源码中一探究竟吧。

    首先在Intellij IDEA中打开java.util.concurrent目录,通过IDE可以生成concurrent包整体的层次结构图,对线程池技术的宏观了解有助于理解其底层实现,那么我从中截取了线程池相关的层次结构图如下:

    线程池类层次结构图

    那么今天的分析将从按照Executor->ExecutorService->AbsExeService->ThreadPoolExecutor的路线展开,但是其中前三个是接口和抽象类,分析的重点还是在ThreadPoolExecutor上。ForkJoinPoolScheduled相关的留到以后分析。

    Executor & ExecutorService

    Executor是一个接口,它的实例对象是用来执行被提交的Runnable任务,这个接口解耦了任务的提交和任务如何被执行(选择线程,执行时间等)。

    // 接口非常简单,只定义了一个方法
    public interface Executor {
        void execute(Runnable command);
    }
    

    ExecutorService接口拓展了Executor接口,提供了更多操纵线程的方法,比如管理线程的终止。Executor中定义的方法,无法获取线程执行的结果,无法了解线程执行状况,使用上有些许不便。因此ExecutorService还提供了可以返回Future对象的方法,Future对象可以理解为异步结果的占位符,可以通过它在之后某个时间获取异步操作的结果。

    public interface ExecutorService extends Executor {
        // 线程终止
        void shutdown();
    
        List<Runnable> shutdownNow();
    
        boolean isShutdown();
    
        boolean isTerminated();
    
        boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException;
        // submit方法拓展了execute方法,提供了Future对象用于管理任务的执行
        <T> Future<T> submit(Callable<T> task);
    
        <T> Future<T> submit(Runnable task, T result);
    
        Future<?> submit(Runnable task);
        // 批量执行任务
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException;
    
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                      long timeout, TimeUnit unit)
            throws InterruptedException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)
            throws InterruptedException, ExecutionException;
    
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                        long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    AbstractExecutorService分析

    ExecutorService只是一个接口,定义了很多方法。AbstractExecutorService则对其中submit开始的方法提供了默认实现。但是把execute和终止相关的方法的实现留给了ThreadPoolExecutor

    简单介绍下Future,其对象可以看做是一个异步执行任务的占位符,提供了get,cancel等方法,可以获取异步任务的结果,也可以取消异步任务的执行。比如在A线程中,开启了线程B去执行一个任务,那么A线程就无法了解B的状态,但是通过Future f = B.doAsync(),就可以通过Future绑定到执行结果。线程A就能从而得到B的执行结果。

    public abstract class AbstractExecutorService implements ExecutorService {
        // submit方法拓展了execute方法,使其可以返回一个Future对象
        // submit方法都会检测任务不为空
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
        // 绑定Future和task    
            RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 真正的执行逻辑还是exectue,在具体类中实现    
            execute(ftask);
        // 返回future对象    
            return ftask;
        }
        // 逻辑类似,不重复分析
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        // 绑定task和future对象
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
        // ...
        // invokeAll, invokeAny不介绍
    

    Executors类

    介绍完AbsExecutorService,应该到了ThreadPoolExecutor,但是这里还需要提到一个结构图中没有出现的类Executors,它提供了大量的ExecutorEexcutorService的工厂函数和工具方法,简化了线程池的创建和使用,使开发者在不了解线程池的原理的情况下也可以很快上手。线程池最佳实践中很少直接由开发者创建一个ThreadPoolExecutor对象,而是通过Executors提供的几个常用的工厂函数来创建线程池对象。Executors中提供了很多的静态工厂方法,那么我选出最常用的以及跟这篇文章息息相关的几个。

    // 创建一个线程数量固定的线程池,线程池数量为nThreads
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    }
    // 通过指定的threadFactory来创建线程
    // 可以设置线程名字,优先级等
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            threadFactory);
    }
    // 创建一个可无限增长的线程池(注意场景,放止创建过多线程,内存溢出)
    // 省略需要传入threadFactory的工厂函数
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
            60L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
    }
    // 创建只有一个线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>()));
    }
    

    这里只列出了ThreadPoolExecutor相关的工厂方法,Executors中还包括了ForkJoinPoolScheduled的工厂方法。

    列出的工厂函数最终都是调用了ThreadPoolExecutor的构造函数,那么根据几个方法的作用及参数的组合,聪明的读者已经可以大概猜出构造函数中每个变量的意义。还没有注意到的可以回头比较一下几个工厂方法参数的差异,相信会有一些收获。

    ThreadPoolExecutor分析

    绕了这么多,终于来到今天的主角,线程池的实现类ThreadPoolExecutor.

    在没分析之前,首先思考一下如果自己来实现一个线程池,需要考虑哪些问题?

    • 如何创建线程,何时创建线程(初始化时全部创建?按需创建?)
    • 创建多少个线程?
    • 任务多于线程数量时,如何处理任务?
    • 线程池状态获取,管理
    • 空闲线程如何处理?

    构造函数

    // ThreadPoolExecutor继承了AbsExecutorService
    public class ThreadPoolExecutor extends AbstractExecutorService {
        ...
    }
    // 进入主题,从构造函数开始分析
    // 构造函数的参数非常多,所以有多个构造函数
    // 那么无论哪一个构造函数,最终都会调用该构造函数
    /* @param corePoolSize 核心线程数,即使处于空闲状态也会保存在线程池中,
     *        除非设置了allowCoreThreadTimeOut
     * @param maximumPoolSize 线程池最大可以拥有的线程数,超过核心线程数的
     *        线程会在空闲超过指定时间后销毁
     * @param keepAliveTime 超过核心线程数的线程的最大空闲时间
     * @param unit 时间单位
     * @param workQueue 用于保存还未执行的任务
     * @param threadFactory 线程工厂,用于创建新的线程
     * @param handler 当线程池大小边界和任务队列的边界到达时执行的拒绝策略
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // 对几个变量的值进行合法性校验                          
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // 变量合法性校验    
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
            null : AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    构造函数中只是进行了简单的赋值操作,没有创建线程等相关操作,那么就继续从线程池的使用方法着手分析。

    线程池通过ExecutorServicesubmit方法执行任务,而在前面的分析中得到AbsExeService中的默认实现最终调用了ThreadPoolExecutor中的execute方法。

    execute()方法分析

    // 内部使用一个原子型整数保存两个状态,runState和workerCount
    // 整数的高3为表示运行状态,包括RUNNING,STOP,SHUTDOWN等5个状态
    // 其余位数表示线程的数量
    // 因为将两个变量放在一个整数里,所以对ctl变量提供了打包、解包方法
    // ctlof方法用于将runState和workerCount打包成一个整数
    // 初始化时runState=RUNNING, workerCount=0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // runState详细信息
    // 接收任务并且处理排队的任务     
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 不接受新任务,处理队列中的任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 不接受新任务,不处理排队任务,中断正在处理的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 所有任务已经终止,线程转换为TIDYING状态会调用钩子方法terminated()
    private static final int TIDYING    =  2 << COUNT_BITS;
    // terminated()方法执行完成
    private static final int TERMINATED =  3 << COUNT_BITS;
    
    public void execute(Runnable command) {
        // 判断command飞空
        if (command == null)
            throw new NullPointerException();
        // 读取ctl当前的值    
        int c = ctl.get();
        // workerCountOf方法解包ctl,读取其中线程数量的值
        // 如果当前工作线程小于核心线程数,调用addWorker()添加一个核心线程
        if (workerCountOf(c) < corePoolSize) {
            // 调用addWorker成功的话直接返回
            if (addWorker(command, true))
                return;
            // addWorker失败,则重新读取ctl,可能的失败原因后面分析    
            c = ctl.get();
        }
        //线程数大于核心线程数时,尝试将任务放在队列中
        // 如果runState=RUNNING并且任务队列添加command成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果队列添加成功,仍然需要执行双重检查
            // 避免在上次检查到目前的时间内,一个线程销毁了
            // 或者线程池ShutDown了
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        } 
        // 如果任务队列不能添加任务,尝试创建新的线程
        // 这里的addWorker线程添加的线程是核心线程之外的线程
        else if (!addWorker(command, false))
            // 如果创建失败,则拒绝改任务
            reject(command);
    }
    
    // 下面分析下execute需要用到的addWorker()方法
    /* @param firstTask 新线程的第一个任务
     * @param core 为true时以corePoolSize当做边界,否则maxPoolSize
     */   
    // 注意这个core的不同取值,区分是否添加核心线程
    // 可以回到上面的execute方法调用addWorker时的参数,体会下 
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 根据上面分析的runState方法,在某些状态需要检查队列是否为空
            //正常情况runState为RUNNING,除非调用shutDown等方法
    // 之前看过一篇博客,介绍如何分析源码。讲到一定要关注重点,这种异常情况如果
    // 想要研究清楚,比较耗时,且意义不大。跟着正常的逻辑阅读下去,会更容易理解
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                //根据core的取值,选择以corePoolSize还是maximumPoolSize作边界
                // 如果线程池数量达到边界,则拒绝创建新线程,返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过cas操作增加线程数量,添加成功跳出retry循环    
                if (compareAndIncrementWorkerCount(c))
                    break retry;    
                c = ctl.get();
                // 如果runState发生变化,重新进入外层retry循环  
                if (runStateOf(c) != rs)
                    continue retry;
                // 否则是cas修改workerCount失败,重新进行cas操作
            }
        }
        // 通过cas将workerCount自增后进入这
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //Worker的构造函数通过threadFactory的newThread()创建一个新线程
            w = new Worker(firstTask);
            // 获取worker创建的线程
            final Thread t = w.thread;
            // threadFactory创建线程成功
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //重新检查runState状态,RUNNING < SHUTDOWN
                    //也就是说正常情况会进入该分支
                    //或者处于shutdown状态,但是队列中还有任务
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                // workers = new HashSet<Worker>();
                //将新的worker加入集合          
                        workers.add(w);
                        int s = workers.size();
                        //更新最大线程数
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                    //修改workerAdded状态        
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //执行线程
                    t.start();
                    workerStarted = true;
                }
            }
        // 返回addWorker的结果,成功为true    
        } finally {
            if (! workerStarted)
            //添加worker失败调用addWorkerFailed方法
            //addWorkerFailed方法在workers集合中删除w,并将workerCount-1
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    Worker分析

    看完execute()addWorker(),已经对线程池如何工作有一定了解了,现在唯一的困惑可能就是Worker封装了线程后做了些什么,一起来看看。

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable {
        final Thread thread;
        Runnable firstTask;
        //worker已经完成任务的个数
        volatile long completedTasks;
    
        Worker(Runnable firstTask) {
        // AbstractQueuedSynchronizer方法,不分析了    
            setState(-1);
            //赋值firstTask,可能为空    
            this.firstTask = firstTask;
            //通过threadFactory的newThread方法创建新的线程
            //并把当前worker实例传入,Worker实现了Runnable接口,符合线程创建规则
            this.thread = getThreadFactory().newThread(this);
        }
        // 当调用thread.start()方法后执行run方法
        // 将任务委派给ThreadPoolExecutor的runWorker方法
        public void run() {
            runWorker(this);
        }
        //...省略一些方法
    }
    

    runWorker()方法

    runWorker()封装了线程的执行逻辑。

    final void runWorker(Worker w) {
        //获取到当前线程
        Thread wt = Thread.currentThread();
        //获取到需要执行的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许中断
        boolean completedAbruptly = true;
        try {
            // 如果firstTask不为空或者队列中还有任务
            //getTask()不仅仅是获取任务这么简单,后面介绍
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //当线程池停止时,对线程进行中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行钩子方法,ThreadPoolExecutor默认是无操作
                    //可以通过继承ThreadPoolExecutor来修改其实现
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务逻辑
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //执行钩子函数
                        afterExecute(task, thrown);
                    }
                } finally {
                    //执行完成后,回收task,将已完成任务数量加1
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    // 分析下runWorker中调用的getTask()方法
    private Runnable getTask() {
        // 标志是否超时
        boolean timedOut = false; 
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // 如果runState状态为shutdown之后,忽略队列中的任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            //allowCoreThreadTimeOut为false(默认)时,核心线程即使空闲也不会回收
            //如果线程数大于核心线程数,timed=true
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    // 如果线程数大于最大线程数或者有多余的空闲线程超时了
    // 并且线程数大于1或者队列为空时,将ctl减1
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
            // 有多于核心线程数的线程时,timed为true
            // 则从任务队列中用poll()方法取任务,阻塞keepAliveTime秒的时间
            // 如果线程数小于等于核心线程数,则无限期的阻塞等待队列中的第一个元素   
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // 如果队列中有任务,返回该任务并执行    
                if (r != null)
                    return r;
                // 队列中没有任务,标记超时,则在下一次循环中会将ctl减1
                // 并退出循环,然后进入runWorker方法中的processWorkerExit()销毁线程
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    这里稍微对上面总结下,一个Worker就对应了线程池中的一个线程,通过runWorker()方法执行任务,线程工作时首先执行传入Worker对象的firstTask,之后从workQueue中获取任务并执行。当线程数小于等于核心线程数时,从队列中获取任务时是无限期等待的,即没有任务时,会一直阻塞在获取任务的状态;但是当线程数多于核心线程数时,从阻塞队列中获取元素的阻塞时间为keepAliveTime, 超时后退出runWorker中的while循环,进入processWorkerExit()方法执行线程的销毁动作。

    processWorkerExit()方法

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // completedAbruptly用来判断ctl当前的值是否已经是应该存活线程的数量
        // 因为getTask方法中已经对ctl减1,所以runWorker方法传入的应该是false
        // 即不需要再次对ctl的值减1
        if (completedAbruptly) 
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 整个线程池已经完成的任务数
            completedTaskCount += w.completedTasks;
            // 从Worker集合中删除Worker w
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // tryTerminate()方法在RUNNING状态下不进行任何操作,直接返回
        // 其他runState情况不过多介绍
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return;
            }
            addWorker(null, false);
        }
    }
    

    拒绝策略

    execute方法中,如果addWorker()失败,则会执行拒绝策略。

    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    // 方法非常简单,调用我们传入的RejectedExecutionHandler对象的方法执行拒绝策略
    // 下面介绍几个简单的拒绝策略
    // (1)抛出异常
    public static class AbortPolicy implements RejectedExecutionHandler {
    
        public AbortPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                " rejected from " + e.toString());
        }
    }
    // (2)直接忽略新的任务
    public static class DiscardPolicy implements RejectedExecutionHandler {
    
        public DiscardPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
    }
    // (3)忽略队列中最后一个任务
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    
        public DiscardOldestPolicy() { }
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
    

    总结

    看到这里,关于线程池的核心部分都已经知其然且知其所以然,还有一些修改内部状态,终止线程的方法就不过多介绍了。

    再用一张图总结一下线程池的工作流程

    线程池工作流程

    除了图中的流程,还需要记得线程池通过阻塞队列的poll()方法超时,来将空闲的多余线程(大于核心线程数的线程)销毁。

    往期回顾

    Java集合源码系列-HashMap

    Java集合源码系列-ArrayList

    Java动态代理原理剖析(一)

    欢迎关注我的公众号

    欢迎关注我的公众号,会经常分享一些技术文章和生活随笔~

    技术旅途

    相关文章

      网友评论

        本文标题:Java线程池源码分析

        本文链接:https://www.haomeiwen.com/subject/sfaznqtx.html