线程池

作者: minehdcxy | 来源:发表于2021-09-22 01:50 被阅读0次

线程池基础概念

线程池是什么
  • 线程池是一种基于池化思想的线程管理工具
线程池解决了哪些问题
  • 降低资源消耗(操作系统创建和销毁线程,维护线程状态需要很大的开销,会影响系统性能)
  • 提高响应速度(任务被执行不需要重新创建线程)
  • 提高线程的可管理性(线程的统一分配,统一管理)
线程池类图
截屏2021-09-21 下午7.30.34.png
线程池关键变量
  • ctl是一个AtomicInteger, 用这一个变量维护线程的状态和数量, 前三位是状态后29位是线程数量
  • Worker是核心操作的对象,里面维护了一个thread和一个task, 它继承与AQS,实现了一个独占的不可重入锁,所以可以使用tryLock来判断当前线程状态。
  • corePoolSize 代表核心线程数,线程池并不是维护特定的几个线程让他们不被释放而是选择性的留下小于核心线程数的Worker的thread。
  • maximumPoolSize最大线程数,限制整个线程池里线程的最大数量,超过了就要采用拒绝策略。
  • 每次执行完任务之后所有Worker需要从消息队列里取Task,如果为空就要被销毁,从Workers这个HashSet中移除,然后交给jvm回收。线程池会判断当前的核心线程数量,然后选择性的去将一部分线程彻底wait,一部分wait一部分时间,从而实现了核心线程不会被销毁的特性。
  • 线程池任务调度流程图


    截屏2021-09-22 上午1.49.59.png
线程的状态
private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

####线程池源码分析
execute(执行任务)
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //1.检查线程个数是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            //2.以核心线程添加的方式加入添加任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //3.检查线程池运行状态,如果为运行状态则将任务添加到队列中,如果任务队列没有满则添加成功
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //4.重新检查线程池状态,如果是没有在运行(线程池的shutdown逻辑可能在另一个线程被调用)则将任务移除,同时拒绝该任务
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                 //5.如果当前的线程数量为0,添加一个持有空任务的工人(也就是不知道具体要做啥的消费者,后面会对这个做具体介绍)
                addWorker(null, false);
        }
        //6.说明线程池不在运行状态或者任务队列已经满了,尝试去执行addWorker()
        else if (!addWorker(command, false))
            //7.如果添加失败则执行拒绝策略
            reject(command);
    }
  • 注意:
  1. 为了保证并发高效执行任务的同时,线程池的内部逻辑不会错乱,使用了ctl(AtomicInteger)这个线程安全的变量来维护线程池状态和线程数量,在每一步操作之前需要先判断线程池状态和线程的数量是否超过阈值
addWorker(添加工人,也就是添加消费者)
  • 接上面的执行任务,在判断完线程的执行状态和线程的数量之后如果符合条件都会执行到这个方法,不符合的都采取了拒绝的策略
/**
* @param core 是否为核心线程
* @param firstTask 要执行的第一个任务,因为线程(消费者线程)是可复用的,后面这个task会被替换掉,所以这个消费者线程也会去执行其他的任务
*/
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        1.开启一个死循环
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 2. 检查线程池状态不符合条件的都返回false(如果当前线程为shutdown状态并且,当前的?worker所持有的任务为空,但是任务队列不为空,也会返回false,表示的意思是,如果当前线程池已经shutdown了,就不允许在添加新新的消费线程了)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //3. 在开启一个死循环
            for (;;) {
                //4.检查线程数量是否大于阈值,如果没有超过的话,通过CAS的方式去改变线程池的数量,成功了则跳出循环,否则的话一直内部循环,也是为了防止并发场景出现
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //5.校验通过,初始化一个工人,Worker的内部实现逻辑下面介绍
            w = new Worker(firstTask);
            final Thread t = w.thread;
            //6.获取到工人的工作线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //7. 将工人添加到集合中
                        workers.add(w);
                        int s = workers.size();
                        //8.更新线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                     //9.执行工人的消费线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

  • 注意
    1.这一段代码主要是使用了一个HashSet将工人维护起来,添加成功之后启动工人的消费线程
Worker实现
  • Worker构造方法和成员变量
 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            //worker的第一个任务,可以为null,上面我们说道runnable为空的情况
            this.firstTask = firstTask;
            //worker的私有线程,使用线程工厂创建,并将自己作为runnable传进去
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker. */
        public void run() {
            //工人开始消费,执行到这里其实已经是是在子线程执行了,相当于是是在worker的消费线程执行我们提交进来的runnable,这样做的目的也是为了解耦合,把我们提交进来的任务和工人的消费线程解耦
            runWorker(this);
        }
}
  • 注意:
  1. Worker继承自AbstractQueuedSynchronizer, 这样可以使Worker本身实现一把不可重入锁,同时也可以通过tryLock的方式去检测当前worker的消费线程是否正在执行。
Worker runWorker()
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            1.当任务非空时直接执行任务,否则需要通过getTask去取任务
        
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                1. 不符合执行条件触发中断,需要开发这自己去响应和判断这个中断,否则线程不会结束
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    2.触发在执行任务之前的操作,给开发这回调
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //3.执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //4.将工人从工作集中删除,导致的原因可能是任务异常或者任务正常执行结束,正常执行结束的话,如果线程池状态没有stop, 需要在替换一个worker。
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 注意:
  1. 在1处的while循环处如果task为空,需要通过getTask方法去重新获取一个task,重新获取的task为空的条件有
    1)线程池的线程数量超过了maximumPoolSize
    2)线程池的状态为STOP
    3)线程池的状态为SHUT_DOWN并且任务队列为空
    4)工人的消费线程超时未取到任务,allowCoreThreadTimeOut字段决定了是否核心线程和普通线程一样会有超时逻辑(超时未取到任务也会退出),采用的逻辑是{@code allowCoreThreadTimeOut || workerCount > corePoolSize}),如果当前的线程数量小于核心线程数,在根据allowCoreThreadTimeOut配置决定时候要执行核心线程的超时策略,源码如下:
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            //如果不允许核心线程超时的话,当前线程是否采取超时策略取决于当前线程是否小于核心线程数,否则的话就都采取超时策略
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    //定时任务采用的poll,在keepAliveTime时间之后如果没有取到任务就退出
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                   //take会无限期的wait当前线程,在allowCoreThreadTimeOut为false且当前的线程数量小于核心线程数的时候,直到有新任务进入队列才能重新signal(具体要看队列的实现)。
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
shutDown
 public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            //中断所有工人的消费线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
  • 使用interruptIdleWorkers来尝试中断线程
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
  • interruptIdleWorkers中会通过tryLock来判断Worker是否正在执行
  • 与shutDownNow不同的是,shutDownNow使用的是下面的方法
private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
  • 没有判断当前工人的消费线程是否在执行,直接中断。

相关文章

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

  • 线程池

    JDK线程池 为什么要用线程池 线程池为什么这么设计 线程池原理 核心线程是否能被回收 如何回收空闲线程 Tomc...

网友评论

      本文标题:线程池

      本文链接:https://www.haomeiwen.com/subject/kakigltx.html