美文网首页
多线程并发编程17-线程池ThreadPoolExecutor源

多线程并发编程17-线程池ThreadPoolExecutor源

作者: Demo_zfs | 来源:发表于2020-03-28 17:56 被阅读0次

        今天来说一说线程池ThreadPoolExecutor,线程池主要解决两个问题:一是当执行大量异步任务时线程池能够提供较好的性能。在不使用线程池时,每当需要执行异步任务时直接new一个线程来运行,而线程的创建和销毁都需要开销。线程池中的线程是可以复用的,不需要每次执行异步任务都进行创建线程,从而减少了开销。二是线程池提供了一种资源限制和管理的手段,例如限制线程的个数、动态增加线程的个数、缓存异步任务等。

        ThreadPoolExecutor内部有一个原子性变量ctl,用来存储线程池的状态以及线程的个数,变量ctl高3位存储线程池状态,低29位存储线程个数。其中线程池状态有如下5中:

    RUNNING:接受新任务并且处理阻塞队列里的任务。

    SHUTDOWN:拒绝新任务但是处理阻塞队列里的任务。

    STOP:拒绝新任务并抛弃阻塞队列中的任务,并同时中断正在处理的任务。

    TIDYING:所有任务执行完(包括阻塞队列里的任务)后当前线程池活动线程数位0,线程状态变为次TIDYING,之后将要调用terminated方法。

    TERMINATED:调用terminated()方法后的状态。

    线程池之间状态变化如下所示:

    RUNNING -> SHUTDOWN:显式调用了shutdown()方法,或者隐式调用finalize()方法里面的shutdown()。

    (RUNNING or SHUTDOWN) -> STOP:显式调用shutdownNow()方法。

    SHUTDOWN -> TIDYING:当阻塞队列和线程池都为空时。

    STOP -> TIDYING:当线程池为空时。

    TIDYING -> TERMINATED:当terminated()方法执行完成时。

        下面通过线程池构造函数的参数来说说线程池的执行步骤。构造函数如下

    public ThreadPoolExecutor(int corePoolSize,

                                  int maximumPoolSize,

                                  long keepAliveTime,

                                  TimeUnit unit,

                                  BlockingQueue<Runnable> workQueue,

                                  ThreadFactory threadFactory,

                                  RejectedExecutionHandler handler)

    corePoolSize:线程池核心线程个数。

    maximumPoolSize:线程池最大线程个数。

    keepAliveTime:存活时间,当非核心线程处于闲置状态,这些闲置的非核心线程能存活的最大时间。

    unit:存活时间的时间单位。

    workQueue:任务队列,用来保存等待执行任务的阻塞队列。

    threadFactory:创建线程的工厂。

    handler:饱和策略。当线程个数已达到最大线程个数,并且任务队列也已满,继续添加任务则会指定该饱和策略,比如:AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用则所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)和DiscardPolicy(默默丢弃,不抛异常)。

        当调用线程池的executor方法执行任务的时候都会创建一个core线程(即使之前创建的core线程处于空闲状态),直到core线程数达到corePoolSize,当core线程数达到corePoolSize并且所有的core线程都在执行任务,这时再添加任务就会往BlockingQueue阻塞队列中放,当BlockingQueue阻塞队列也达到设置的容量之后,就会再创建线程来执行新添加的任务,直到创建的线程达到maximumPoolSize,如果线程达到maximumPoolSize还在添加任务,这时就会执行饱和策略(抛弃、报错或当前执行executor方法的线程去执行该任务)。

        创建ThreadPoolExecutor 时会指定一个keepAliveTime的参数,用来指定非core线程在空闲keepAliveTime的时候之后就会被回收,这个非core线程keepAliveTime超时之后被回收是通过在获取BlockingQueue阻塞队列中任务时添加一个超时时间(BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)),从而实现了非core线程 空闲时间超过keepAliveTime之后被回收。 

        下面对ThreadPoolExecutor内部主要方法源码进行讲解。

    execute(Runnable command)

        提交任务。

    public void execute(Runnable command) {

    //(1)检查提交的任务是否为null,是则抛出 NullPointerException异常。

        if (command == null)

            throw new NullPointerException();

    //(2)获取原子变量ctl,高3位表示线程池状态, 低29位表示线程个数

        int c = ctl.get();

    //(3)如果当前线程池的线程小于核心线程数,则尝试添加一个核心work线程到线程池中,并将当前任务作为新创建线程的第一个任务。

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

    //(4)如果线程池处于RUNNING状态,则添加任务到阻塞队列。

        if (isRunning(c) && workQueue.offer(command)) {

    //(5)因为是向阻塞队列中插入元素,这个操作有可能阻塞,在阻塞的这段时间有可能线程池状态发生了变化,所以这里需要再进行一次状态检查。

            int recheck = ctl.get();

    //(6)如果当前线程状态不是RUNNING则从队列中删除任务,并执行拒绝策略。

            if (! isRunning(recheck) && remove(command))

                reject(command);

    //(7)如果当前线程为空,则尝试添加一个非核心线程,代码走到这个分支有可能就是执行的核心线程为0。

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

    //(8)如果阻塞队列已满,则尝试创建一个非核心线程,并将当前任务作为该非核心线程的第一个任务。创建失败则执行拒绝策略。

        else if (!addWorker(command, false))

            reject(command);

    }

    addWorker(Runnable firstTask, boolean core)

        添加工作线程,可以是核心线程也可以是非核心线程。

    private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

    //(1)获取ctl变量的值,并获取线程池状态。

            int c = ctl.get();

            int rs = runStateOf(c);

    //(2)检查阻塞队列是否只在必要时为空。

            if (rs >= SHUTDOWN &&

                ! (rs == SHUTDOWN &&

                  firstTask == null &&

                  ! workQueue.isEmpty()))

                return false;

    //(3)循环CAS增加线程个数。  

            for (;;) {

    //(4)如果线程个数超限则返回false,下面3中情况都会返回false

    //一:线程个数大于最大容量数。

    //二:如果创建的是核心线程,线程池的核心线程个数已大于corePoolSize指定的核心线程数大小。

    //三:如果创建的是非核心线程,线程池的线程数已大于 maximumPoolSize指定的线程数大小。

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                    wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

    //(5)增加线程数,使用CAS算法为ctl变量的低29位表示的数字加1。

                if (compareAndIncrementWorkerCount(c))

                    break retry;

    //(6)CAS失败,则看线程池状态是否变化了,变化则跳到外层循环重新尝试获取线程池状态,否则在内层循环重新CAS。

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        }

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

    //(7)创建worker,在获取独占锁之前进行创建,减少锁占用的时间。

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

    //(8)尝试获取独占锁。

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

    //(9)再次检查线程池状态,有可能在获取锁前有别的线程调用了线程池的shutdown接口。

                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||

                        (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

    //(10)释放获取的独占锁。

                    mainLock.unlock();

                }

    //(11)添加worker成功后,则调用该新增worker的start()方法开始执行任务。

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

    //(12)如果添加worker失败,则通过CAS算法将ctl变量的低29位表示的数字减1,将该创建的worker移除,并检查线程池状态。

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

        提交任务之后worker是如何执行任务的呢?下面来看一下worker是如何处理任务的。

    runWorker(Worker w)

        运行worker,从阻塞队列中持续获取任务进行执行。

    final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

    // (1)调用getTask()方法从阻塞队列中持续获取任务进行执行。

            while (task != null || (task = getTask()) != null) {

    //(2)在执行任务之前获取worker中的锁,防止任务执行的时候被中断,除非线程池正在stop。

                w.lock();

                // If pool is stopping, ensure thread is interrupted;

                // if not, ensure thread is not interrupted.  This

                // requires a recheck in second case to deal with

                // shutdownNow race while clearing interrupt

    //(3)如果线程池正在stop,保证worker中的线程是中断状态,否则保证worker中的线程为非中断状态。

                if ((runStateAtLeast(ctl.get(), STOP) ||

                    (Thread.interrupted() &&

                      runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                    wt.interrupt();

                try {

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

    //(4)执行任务。

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

    //(5)释放worker的锁。

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

    //(6)worker结束前执行清理任务。

            processWorkerExit(w, completedAbruptly);

        }

    }

    getTask() 

        从阻塞队列中获取任务,该方法实现了线程空闲过期策略。

    private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {

            int c = ctl.get();

    //(1)获取线程池状态。

            int rs = runStateOf(c);

            //(2) 检查阻塞队列是否为空,空则将worker数减一并返回null,之后在 runWorker()方法中会对worker进行回收。

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

            int wc = workerCountOf(c);

    //(3)worker是否设置闲置超时 ,核心worker也是可以设置进行闲置超时回收。

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    //(4)worker已经闲置超时,并且阻塞队列为空,则将worker数减一,并返回null,之后在 runWorker()方法中会对worker进行回收。

            if ((wc > maximumPoolSize || (timed && timedOut))

                && (wc > 1 || workQueue.isEmpty())) {

                if (compareAndDecrementWorkerCount(c))

                    return null;

                continue;

            }

            try {

    // (5)使用阻塞队列带超时时间的poll(long timeout, TimeUnit unit)方法来实现闲置超时,闲置超时后在下一次进到循环中会执行(4)的代码。

                Runnable r = timed ?

                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                    workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }

        上面讲完了ThreadPoolExecutor如何进行提交任务以及worker如何获取任务并执行任务。下面讲一下ThreadPoolExecutor中的一些其他常用方法。

    shutdown()

        调用shutdown方法后,线程池就会不再接受新的任务,但是阻塞队列中的任务还是会执行。该方法会立刻返回,并不会等待队列中任务完成后返回。

    public void shutdown() {

    //(1)尝试获取独占锁。

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

    //(2)检查权限。

            checkShutdownAccess();

    //(3)将当前线程池状态修改为SHUTDOWN状态,如果已为SHUTDOWN状态则直接返回。

            advanceRunState(SHUTDOWN);

    //(4)设置worker的中断标识。

            interruptIdleWorkers();

            onShutdown(); // hook for ScheduledThreadPoolExecutor

        } finally {

    //(5)释放锁。

            mainLock.unlock();

        }

    //(6)尝试将线程池状态设置为TERMINATED,以下两种情况会将线程池状态设置为TERMINATED

    //1.当线程池状态为SHUTDOWN并且线程池和阻塞队列都为空

    //2.当线程池状态为STOP并且线程池为空。

        tryTerminate();

    }

    shutdownNow() 

        调用shutdownNow方法后,线程池不再接受新的任务,并且会丢弃工作队列里面的任务,中断正在执行的任务(中断是通过设置中断标识完成的,如果任务没有对中断标识进行响应则不会停止该任务),该方法会立刻返回,返回值为阻塞队列里被丢弃的任务列表。

    public List<Runnable> shutdownNow() {

        List<Runnable> tasks;

    //(1)尝试获取锁。

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

    //(2)检查权限

            checkShutdownAccess();

    //(3)设置线程池状态为stop

            advanceRunState(STOP);

    //(4)设置所有worker中的线程的中断标志。

            interruptWorkers();

    //(5)将阻塞队列中的任务移动到tasks中。

            tasks = drainQueue();

        } finally {

    //(6)释放锁。

            mainLock.unlock();

        }

    //(7)尝试将线程池状态设置为TERMINATED,以下两种情况会将线程池状态设置为TERMINATED

    //1.当线程池状态为SHUTDOWN并且线程池和阻塞队列都为空

    //2.当线程池状态为STOP并且线程池为空。

        tryTerminate();

        return tasks;

    }

        线程池巧妙地使用了原子变量来记录线程池的状态以及线程个数。通过线程池状态来控制任务的执行,每个worker可以处理多个任务。线程池通过线程的复用减少了线程创建和销毁的开销。

        今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

    相关文章

      网友评论

          本文标题:多线程并发编程17-线程池ThreadPoolExecutor源

          本文链接:https://www.haomeiwen.com/subject/yygjuhtx.html