美文网首页Java互联网科技老男孩的成长之路
ThreadPoolExecutor线程池实现原理+源码解析,了

ThreadPoolExecutor线程池实现原理+源码解析,了

作者: java菲菲 | 来源:发表于2020-08-27 17:04 被阅读0次

    前言

    或许每个Java工程师都被问过这样一个问题

    Java中开启一个新的线程有几种方法?

    继承Thread类和实现Runnable接口。但是除了写Demo,几乎没人会在生产环境上这样用。具体原因如下:

    • 线程频繁的被创建、销毁,非常消耗资源
    • 这两种方式开启的线程都不便于统一的调度和管理
    • HotSpot虚拟机采用1:1的模型来实现Java线程的,也就是说一个Java线程直接通过一个操作系统线程来实现,如果可以无限制的开启线程,很容易导致操作系统资源耗尽。

    线程池

    继承Thread和实现Runnable的诸多缺点,<typo id="typo-270" ignoretag="true" data-origin="所以">所以</typo>生产环境必须使用线程池来实现多线程。

    线程池(thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 ——维基百科

    简单来说,“池”在计算机领域是指集合,线程池就是指线程集合。线程池可以对一系列线程的生命周期进行统一的调度和管理,包括线程的创建、消亡、生存时间、数量控制等。
    Java中的线程池从JDK1.5开始,有一个标准的实现java.util.concurrent.ThreadPoolExecutor,对于这个类,首先看下它的体系结构图

    image.png
    • Executor:只定义了一个方法execute,用于执行提交的任务
    • ExecutorService:定义了一些线程池管理、任务提交、线程池检测的方法
    • AbstractExecutorService:提供了ExecutorService接口执行方法的默认实现,用于统一处理Callable任务和Runnable任务

    内部结构

    这里主要关注类的定义和一些重要的常量、成员变量

    public class ThreadPoolExecutor extends AbstractExecutorService {
    
        // 高3位表示线程池状态,低29位表示worker数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        // 29 = 32 - 3
        private static final int COUNT_BITS = Integer.SIZE - 3;
        // 线程池允许的最大线程数。为 2^29 - 1
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
        // runState is stored in the high-order bits
        // 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    
        // Packing and unpacking ctl
        // 获取线程池状态
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 获取线程池worker数量
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // 根据线程池状态和worker数量生成ctl值
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
       // 缓冲队列(阻塞队列)
        private final BlockingQueue<Runnable> workQueue;
    
        // 互斥锁
        private final ReentrantLock mainLock = new ReentrantLock();
    
        // 包含线程池工作的所以线程,仅在持有mainLock的时候能访问
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
        private final Condition termination = mainLock.newCondition();
    
        // 跟踪线程池最大的大小(实际的最大值),仅在持有mainLock的时候能访问
        private int largestPoolSize;
    
        // 记录已经完成的任务数,仅在工作线程终止时更新,仅在持有mainLock的时候能访问
        private long completedTaskCount;
    
        // 线程工厂
        private volatile ThreadFactory threadFactory;
    
        // 线程池饱和或者关闭时的执行器
        private volatile RejectedExecutionHandler handler;
    
        // 空闲线程等待工作的超时时间
        private volatile long keepAliveTime;
    
        // 如果为false(默认值),核心线程永远不回收
        // 如果为true,核心线程也通过keepAliveTime参数超时回收
        private volatile boolean allowCoreThreadTimeOut;
    
        // 核心线程数
        private volatile int corePoolSize;
    
        // 最大线程数(程序设置的最大线程数,区别于largestPoolSize)
        private volatile int maximumPoolSize;
    
        // 默认的拒绝策略处理器,抛出RejectedExecutionException异常
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    }
    

    涉及到的成员变量、常量比较多,也不太容易理解,不过看完整篇后再来回顾这里,就很容易理解了。

    生命周期

    ThreadPoolExecutor类提供了线程池的五个状态描述

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    这几种状态之间的转换过程如下

    image.png
    • RUNNING:运行状态,可以执行任务,也可以接受阻塞队列里的任务调度
    • SHUTDOWN:调用了shutdown()方法,该状态可以继续执行阻塞队列中的任务,但是不会再接受新任务
    • STOP:调用了shutdownNow()方法,该状态会尝试中断正在执行的所有任务不能继续执行阻塞队列中的任务,也不会再接受新任务
    • TIDYING:所有任务都执行完毕,至于阻塞队列中的任务是否执行完成,取决于调用了shutdown()还是shutdownNow()方法
    • TERMINATED:terminated()方法执行完成后进入该状态,terminated()方法默认没有任何操作

    构造方法

    ThreadPoolExecutor提供了四个构造方法,忽略它提供的语法糖,我们直接看最吊的那个构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            // corePoolSize、maximumPoolSize、keepAliveTime都不能小于0
            // 且maximumPoolSize必须大于等于corePoolSize
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            // workQueue、threadFactory、handler均不能为null
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    这个构造方法有七个参数,如果能明白各个参数的作用,那么线程池的工作原理也就基本清晰了。

    • int corePoolSize:核心线程数,当有新的任务提交到线程池时,会进行如下判断:线程池中线程数量小于corePoolSize时,会创建新线程处理任务,即使还有其他空闲的核心线程线程池中线程数量等于corePoolSize时,任务会加入到workQueue缓存队列,直到缓存队列满了,才会新建非核心线程去处理任务线程池中的线程数量等于maximumPoolSize且缓存队列已满时,会根据RejectedExecutionHandler参数指定的拒绝策略来处理提交的任务如果corePoolSize和maximumPoolSize相等,则创建的线程池大小是固定的,缓存队列满了就执行决绝策略
    • int maximumPoolSize:最大线程数
    • long keepAliveTime:非核心线程的最长空闲时间,超过了会被回收(allowCoreThreadTimeOut参数设置成true,也会回收核心线程)
    • TimeUnit unit:keepAliveTime参数的单位
    • BlockingQueue<Runnable> workQueue:阻塞队列,用于缓存,保存正在等待执行的任务。一般有以下几种配置直接切换:常用的队列是SynchronousQueue无界队列:常用的队列是LinkedBlockingQueue,队列基于链表实现,最大长度是Integer.MAX_VALUE,虽然是有界的,但是值太大,所以认为是无界队列。使用无界队列可能会导致最大线程数maximumPoolSize失效,这点结合下文的线程池执行过程会很容易理解有界队列:常用的队列是ArrayBlockingQueue,基于数组实现,能把最大线程数控制为maximumPoolSize。也能避免阻塞队列中堆积的任务过多。
    • ThreadFactory threadFactory:线程Factory,用来创建线程。使用默认的ThreadFactory创建的线程是具有相同优先级的非守护线程。一般需要自定义ThreadFactory,因为要给每个线程设置有意义的名称
    • RejectedExecutionHandler handler: 当线程数达到了最大线程数,且没有线程空闲,且缓冲队列也满了(也就是线程池饱和了),指定拒绝策略,ThreadPoolExecutor自身提供了四种拒绝策略:AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常CallerRunsPolicy:利用调用者所在的线程执行任务,哪个线程提交这个任务,就由哪个线程执行DiscardOldestPolicy:丢弃缓存队列中头部的任务,重试提交的任务DiscardPolicy:直接丢弃显然默认的四种拒绝策略都不能很好的使用在生产环境,所以一般也需要自定义拒绝策略来处理饱和的任务。将暂时无法处理的任务存入中间件、数据库以及日志记录。

    线程池中线程的数量并不是越多越好,因为服务器的性能总是有限的。线程数过多会增加线程切换的开销,并且空闲线程的频繁回收也需要消耗资源。线程池的七个参数相辅相成,相互影响,设置的时候需要根据实际情况酌情考虑。
    看文字描述多少有些不清晰,如果能有张图的话就再好不过了。你就说巧不巧吧,刚好我画了一张图。

    image.png

    对照这张图和上面的描述,相信大家对ThreadPoolExecutor的七个参数有个深刻的认识。也很容易理解为什么使用无界队列LinkedBlockingQueue会使maximumPoolSize失效了,因为缓存队列可能永远不会满

    核心方法

    毫无疑问,线程池最核心的方法除了构造方法,就是执行task的方法了。在看ThreadPoolExecutor的核心方法之前,先看一个非常非常重要的内部类Worker,它是线程池中运行任务的最小单元。

    // 继承了AbstractQueuedSynchronizer,是一把锁
    // 实现了Runnable接口,是一个线程执行的task
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
    
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** 运行任务的线程 */
        final Thread thread;
        /** 要运行的初始任务,可能为null */
        Runnable firstTask;
        /** 每个线程的任务计数器 */
        volatile long completedTasks;
    
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 把自己作为一个任务传递给ThreadFactory创建的线程
            this.thread = getThreadFactory().newThread(this);
        }
    
        /** runWorker是一个非常重要的方法,后文详细介绍  */
        public void run() {
            runWorker(this);
        }
    
        // 值为0代表解锁状态
        // 值为1表示锁定状态
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
    
        // CAS的方式尝试加锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
        // 尝试释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    

    Worker类实现了Runnable接口,所以本身就是一个可执行的任务,并且在构造方法中将自己传递给ThreadFactory创建的线程去执行
    Worker类继承了AbstractQueuedSynchronizer类,所以它本身也是一把锁,执行任务的时候锁住自己,任务执行完成后解锁。
    了解了Worker类,再来看核心方法。

    execute

    execute方法用于在将来的某个时间执行指定的任务,execute方法源码比较复杂,应该先理清楚整体逻辑,在逐步深入细节。

    public void execute(Runnable command) {
        if (command == null)
            // 提交空任务,直接抛异常
            throw new NullPointerException();
    
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            // worker数量小于核心线程数,创建核心线程执行任务(第二个参数为true,表示创建核心线程)
            // addWorker方法会检查线程池的状态
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            // worker数量超过核心线程数,进入缓冲队列
            // 再次获取ctl值,因为从上次获取到这里,有可能ctl的值已经被改变,double-check
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                // 线程池不是RUNNING状态,说明已经调用过shutdown方法,需要对新提交的任务执行拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                // 因为构造方法中corePoolSize可能为0或者核心线程也都被回收了,所以此处需要判断
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            // 线程池不是RUNNING状态,或者任务加入缓冲队列失败,创建非核心线程执行任务(第二个参数为false)
            // 任务执行失败,需要执行拒绝策略
            reject(command);
    }
    

    整体逻辑就是前文所示的流程图。相信有了流程图的对比,execute方法的理解就容易多了。

    addWorker

    addWorker方法用于往线程池添加新的worker。其实现如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry: // 这种写法叫做label语法,一般用于多重性循环中跳转到指定位置
        for (;;) {
            // 外层自旋
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            // 线程池状态 >= SHUTDOWN
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 内层自旋
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    // 工作中的线程数大于线程池的容量,或者已经大于等于核心线程数,或者大于等于最大线程数
                    // core为true,表示要创建核心线程,false表示要创建非核心线程
                    // 为什么大于等核心线程数的时候要返回false,因为要添加到缓冲队列,或者创建非核心线程来执行,不能创建核心线程了
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    // 以CAS的方式尝试把线程数加1
                    // 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程
                    // 成功后跳出内层自旋
                    break retry;
                // CAS失败,再次获取ctl,检查线程池状态    
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    // 线程池状态被改变了,从外层自旋开始再次执行之前的逻辑
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 可以看到两层自旋 + CAS,仅仅是为了把线程池中的线程数加1,还没有新建线程
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 把task包装成Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    // 获取锁之后,再次检查线程池的状态
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            // 检查线程状态
                            throw new IllegalThreadStateException();
                        // 添加到worders
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            // 维护largestPoolSize变量
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 解锁
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 添加成功
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                // 执行worker的线程启动失败
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    可以看到addWorker方法前一部分,用了外层自旋判断线程池的状态,内层自旋 + CAS给线程池中的线程数加1。后半部分用了ReentrantLock保证创建Worker对象,以及启动线程的线程安全。一个方法中三次获取了线程池的状态(不包含该方法调用的其他方法),因为每两次之间,线程池的状态都有可能被改变。

    runWorker

    前文在介绍Worker内部类时说过,Worker会把自己传递给ThreadFactory创建的线程执行,最终执行Worker的run方法,而Worker类的run方法只有一行代码:

    runWorker(this);
    

    所以接下来看看runWorker方法是如何实现了

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允许外部中断
        w.unlock(); // allow interrupts
        // 记录worker是不是异常退出的
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // 自旋,如果task不为空,或者能从缓冲队列(阻塞队列)中获取任务就继续执行,不能就一直阻塞
                // 加锁
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 如果线程池正在停止,并且当前线程没有被中断,就中断当前线程
                    wt.interrupt();
                try {
                    // 钩子函数,处理task执行前的逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子函数,处理task执行后的逻辑
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 完成的任务数加1
                    w.completedTasks++;
                    // 解锁
                    w.unlock();
                }
            }
            // 运行到这里,说明worker没有异常退出
            completedAbruptly = false;
        } finally {
            // 自旋操作被打断了,说明线程需要被回收
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    第10行代码中,task为null时,会通过getTask()方法从缓冲队列中取任务,因为缓冲队列是阻塞队列,所以如果获取不到任务会一直被阻塞,接下来看看getTask方法的内部实现

    getTask

    getTask用于阻塞式的从缓冲队列中获取任务。

    private Runnable getTask() {
        // 线程是否超时
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            // 自旋
            // 获取线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
    
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 线程池终止了,或者线程池停止了,且缓冲队列中没有任务了
                // 自旋 + CAS方式减少线程计数
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // 根据allowCoreThreadTimeOut参数来判断,要不要给核心线程设置等待超时时间
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                // 当前线程数大于了maximumPoolSize(因为maximumPoolSize可以动态修改)或者当前线程设置了超时时间且已经超时了
                // 且线程数大于1或者缓冲队列为空
                // 这个条件的意思就是:当前线程需要被回收
                if (compareAndDecrementWorkerCount(c))
                    // 返回null后,上层runWorker方法中断循环,执行processWorkerExit方法回收线程
                    return null;
                continue;
            }
    
            try {
                // 从阻塞队列中获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    // 成功获取任务
                    return r;
                // 没有获取到任务,超时
                timedOut = true;
            } catch (InterruptedException retry) {
                // 线程被中断,重试
                timedOut = false;
            }
        }
    }
    

    理解该方法的前提,是要理解阻塞队列提供的阻塞式API。
    这个方法重点关注两点:

    • 从缓冲队列取任务时,poll非阻塞,take阻塞,调用哪个由当前线程需不需要被回收来决定
    • 该方法返回null之后,上层方法会回收当前线程

    除了这几个核心方法之外,往线程池提交任务还有一个方法叫submit

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }
    
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    

    submit方法可以接收线程池返回的结果,也就是Futrue对象,可以接收Runnable对象和Callable对象。
    至于Future、FutureTask、Runnable、Callable之间的关系,博主在前一篇博客 如何获取子线程的执行结果 已经详细介绍过,此处不再赘述。

    至此ThreadPoolExecutor的核心方法的源码以及执行逻辑已经讲解完毕,再来看一些非核心方法,了解一下即可

    • public void shutdown():关闭线程池,已经提交过的任务还会执行(线程池中未运行完毕的,缓冲队列中的)
    • public List<Runnable> shutdownNow():停止线程池,试图停止正在执行的任务,暂停缓冲队列中的任务,并且返回
    • public void allowCoreThreadTimeOut(boolean value):设置核心线程是否允许回收
    • protected void beforeExecute(Thread t, Runnable r):钩子函数,处理线程执行任务前的逻辑,这里是空实现
    • protected void afterExecute(Runnable r, Throwable t):钩子函数,处理线程执行任务后的逻辑,这里是空实现
    • public int getActiveCount():返回正在执行任务的线程的大致数量
    • public long getCompletedTaskCount():返回执行完成的任务的大致数量

    除此之外还需要了解的是,构造方法中的七个参数,除了BlockingQueue是不能动态设置外,其余六个参数都可以动态设置,分别调用对于的setXxx方法即可,当然也可以通过对于的getXxx方法获取对应的信息。

    鉴于此,我们再来看一个常见的问题

    Java有几种线程池?

    JDK(准确的说是java.util.concurrent.Executors工具类)提供了四种线程池:

    • CachedThreadPool:缓冲线程池、
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    • FixedThreadPool:固定线程数的线程池
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    • SingleThreadExecutor:单线程的线程池
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
    }
    
    • ScheduledThreadPool:可定时调度的线程池
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    // ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以super()还是调用ThreadPoolExecutor的构造方法
    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
        public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
        }
    }
    

    仔细看下这四种线程池,最终都调用了ThreadPoolExecutor的构造方法,只是传递的参数有所不同。

    • CachedThreadPool和ScheculedThreadPool设置的最大线程数都是Integer.MAX_VALUE,可能线程数过多而产生OOM
    • SingleThreadExecutor和FixedThreadPool使用的都是无界队列,最大元素个数为Integer.MAX_VALUE,可能缓冲队列中堆积的任务过多,而产生OOM

    这两点正是阿里巴巴代码规范里禁止使用这四种线程池的原因。
    想要使用线程池,必须通过ThreadPoolExecutor的方法来创建线程池

    总结

    使用线程池需要注意的几点如下:

    • 合理设置七个参数
    • 自定义ThreadFactory,给每个线程设置有意义的名称
    • 自定义RejectedExecutionHandler,处理线程池饱和时的逻辑

    使用线程池之前一定要十分明确每个参数的意义以及对其他参数的影响,才能更加合理的使用线程池。

    作者:Sicimike
    原文链接:https://blog.csdn.net/Baisitao_/article/details/100415358

    相关文章

      网友评论

        本文标题:ThreadPoolExecutor线程池实现原理+源码解析,了

        本文链接:https://www.haomeiwen.com/subject/vxxhsktx.html