美文网首页JUC并发相关
25. 并发终结之ThreadPoolExecutor

25. 并发终结之ThreadPoolExecutor

作者: 涣涣虚心0215 | 来源:发表于2020-09-21 09:46 被阅读0次

线程池概述

线程池的好处:

  1. 线程可重用,线程是稀缺资源,使用线程池可减少创建和销毁线程的次数,每个工作线程都可以被重复使用。
  2. 线程并发数控制,可以根据系统承受能力,调整线程池中工作线程数量,提高系统响应能力,降低资源消耗。
ThreadPoolExecutor
关系图

ThreadPoolExecutor是线程池的核心类,要透彻的了解线程池,必须先了解这个类。

构造函数参数
  1. int corePoolSize:线程池当中所一直维护的线程数量,如果线程池处于任务空闲期间,那么该线程并不会被回收(allowCoreThreadTimeOut默认false,保证核心线程不会timeOut被清除)。prestartAllCoreThreads()方法可以初始化corePoolSize的核心线程。
  2. int maximumPoolSize:线程池所维护的最大线程数量。
  3. long keepAliveTime:超过CorePoolSize的线程(非核心线程)在keepAliveTime时间后一直处于空闲状态,那么超过的这部分线程将会被回收掉。
  4. TimeUnit unit:回收时间单位。
  5. BlockingQueue<Runnable> workQueue:向线程池提交的任务位于阻塞队列,它的实现有多种方式。
  6. ThreadFactory threadFactory:线程工厂类,提供创建新线程的方法。
  7. RejectedExecutionHandler:当线程池中的线程都忙于执行任务且阻塞队列已经满了的情况下,新提交的任务该如何被对待和处理:
    7.1 AbortPolicy:直接抛出异常RejectedExecutionException
    7.2 DiscardPolicy:空实现,什么都不做,直接丢弃当前任务(可能会导致future.get()一直阻塞)
    7.3 DiscardOldestPolicy:丢弃阻塞队列中存放最久的任务(poll对头元素),并且为当前所提交的任务留出一个队列中的空闲空间,以便将其放到队列中
    7.4 CallerRunsPolicy:直接由提交任务的线程来运行这个提交的任务
线程池状态

线程池控制状态的方式类似于ReentrantReadWriteLock通过state高低位表示读写状态,线程池通过一个AtomicInteger ctl来控制。

  1. 线程池本身的状态:ctl的左边高3位
  2. 线程池中所运行的线程的数量:ctl的其余29位;这里补充一下int类型左移<<知识(规则则是带符号位移,高位移出,低位补0,移动位数超过该类型的最大位数,则进行取模,如对Integer型左移34位,实际上只移动了两位。左移一位相当于乘以2的一次方,左移n位相当于乘以2的n次方。)
    1<<COUNT_BITS = 001带29个0
    CAPACITY = 1<<COUNT_BITS - 1 = 000带29个1
    ~CAPACITY = 111带29个0
    所以workerCountOf的判断就是当前count和CAPACITY相与&;
    runStateOf的判断就是当前状态值和~CAPACITY想与&;
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; =>29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; =>(2^29) - 1

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;  =>0
private static final int STOP       =  1 << COUNT_BITS;  =>00100000 (000...00)
private static final int TIDYING    =  2 << COUNT_BITS; =>01000000 (000...00)
private static final int TERMINATED =  3 << COUNT_BITS; =>01100000 (000...00)

private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

线程池一共存在5种状态

  1. RUNNING:线程池可以接收新的任务提交,并且还可以正常处理阻塞队列中的任务。
  2. SHUTDOWN:不再接受新的任务提交,不过线程池还可以继续处理阻塞队列中的任务(已经入队列的任务还会继续处理)。
  3. STOP:不再接收新的任务,同时还会丢弃阻塞队列中既有的任务;此外,它还会中断正在处理中的任务。
  4. TIDYING:所有的任务都终止(也涵盖了阻塞队列中的任务),当前线程池中的活动线程数workCount降为0,即将会调用terminated()钩子方法(模板方法,由自定义线程池可实现做一些扩展操作)。
  5. TERMINATED:线程池的终止状态,当terminated()执行完毕之后,线程池将会处于该状态之下。
状态切换:
  1. RUNNING -> SHUTDOWN:当调用了线程池的shutdown()方法,或者当finalize()方法被隐式调用。
  2. RUNNING, SHUTDOWN -> STOP:当调用了线程池的shutdownNow()方法。
  3. SHUTDOWN -> TIDYING:在线程池与阻塞队列均变为空时。
  4. STOP -> TIDYING:在线程池变为空的时候(STOP的时候已经将queue的任务清空了)
  5. TIDYING -> TERMINATED:在terminated()被执行完毕时

源码分析

线程池提交任务的方式有两种:submit和execute

线程池提交之submit

submit方法有父类AbstractExecutorService实现。
submit有三种方式,最终都是将传递进来的任务转换为一个FutureTask。(RunnableFuture)对象进行处理。
之前我们做了FutureTask的源码解析,FutureTask可以接受Callable和Runnable类型,且最终都是封装成Callable的(RunnableAdapter适配器模式)。
且可以看到任务被转换成FutureTask之后都是调用execute()方法。

//Runnable在FutureTask构造函数内通过RunnableAdapter转换成Callable
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
//Runnable在FutureTask构造函数内通过RunnableAdapter转换成Callable,并且result为future.get()的返回值
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
//callable直接有FutureTask包装
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
线程池提交之execute

execute()方法有ThreadPoolExecutor自己实现,且是submit()内部会调用的方法。
该方法里主要的三个步骤:

  1. 如果工作的线程数小于corePoolSize,就创建一个新的worker线程,并将提交的任务作为其firstTask。
  2. 如果工作线程数大于corePoolSize,并且成功的加入workQueue,此时还需要重新确认一下运行状态,如果不再运行了(线程挂了或者线程池shutdown了),需要reject当前任务;
  3. 如果不能成功加入workQueue,并且工作线程数没有大于maxPoolSize,则添加非核心线程;如果大于maxPoolSize,则拒绝当前任务。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //注意这里加入workQueue是通过offer()方法,所以不会阻塞。
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        //这里重新检查线程运行状态以及工作线程数
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //添加非核心线程,如果添加失败,则拒绝策略拒绝当前任务。
    else if (!addWorker(command, false))
        reject(command);
}
Worker

在上面看到execute()方法,主要还是操作Worker类,该类继承AQS,那么它就提供了一些锁的功能。
state=0表示unlock
state=1表示locked
所以Worker继承了AQS之后,提供的tryAcquire()和tryRelease()就是将state分别置为1和0,来表示获得锁和释放锁。
那么在构造函数中还有一句state=-1,这里主要对应interruptIfStarted()方法,而该方法被interruptWorkers()调用,InterruptIfStarted()意味着中断只对已经started的worker线程有效,即getState() >= 0的线程。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    //Worker类所运行的线程
    final Thread thread;
    //初始化的运行任务
    Runnable firstTask;
    //完成的任务数
    volatile long completedTasks;
    
    //构造函数,接受一个Runnable firstTask,且注意Worker也是一个Runnable
    //这里thread会接受worker这个Runnable,而不是线程池提交的那个Runnable
    Worker(Runnable firstTask) {
        //这里将state置为-1,为了防止shutdown方法中断未启动的Worker线程,
        // 因为中断只会关注state>=0的线层任务,Worker启动的时候先调用unlock()方法,是为了响应中断。
        setState(-1); 
        this.firstTask = firstTask; 
        this.thread = getThreadFactory().newThread(this);
    }
    //Worker是一个Runnable,run()方法调用runWorker()方法
    public void run() {
        runWorker(this);
    }

    // 0表示 unlocked state.
    // 1表示 locked state.
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    //中断已启动的worker线程
    void interruptIfStarted() {
        Thread t;
        //主要针对getState() >= 0的worker线程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
runWorker()

这个是Worker的核心方法,该方法在开始的时候会先调用w.unlock(),这与上面构造函数state=-1有一定的关系,unlock()方法主要将state置为0,那么意味着该worker线程已经被启动,那么调用interruptWorkers()方法时,该worker就会能响应中断。

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //将state从-1置为0,从而能够响应中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //hook方法,由ThreadPoolExecutor子实现类提供
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //拿到firstTask,在这里调用其run()方法
                    //如果是submit方式,任务都会被包装成FutureTask,如果任务里没有捕获异常,
                    //    不会终止worker线程,而是future.get()的时候返回executionException
                    //如果是execute()方式,那么Runnable里面没有捕获异常,会导致Worker抛出异常提前终止。
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //hook方法
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
getTask()

该方法在runWorker()方法里调用,在worker的firstTask完成了之后,就会从workQueue里面去获取task来执行,如果queue是空了, 则该方法就会阻塞(一直或者一段时间)。

private Runnable getTask() {
    boolean timedOut = false; // 上一次轮询是不是timeout
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // 如果线程池shutdown了,并且workQueue已经空了,或者STOP了,即workQueue被终止。
        //那么就需要开始降workerCount了,并且return,不在阻塞。
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // 看是不是需要定时,如果allowCoreThreadTimeOut为true,则所有的线程在poll的时候都有等待时间;
        //如果allowCoreThreadTimeOut为false,那么只有非核心线程在poll的时候有等待时间。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //当wc超过maximumPoolSize或者是非核心线程且上一次poll超时了
        //并且wc个数大于1,或者workQueue为空,则降低workerCount,并return null
        //这里主要就是非核心线程在从workQueue里面poll任务的时候,等待超时了,就会被回收,这会return null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //再次检查workerCount的数量,如果超过了maxPoolSize,则不操作了。
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            //开始从workQueue里面poll或者take任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
processWorkerExit()

该方法在runWorker()方法最后finally里调用,completedAbruptly为true表示firstTask.run()方法抛出异常;completedAbruptly为false,表示正常结束。
一般来说getTask()在workQueue不为空的时候,会一直轮询workQueue;那么workQueue为空的时候,就需要阻塞(非核心线程会有等待时间限制)
所以该方法主要是在抛出异常后剔除该Worker。
当然getTask()也有一些时候会return null(比如非核心线程在poll任务的时候等待超时,会被回收了),那么这个方法同样会检查如果线程池还没stop的(包括shutdown,但是workQueue还没清空的时候),这时候还需要补充新的worker来完成workQueue的清理工作。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //如果run方法出了异常,那么要将worker的count降下来
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        //将该worker从workers集合中剔除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //尝试终止线程池,将其状态更新为TERMINATED,
    // 如果当前(SHUTDOWN and pool and queue empty) or (STOP and pool empty)
    tryTerminate();
  
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            //这里主要看如果运行的worker数目大于最小的数量,就不需要补充
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        //当一个Worker出现异常而退出或者WorkerCout小于min,那么就需要重新创建一个Worker来补充进来
        addWorker(null, false);
    }
}
addWorker

上面看过Worker之后,execute()方法会通过addWorker()来维护与Worker的关系。
addWorker(Runnable, boolean)接收Runnable作为参数,以及一个boolean表示是否是核心线程。
刚开始的两个for循环,外层for循环检查线程池状态;内层for循环检查worker线程数。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (; ; ) {
        //拿到ctl的值,通过runStateOf拿到状态值进行判断
        int c = ctl.get();
        int rs = runStateOf(c);
        //shutdown,stop,tidying,terminated状态都不会接受新的任务提交
        //或者running状态的时候,firstTask传进来是null,任务也不会被接收
        if (rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                        firstTask == null &&
                        !workQueue.isEmpty())) {
            return false;
        }
         //第二个for循环检查workerCount
        for (; ; ) {
            int wc = workerCountOf(c);
            //workerCount工作线程大于CAPACITY,肯定不行
            //如果是新建core核心线程,就看corePoolSize有没有超出
            //如果是新建core核心线程,就看maximumPoolSize有没有超出
            if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) {
                return false;
            }
            //如过检查都好,就CAS增加一个workCount
            if (compareAndIncrementWorkerCount(c)) {
                break retry;
            }
            //如果CAS更新失败就重新检查一下状态,如果状态变了就重试最外层for循环
            //否则就重试内部for循环
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs) {
                continue retry;
            }
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //在状态和workerCount检查完毕并且workerCount加一成功之后,开始增加新的Worker
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //创建一个新的Worker,内部启动一个线程调用Worker的runWorker()方法
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                    {
                        throw new IllegalThreadStateException();
                    } 
                    //这个锁的内部主要就是添加到workers集合里
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize) {
                        largestPoolSize = s;
                    }
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            //上面worker创建并添加集合成功,就调用start()方法,启动worker的runWorker()方法
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (!workerStarted) {
            addWorkerFailed(w);
        }
    }
    return workerStarted;
}
shutdown

shutdown方法主要就是将状态设为SHUTDOWN,不再接收新的任务,然后中断没有工作的worker线程(主要通过tryLock()来看是不是在工作)
shutdown和shutdownNow最后都会调用tryTerminate()方法,就是检查线程池状态,最终更新为TERIMATED,更新成功则termination.signalAll()唤醒调用awaitTermination()而被阻塞的线程去优雅的关闭。

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //更新state到SHUTDOWN
        advanceRunState(SHUTDOWN);
        //中断idle的worker线程
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    //尝试去更新到TERMINATED状态
    tryTerminate();
}
//中断idle,即没有工作的worker线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            //主要通过tryLock()来看是不是在工作,因为runWorker()里面getTask()之后开始执行,是lock()了
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //如果tryLock()成功,则表示worker线程是空闲的,会调用interrupt()方法来中断
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}
shutdownNow

shutdownNow与shutdown的区别就是状态更新到STOP,不再接收新的任务,并且中断所有启动的(state>=0)worker线程,并且返回workQueue里面没有处理的任务列表

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        //更新状态到STOP
        advanceRunState(STOP);
        //中断所有的启动的worker线程
        interruptWorkers();
        //从workQueue里面导出未处理的Runnable列表
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
//中断所有的启动的worker线程
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            //interruptIfStarted在Worker类里面分析过了,只对state>=0的Worker有效
            //因为初始化Worker的state是-1,只有runWorker()方法开始的时候调用unlock()将state更新为0
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
awaitTermination

从上面shutdown和shutdownNow看出一些区别,shutdown不会中断正在处理任务的线程,意味着一直会处理到workQueue被清空;shutdownNow则会中断正在处理中的任务,返回未处理的Runnable列表。
那么如果想要优雅的关闭线程池,就调用shutdown方法,但是又不想无止境的等下去,所以就可以通过awaitTermination(long, timeUnit)来设置一个等待终止的时间。
awaitTermination方法在await()一段时间之后,如果状态到了TERMINATED,表示线程池已经优雅的关闭了,返回true;如果线程池还没到TERMINATED状态,就返回false。

public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //一直循环直到等待时间到达,如果没有到达,则当前线程会被挂起
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

优雅关闭线程池

ExecutorService es = Executors.newFixedThreadPool(10);
try {
    es.shutdown();
    //超过等待时间,如果还有线程没执行完,不再等待,shutdownNow直接关闭线程池
    if (!es.awaitTermination(20, TimeUnit.SECONDS)) {
        es.shutdownNow();
    }
} catch (Throwable e) {
    es.shutdownNow();
}

相关文章

网友评论

    本文标题:25. 并发终结之ThreadPoolExecutor

    本文链接:https://www.haomeiwen.com/subject/dpowhktx.html