线程池原理

作者: zhang_wq | 来源:发表于2018-06-28 19:16 被阅读0次

1、由于系统创建和销毁线程都会占用系统资源(CPU时间),如果对于某些执行耗时很少,但是数量很多的任务,大部分的时间都会花在创建和销毁线程,所以引入了线程池的概念;其实原理和数据库连接池(对象池)是一样的,都是为了避免某些不必要的损耗。

2、我们可以使用构造方法去初始化线程池,

ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 10, TimeUnit.MINUTES, new LinkedBlockingQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

其中有几个必要的参数:

(1)corePoolSize: 核心线程池的大小(可以理解为空闲时期线程池中最小数目,但是需要任何时期线程数量大于等于过corePoolSize),必须大于等于0。
(2)maximumPoolSize: 线程池中最大线程数量,必须大于0。
(3)keepAliveTime: 空闲时期非核心线程最大存活时间,必须大于0。
(4)workQueue: 任务队列。
(5)threadFactory: 线程创建工厂。
(6)handler: 饱和策略。

上面几个参数很好理解,也是初始化线程池的必要参数;然后线程池还提供了一个比较有意思的参数:

allowCoreThreadTimeOut: 是否允许核心线程超时,它的默认值是false

从字面意思理解,就是是否让核心线程超时销毁,所以我们一般所理解的核心线程不会被销毁,在这个值设置为true的时候,就是不正确的哦(我就被面试官问到过,然后还自信满满的说核心线程不会被销毁!);具体使用后面解释。

3、现在我们需要使用线程池来执行我们的任务,它提供了

// 使用Future模式,有三种重载
Future future = executor.submit(() -> System.out.println("do work"));

// 普通提交任务方式
executor.execute(() -> System.out.println("do work"));

等几种方法提交任务。但是最终都是使用execute方法去执行任务,只不过submit是对我们的任务进行了封装;所以我们关注execute的逻辑,究竟线程池是怎样帮助我们完成任务的。

/*
 * Proceed in 3 steps:
 *
 * 1. If fewer than corePoolSize threads are running, try to
 * start a new thread with the given command as its first
 * task.  The call to addWorker atomically checks runState and
 * workerCount, and so prevents false alarms that would add
 * threads when it shouldn't, by returning false.
 *
 * 2. If a task can be successfully queued, then we still need
 * to double-check whether we should have added a thread
 * (because existing ones died since last checking) or that
 * the pool shut down since entry into this method. So we
 * recheck state and if necessary roll back the enqueuing if
 * stopped, or start a new thread if there are none.
 *
 * 3. If we cannot queue task, then we try to add a new
 * thread.  If it fails, we know we are shut down or saturated
 * and so reject the task.
 */
// 获取当前运行状态以及线程数量
int c = ctl.get();
// 如果线程数小于核心线程数,则创建新线程(无论线程池中是否有可以执行任务的线程)
if (workerCountOf(c) < corePoolSize) {
    // 创建新线程并且执行任务 
    if (addWorker(command, true))
        return;
    // 核心线程创建失败,获取当前运行状态以及线程数量
    c = ctl.get();
}
// 判断是否可以接受新任务(当前运行状态),以及队列是否满
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 再次检测,如果当前运行状态是非RUNNING,而且任务移除成功,那么拒绝任务(会执行饱和策略)
    if (! isRunning(recheck) && remove(command))
        reject(command);
    // 当前运行状态是RUNNING或者移除任务失败,再判断未终止的线程数是否等于0,是则创建新线程
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 当运行状态为非RUNNING或者队列满了(逻辑上如此,但是实际在addWorker中如果运行状态是非RUNNING并且传入任务非空,是无法创建新线程的),创建新线程;如果创建失败(由于运行状态、或者最大线程数),则拒绝任务
else if (!addWorker(command, false))
    reject(command);

以上是execute的源码,大致上的逻辑我们都清楚了,有一点需要我们注意,就是核心线程的创建,它并不是有可执行任务的核心线程就不去创建,而是只要当前线程数小于核心线程数的时候,有新任务添加就会直接创建核心线程,然后我们需要明白它是如何判断运行状态的;线程池中提供了几个常量:

// 位移位数 32 - 3
private static final int COUNT_BITS = Integer.SIZE - 3;
// 计量新建线程数量的最大值 000111...111(29个1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// RUNNING状态 111000...000(29个0)
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN状态 000000...000(32个0)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP状态 001000...000(29个0)
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING状态 010000...000(30个0)
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED状态 011000...000(29个0)
private static final int TERMINATED =  3 << COUNT_BITS;

以上常量就是用来表示线程池运行状态的,然后记录当前运行状态以及线程数量用

// 将当前线程池设置为RUNNING状态,并计入0个线程
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

所以我们也就可以理解,它用了32位的整型做状态表示以及计数,前三位表示运行状态,后29位用来计数。所以对于源码里面的几个函数我们也就可以理解了

// 判断当前运行状态 c是ctl.get()获取的当前运行状态以及线程数量值,然后与上111000...000
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 计算当前线程数目 c是ctl.get()获取的,然后与上000111...111
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 运行状态(rs)下计入线程数量(wc)
private static int ctlOf(int rs, int wc) { return rs | wc; }
// 判断运行状态大小
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// 判断是否RUNNING状态
private static boolean isRunning(int c) { return c < SHUTDOWN; }

现在关于整个execute的执行逻辑、判断条件基本理解,所以我们需要理解它是如何添加任务,如何让线程运行起来,执行完任务之后如何去等待继续去执行新任务。
以下是addWorker的源码,我们分为两部分分析,先看CAS判断:

retry:
for (;;) {
    int c = ctl.get();
    // 获取当前运行状态
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    //这里就是我们之前说的,如果运行状态是非RUNNING并且当前任务是非空,是无法创建新线程的
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
        // 获取当前未终止的线程数量
        int wc = workerCountOf(c);
        // 判断数量是否超过限制(计数最大限制、核心线程限制、最大线程数量限制)
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            return false;
        // 比较并增加1,如果成功,那么结束判断,进入创建线程逻辑
        if (compareAndIncrementWorkerCount(c))
            break retry;
        // 重新判断运行状态,如果有变化,则重新进入retry循环,否则继续当前循环
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}

以上是CAS算法判断是否能够新创建线程,如果成功break出retry循环,那么就进入创建线程的逻辑。
然后我们在分析线程创建:

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    // 创建Worker(就是我们的线程),这里Worker中会带一个Thread对象与它(Worker)做双向引用,后续分析Worker的工作原理
    // firstTask就是我们真正的需要执行的任务
    w = new Worker(firstTask);
    // 这就是Worker中的Thread对象
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                // 将新创建的Worker加入HashSet中
                workers.add(w);
                // 记录至今最大的workers数量
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        // 如果创建成功,则启动Worker中的线程,这里很重要,这也是Worker的启动,帮助我们执行任务的关键,需要结合Worker初始化的源码分析,才能更好理解
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    // 如果创建失败,那么做失败处理
    if (! workerStarted)
        addWorkerFailed(w);
}
// 返回是否成功的标志
return workerStarted;

上面就是创建Worker(线程)的逻辑,比较关键的是Worker的初始化和启动,现在我们继续分析Worker的源码,理解它是如何与Thread做绑定,然后帮助我们执行任务的:

Worker(Runnable firstTask) {
    // 这里是标志Worker状态
    setState(-1); // inhibit interrupts until runWorker
    // 需要执行的任务
    this.firstTask = firstTask;
    // 创建Thread,并且Thread中的Runnable对象是Worker本身
    this.thread = getThreadFactory().newThread(this);
}

Worker其实也是实现了Runnable接口,从构造函数我们可以知道,在初始化Worker的时候,将本身和它的Thread对象进行双向引用,再结合addWorker中启动Worker中Thread的逻辑,就明白了,t.start实际上是执行了Worker中的run方法,然后我们继续分析Worker中的run方法,它执行了runWorker方法:

final void runWorker(Worker w) {
    // 获取当前线程,也就是Worker中的Thread
    Thread wt = Thread.currentThread();
    // 获取Worker需要执行的任务(也就是我们实际的任务)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 改变Worker的状态
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 判断当前任务是否为null,如果是空,则去队列获取任务
        while (task != null || (task = getTask()) != null) {
            // 改变Worker的状态
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 执行任务之前,可以自己重写该方法,从而做响应的预处理
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行我们实际的任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                     // 执行任务之后,可以自己重写该方法,从而在结束之后做相应处理
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 当前Worker至今完成的所有任务总和
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 由于获取任务超时终止当前Worker,这里对Worker做终止处理
        processWorkerExit(w, completedAbruptly);
    }
}

以上是Worker的工作原理,其中最主要的是getTask方法,这里就是保证它不退出一直WAITING或者TIMED_WAITING,等待任务入队的关键(这里有个面试题哟,面试官问过我,当线程执行完任务之后会处于什么状态,很多人可能认为会处于阻塞状态,因为BlockingQueue嘛,但是是不对哦,BlockingQueue中take方法是用了LockSupport.park来使当前线程进入WAITING,而poll(timeout, timeunit)方法则是用LockSupport.parkNanos使线程进入TIMED_WAITING!这里可以了解ReentrantLock;不过我们根据线程状态改变的条件也能推断,这里状态改变的情况);然后我们再来看一下getTask的源码:

/**
 * 我们需要明白一点,当getTask方法返回null的时候,就表示当前调用Worker需要终止
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果运行状态为SHUTDOWN并且队列为空或者运行状态是STOP,那么终止Worker
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 这里就是我们之前说的,那个比较有意思的属性,是否让核心线程超时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 如果线程数量大于最大数量或者已经超时 并且 线程池中有线程或者队列为空,则尝试结束当前Worker
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 判断是否需要有超时获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

上面就是getTask的逻辑,然后主要就是在

Runnable r = timed ?
    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    workQueue.take();

如果timed是true,也就是当前线程数量大于核心数量或者是我们把allowCoreThreadTimeOut属性设置为true,那么就使用poll超时获取,否则使用take一直等待获取任务;所以其实对于线程池,核心线程也是有可能被销毁的!

到这里,我们基本将线程池的整个工作逻辑都串起来了,也基本明白它是如何帮助我们执行任务;但是这仅仅是主干逻辑,还有很多细节,比如它的shutdown处理、terminal处理以及Worker的状态改变等等。所以看似简单,但是要吃透,还需要更深入的理解。

如果有不正确的地方,请帮忙指正,谢谢!

相关文章

网友评论

    本文标题:线程池原理

    本文链接:https://www.haomeiwen.com/subject/rdcgyftx.html