美文网首页并发(个人收集)java之基础熟悉
ThreadPoolExecutor面试题及其原理?

ThreadPoolExecutor面试题及其原理?

作者: 三不猴子 | 来源:发表于2020-10-31 10:37 被阅读0次

    线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。

    • FixedThreadPool 和 SingleThreadPool:允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
    • CachedThreadPool 和 ScheduledThreadPool:允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            // maximumPoolSize 必须大于 0,且必须大于 corePoolSize
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    第 1 个参数:corePoolSize 表示线程池的常驻核心线程数。如果设置为 0,则表示在没有任何任务时,销毁线程池;如果大于 0,即使没有任务时也会保证线程池的线程数量等于此值。但需要注意,此值如果设置的比较小,则会频繁的创建和销毁线程(创建和销毁的原因会在本课时的下半部分讲到);如果设置的比较大,则会浪费系统资源,所以开发者需要根据自己的实际业务来调整此值。

    第 2 个参数:maximumPoolSize 表示线程池在任务最多时,最大可以创建的线程数。官方规定此值必须大于 0,也必须大于等于 corePoolSize,此值只有在任务比较多,且不能存放在任务队列时,才会用到。

    第 3 个参数:keepAliveTime 表示线程的存活时间,当线程池空闲时并且超过了此时间,多余的线程就会销毁,直到线程池中的线程数量销毁的等于 corePoolSize 为止,如果 maximumPoolSize 等于 corePoolSize,那么线程池在空闲的时候也不会销毁任何线程。

    第 4 个参数:unit 表示存活时间的单位,它是配合 keepAliveTime 参数共同使用的。

    第 5 个参数:workQueue 表示线程池执行的任务队列,当线程池的所有线程都在处理任务时,如果来了新任务就会缓存到此任务队列中排队等待执行。

    第 6 个参数:threadFactory 表示线程的创建工厂,此参数一般用的比较少,我们通常在创建线程池时不指定此参数,它会使用默认的线程创建工厂的方法来创建线程。

    从源码的角度分析ThreadPoolExecutor的使用

    1. 添加任务的过程是怎么样的;

    2. 核心线程怎么保持不被销毁的;

    3. 核心线程、非核心线程、工作队列之间的协调。

    execute()方法开始,从该方法看出线程池新建线程执行任务的策略:

    1. 当前线程数小于核心线程数的时候直接新建线程执行任务;
    2. 如果队列能够添加任务,则优先将任务添加到队列,而不是新建进程;
    3. 在队列已满的时候,如果线程数小于最大线程数,则新建非核心线程执行任务,否则报错。
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            
            int c = ctl.get();
            //当前正在工作的线程小于核心的线程数,跳转到addWorker()方法,新建线程执行任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //大于等于核心线程数:当前线程在运行状态,且添加到任务队列成功了。
            //对于LinkedBlockingQueue队列,添加任务总会成功(它的任务队列长度很大),所以代码走这里,
            //对于SynchronousQueue因为添加失败(不能添加任务到队列),所以不会从这个分支走。
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //重新检查状态,可能这个时候线程池被停止了,要remove新添加的任务
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //判断当前线程数是不是0,如果是,则新建一个线程执行新加到队列里面的任务,否则会没有线程去执行新添加进阻塞队列的任务(command)。
                //如果当前线程数大于0,则不需要添加,因为有线程在运行,在线程执行完成之后,后唤醒阻塞队列里面的任务,新加到阻塞队列里的任务还是能够被执行。
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //对于SynchronousQueue会走到这里,所以,在线程数小于最大线程数的时候,会新建线程执行任务,当大于最大线程数时,会直接报错reject(),此时addWorker()会失败
            else if (!addWorker(command, false))
                reject(command);
        }
    

    执行流程图如下:

    img

    addWorker()源码分析:

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                //在SHUTDOWN状态下,如果队列不为空,且firstTask == null,阻塞队列里面还有未完成的任务,则还是会继续执行后面的操作,
                //说明了在该状态下,能够执行阻塞队列里面的任务,firstTask为空说明了是添加了一个执行阻塞队列里面任务的线程
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    //获取当前正在执行的线程数,判断添加进来的任务是不是核心任务,如果是,则正在执行线程数要<=corePoolSize ,
                    //如果不是,则正在执行线程数要<=maximumPoolSize。否则新建线程失败。
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //线程数增加1,添加成功,跳出循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //新建一个Worker,下面说明这是个什么东西
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
                        //当前线程池的正在运行或者是SHUTDOWN状态且firstTask为空,则要新建线程去执行任务。
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //会执行Worker的run()方法
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    相关文章

      网友评论

        本文标题:ThreadPoolExecutor面试题及其原理?

        本文链接:https://www.haomeiwen.com/subject/qsksvktx.html