美文网首页并发(个人收集)java之基础熟悉
ThreadPoolExecutor面试题及其原理?

ThreadPoolExecutor面试题及其原理?

作者: 三不猴子 | 来源:发表于2020-10-31 10:37 被阅读0次

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。

  • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
  • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize 必须大于 0,且必须大于 corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。

第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。

第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。

第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。

第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。

第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程。

从源码的角度分析ThreadPoolExecutor的使用

  1. 添加任务的过程是怎么样的;

  2. 核心线程怎么保持不被销毁的;

  3. 核心线程、非核心线程、工作队列之间的协调。

execute()方法开始,从该方法看出线程池新建线程执行任务的策略:

  1. 当前线程数小于核心线程数的时候直接新建线程执行任务;
  2. 如果队列能够添加任务,则优先将任务添加到队列,而不是新建进程;
  3. 在队列已满的时候,如果线程数小于最大线程数,则新建非核心线程执行任务,否则报错。
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        //当前正在工作的线程小于核心的线程数,跳转到addWorker()方法,新建线程执行任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //大于等于核心线程数:当前线程在运行状态,且添加到任务队列成功了。
        //对于LinkedBlockingQueue队列,添加任务总会成功(它的任务队列长度很大),所以代码走这里,
        //对于SynchronousQueue因为添加失败(不能添加任务到队列),所以不会从这个分支走。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //重新检查状态,可能这个时候线程池被停止了,要remove新添加的任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //判断当前线程数是不是0,如果是,则新建一个线程执行新加到队列里面的任务,否则会没有线程去执行新添加进阻塞队列的任务(command)。
            //如果当前线程数大于0,则不需要添加,因为有线程在运行,在线程执行完成之后,后唤醒阻塞队列里面的任务,新加到阻塞队列里的任务还是能够被执行。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //对于SynchronousQueue会走到这里,所以,在线程数小于最大线程数的时候,会新建线程执行任务,当大于最大线程数时,会直接报错reject(),此时addWorker()会失败
        else if (!addWorker(command, false))
            reject(command);
    }

执行流程图如下:

img

addWorker()源码分析:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //在SHUTDOWN状态下,如果队列不为空,且firstTask == null,阻塞队列里面还有未完成的任务,则还是会继续执行后面的操作,
            //说明了在该状态下,能够执行阻塞队列里面的任务,firstTask为空说明了是添加了一个执行阻塞队列里面任务的线程
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //获取当前正在执行的线程数,判断添加进来的任务是不是核心任务,如果是,则正在执行线程数要<=corePoolSize ,
                //如果不是,则正在执行线程数要<=maximumPoolSize。否则新建线程失败。
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //线程数增加1,添加成功,跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新建一个Worker,下面说明这是个什么东西
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //当前线程池的正在运行或者是SHUTDOWN状态且firstTask为空,则要新建线程去执行任务。
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //会执行Worker的run()方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

相关文章

网友评论

    本文标题:ThreadPoolExecutor面试题及其原理?

    本文链接:https://www.haomeiwen.com/subject/qsksvktx.html