美文网首页
线程池源码-任务执行

线程池源码-任务执行

作者: 春娇的志明 | 来源:发表于2020-03-09 22:31 被阅读0次

在上一篇文章 线程池源码-任务提交 中,我主要讲解了:

  • 线程池初始化参数
  • 线程池处理任务的整体流程
  • worker 线程的创建

小伙伴们可能对线程池任务执行机制有疑问,今天我们就来探索以下几个问题:

  • worker 线程的启动方式
  • 线程池如何区分核心线程,非核心线程
  • 线程池中的线程如何复用

worker 对象

前面提到,在任务提交之后,线程池可能会创建 worker 对象去处理该任务。

private boolean addWorker(Runnable firstTask, boolean core) {
                // 省略代码
        Worker w = null;
        try {
            // 1.创建 worker 对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                  // 2.更新 worker 列表,代码省略
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 3.开启 worker 线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 4.如果 worker 线程开启失败,则将该 worker 对象从列表中移除
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

通过上面这段代码,可知:

  • worker 对象内部持有一个 Thread 类型的成员变量 t
  • 成功将 worker 更新到线程池 worker 列表之后,会启动它的线程 t

开启的这个线程,都做了些什么呢?接着往下。


来看一下 Worker 这个类的声明和构造方法 :

  • Worker 类实现了 Runnable 接口,意味着它可以作为一个任务放到线程中执行的
  • 在初始化成员变量 thread 时,把自身作为一个任务放到了 thread 中
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
            // 省略代码     
        final Thread thread;
  
        Runnable firstTask;
  
            Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    }

看到这也许大家都明白了,Worker 的成员变量 t 线程开启之后,又会反过来执行自身,这便是 worker 线程的启动方式。

image

worker 对象的 run() 方法中都做了什么,这是今天的重点——线程池中的线程如何复用。

线程复用

终于看到了期待已久的 while 循环,省略了大量代码,为了让流程看起来更清晰。

    public void run() {
        runWorker(this);
    }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        try {
            while (task != null || (task = getTask()) != null) {
                // 省略代码
                try {
                    // 执行任务前的操作
                    beforeExecute(wt, task);
                    // 执行提交的任务
                    task.run();
                    // 执行任务后的操作
                    afterExecute(task, thrown);
                } finally {
                    // 将执行完的任务置空,方便后续线程处理任务队列中的任务
                    task = null;
                }
            }
        } finally {
          // 省略代码
        }
    }

可以看到在执行任务的前后都加入了 hook 操作,后面会讲到线程池提供给外部的可扩展接口,其中就包括 beforeExecute(Runnable r, Throwable t), afterExecute(Runnable r, Throwable t) 两个方法。


猜测 while 循环的条件是线程复用的关键,条件成立只需满足下面任意一种情况:

  1. 当前 task 不为空
  2. getTask() 方法返回的结果不为空

第一种情况容易理解,如果当前提交的任务涉及创建新的 worker 时,task 就不为空,并且作为这个 worker 的 firstTask 被执行。


第二种情况就需要看一下 getTask() 方法代码,发现控制线程「死活」的逻辑都在这里。

主要操作如下:

  • 如果线程池状态 SHUTDOWN,并且任务队列为空,停止当前 worker 线程,线程池停止处理任务
  • 如果线程池状态 >= STOP,停止当前 worker 线程,线程池停止处理任务
  • 判断线程是否会受 keepAliveTime 超时时间的影响,从这里也可以看出,线程池是根据线程数量来定义核心线程的,如果线程数量 >= 核心线程数,则认为超出的线程是非核心线程
  • allowCoreThreadTimeOut = true 意味着核心线程过了 keepAliveTime 也会被干掉
  • 如果当前线程受 keepAliveTime 影响,则调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法从队列中获取任务,如果超时,则返回 null,并将 timeOut 变量设为 true,那么在下一次 timeOut 时间判定时,就会认为该线程过了超时时间,可能会尝试将其停止
  • 如果不受 keepAliveTime 影响,则直接阻塞式获取任务,直到队列中存在任务,则返回该任务,以此来达到线程的复用

流程图:

image

源码阅读笔记:

private Runnable getTask() {
    boolean timedOut = false; 

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 如果线程池状态 SHUTDOWN,并且任务队列为空,停止当前 worker 线程
        // 如果线程池状态 >= STOP,停止当前 worker 线程,线程池停止处理任务
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判断线程是否会受 keepAliveTime 超时时间的影响
        // allowCoreThreadTimeOut = true 意味着核心线程过了 keepAliveTime 也会被干掉
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
        // 再次判断是否可以停止当前线程,其中就包含 timed 条件判定
        // 如果 timed 条件满足,意味着只要任务队列空了,或者有足够的线程应付队列中的任务,就会停止当前线程
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果 timed = true,就意味着当前线程会受 keepAliveTime,因此调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法从队列中获取任务,如果超时,则返回 null,并将 timeOut 变量设为 true,那么在下一次 timeOut 时间判定时,就会认为该线程过了超时时间,可能会尝试将其停止
            // 如果 timed = false,则不管 keepAliveTime 时间,阻塞式获取任务,直到队列中存在任务,则返回该任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

总结

通过今天这篇文章,我们了解了:

  • worker 线程的启动方式
  • 线程池如何区分核心线程,非核心线程
  • 线程池线程复用机制

到这里,相信大家对线程池又有了一个新的认识,如果觉得文章对你有帮助,欢迎留言点赞。

相关文章

  • 线程池源码分析

    线程池源码分析 在线程池执行任务的时候,都是主线程(可能是tomcat线程),将任务放到线程池中,再执行线程池的e...

  • 线程池源码-任务执行

    在上一篇文章 线程池源码-任务提交 中,我主要讲解了: 线程池初始化参数 线程池处理任务的整体流程 worker ...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • 简单分析ThreadPoolExecutor回收工作线程的原理

    最近阅读了JDK线程池ThreadPoolExecutor的源码,对线程池执行任务的流程有了大体了解,实际上这个流...

  • 线程池

    线程池种类 ThreadPoolExecutor 基础线程池 线程执行任务过程 当前执行线程数 < corePoo...

  • Java线程池任务执行流程

    线程池任务执行流程: 当线程池小于corePoolSize时,新提交任务将创建一个新线程执行任务,即使此时线程池中...

  • 线程池解析第二章-线程池源码问题总结

    线程池解析第一章-源码解析线程池解析第二章-线程池源码问题总结 上篇文章分析了关于线程池在源码层面对提交任务的处理...

  • 线程池

    线程池ThreadPoolExecutor 运行步骤:1,任务进来 新建核心线程执行任务 直到核心线程池占满 ...

  • 线程池

    什么是线程池? 线程池本质上是一种对象池,用来对线程进行管理 在执行任务时需要将线程从线程池取出 在任务执行完成后...

  • 分析Java线程池Callable任务执行原理

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 上一篇分析了线程池的执行原理...

网友评论

      本文标题:线程池源码-任务执行

      本文链接:https://www.haomeiwen.com/subject/petvdhtx.html