美文网首页Java-多线程
线程池工作原理分析

线程池工作原理分析

作者: kevinsEegets | 来源:发表于2020-06-30 13:57 被阅读0次

现实案例
某工艺品加工厂有3台加工机器用来生产订单所需的产品,正常 情况下3台机器能够保证所有订单按需声场完毕,如下图所示:


CgoCgV6n2fKAChGnAKAv_edvYJg266.gif

如果订单量突然大幅增加,3台机器已经处于满负荷状态,一时间无法完成新增订单任务.那么此时应该怎么办?解决的办法就是硬着头皮接下新来的订单,但是会将新来的订单暂存在仓库中,当有加工机器闲置下来之后,再用来生产仓库中的订单,如下图所示:


![111.gif](https://img.haomeiwen.com/i18406403/ad13ce77023de2de.gif?imageMogr2/auto-orient/strip)

如果订单量持续快速增长,导致仓库也存储满了.怎么办呢?正常情况下加工厂肯定会通过购买新的加工机器来满足订单需求,如下图所示:


222.png

有了仓库和新购买的加工机器加持,加工厂业务还是能够正常流转.但是当某些极端情况发生,比如双十一搞活动之后订单爆单了,这时新增的订单任务脸仓库以及所有的加工机器都已经无法容纳,说明加工厂已经不能接受新的订单任务了,因此只能拒绝所有新的订单.


CgoCgV6n2jmAAbSOAAjqRyhtMHc301.png

线程池的工作流程同上面描述的加工厂完成订单任务非常相似,并且在线程池中的构造器中,通过传入的参数可以设置默认有多少台加工机器、仓库的大小、可以购买新的加工机器的最大数量等等.

线程池结构

Ciqah16n2kaAO4ocAAE8gZsl0_8253.png

从上图可以大致看出,在线程池内部主要包含以下几部分:

  • worker集合:保存所有的核心线程和非核心线程(类似加工厂的加工机器), 其本质就是一个HashSet.
private final HashSet<Worker> worker = new HashSet<>();
  • 等待任务队列: 当核心线程个数达到corePoolSize时,新提交的任何会被先保存在等待任务队列中(类似加工厂中的仓库), 其本质就是一个阻塞队列BlockingQueue.
 private final BlockingQueue<Runnable> workQueue;
  • ctl: 是一个AtomicInteger类型, 用于记录池中线程的数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }  //计算当前运行状态
    private static int workerCountOf(int c)  { return c & CAPACITY; }   //计算当前线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; }        //通过状态和线程数生成`ctl`

线程池主要有一下集中运行状态:

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; //默认状态,接收新任务并处理排队任务;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;  //不接收新任务,但处理排队任务,调用shutdown()会处于该状态;
    private static final int STOP       =  1 << COUNT_BITS;  //不接收新任务,也不处理排队任务,并中断正在运行的任务,调用shotdownNow()会处于该状态;
    private static final int TIDYING    =  2 << COUNT_BITS; //所有任务都已终止,workerCount为零时,线程会转换到TIDYING状态,并将运行terminate()方法;
    private static final int TERMINATED =  3 << COUNT_BITS; //terminate()运行完成后线程池转为此状态

参数分析

线程池的构造器如下:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造参数说明。

corePoolSize:表示核心线程数量。
maximumPoolSize:表示线程池最大能够容纳同时执行的线程数,必须大于或等于 1。如果和 corePoolSize 相等即是固定大小线程池。
keepAliveTime:表示线程池中的线程空闲时间,当空闲时间达到此值时,线程会被销毁直到剩下 corePoolSize 个线程。
unit:用来指定 keepAliveTime 的时间单位,有 MILLISECONDS、SECONDS、MINUTES、HOURS 等。
workQueue:等待队列,BlockingQueue 类型。当请求任务数大于 corePoolSize 时,任务将被缓存在此 BlockingQueue 中。
threadFactory:线程工厂,线程池中使用它来创建线程,如果传入的是 null,则使用默认工厂类 DefaultThreadFactory。
handler:执行拒绝策略的对象。当 workQueue 满了之后并且活动线程数大于 maximumPoolSize 的时候,线程池通过该策略处理请求。

注意:当 ThreadPoolExecutor 的 allowCoreThreadTimeOut 设置为 true 时,核心线程超时后也会被销毁。

流程解析

当我们调用execute或者submit,将一个任务提交给线程池,线程池收到这个任务请求后,有以下集中处理情况:

  • 1: 当前线程池zh9ong运行的线程数量还没有达到corePoolSize大小时,线程池会创建一个新线程执行提交的任务,无论之前创建的线程是否处于空闲状态.
    CODE
  
public class LessThanCoreCount {
    public static void main(String[] args) {
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);  //创建一个固定数目,并且可重用的线程

        for (int i = 1; i < 5; i++) {
            final int taskId = i;
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程: " + Thread.currentThread().getName() + "  正在执行 task: " + taskId);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        fixedThreadPool.shutdown();
    }
}

上面代码创建了3个固定数量的线程池,每次提交的任务耗时100毫秒, 每次提交任务之前都会延迟2秒,保证线程池中的工作线程都已经执行完毕,执行效果如下:

线程: pool-1-thread-1  正在执行 task: 1
线程: pool-1-thread-2  正在执行 task: 2
线程: pool-1-thread-3  正在执行 task: 3
线程: pool-1-thread-1  正在执行 task: 4

可以看出虽然线程1和线程2都已经执行完毕并且处于空闲状态,但是线程池还是会尝试创建新的线程去执行新提交的任务,直到线程数量达到corePoolSize.

  • 2: 当前线程池中运行的线程数量已经达到corePoolSize大小时,线程池会把任务加入到等待队列中,直到某一个线程空闲了, 线程池会根据我们设置的等待队列规则,从队列中取出一个新的任务执行.
    CODE
   public static void main(String[] args) {
        ThreadPoolExecutor fiexdThreadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        for (int i = 1; i < 5; i++) {
            final int taskId = i;
            fiexdThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程: " + Thread.currentThread().getName() + "  正在执行 task: " + taskId);
                    try {
                        Thread.sleep(4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("此时等待队列中有 " + fiexdThreadPool.getQueue().size() + "个元素");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

上述corePoolSize核心线程数为2,代码提交的任务耗时4秒,因此前2个任务会占用线程池中的2个核心线程.此时有新的任务提交给线程池时, 任务会被缓存到等待队列中,结果如下:

此时等待队列中有 0个元素
线程: pool-1-thread-1  正在执行 task: 1
此时等待队列中有 0个元素
线程: pool-1-thread-2  正在执行 task: 2
------------------------------------------------------------------------------------------------
此时等待队列中有 1个元素
此时等待队列中有 2个元素
线程: pool-1-thread-1  正在执行 task: 3
线程: pool-1-thread-2  正在执行 task: 4

可以看到虚线以上通过2个核心线程直接执行提交的任务,因此等待队列中的数量为0,而虚线以下表明和核心线程正在被占用,新提交的任务都被放入等待队列中, 等线程1和线程2空闲时再此处理等待队列中剩余的线程元素

  • 3: 如果线程数大于corePoolSize数量但是还没有达到最大线程数maximumPoolSize,并且等待队列已满,则线程池会创建新的线程来执行任务.
    CODE
     public static void main(String[] args) {
        //核心线程为2,最大线程数为10, 等待队列长度为2, 线程池的线程空闲时间为0L
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 10, 
                0L, 
                TimeUnit.MILLISECONDS, 
                new LinkedBlockingDeque<Runnable>(2));
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程: " + Thread.currentThread().getName() + "正在执行 task: " + taskId);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("此时等待队列中有 " + threadPoolExecutor.getQueue().size() + " 个元素");
        }
        threadPoolExecutor.shutdown();
    }

上述代码创建了一个核心线程数为2, 最大线程数为10, 等待队列长度为2,线程池线程空闲时间为0秒的线程池,执行效果如下:

此时等待队列中有 0 个元素
线程: pool-1-thread-1正在执行 task: 0
此时等待队列中有 0 个元素
线程: pool-1-thread-2正在执行 task: 1
----------------------------------------1-------------------------------------------------------------
此时等待队列中有 1 个元素
此时等待队列中有 2 个元素
此时等待队列中有 2 个元素
线程: pool-1-thread-3正在执行 task: 4
----------------------------------------2-------------------------------------------------------------
线程: pool-1-thread-1正在执行 task: 2
线程: pool-1-thread-2正在执行 task: 3

解释说明:

  • 虚线1表明线程池数量已经达到corePoolSize核心线程

  • 虚线2表明等待队列已满

  • thread-3标示等待队列满之后创建新的线程执行任务

  • 4: 如果提交的任务,无法被核心线程直接执行,又无法加入等待队列,又无法创建非核心线程直接执行,线程池将根据拒绝处理器定义的策略处理这个任务,比如在ThreadPoolExecutor中,如果没有为线程池设置RejectedExecutionHandler拒绝策略,此时线程池会抛出RejectedExecutionException异常,即线程池拒绝接受这个任务.
    CODE

    public static void main(String[] args) {
        //核心线程为2,最大线程数为3, 等待队列长度为2, 线程池的线程空闲时间为0L
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 
                0L, 
                TimeUnit.MILLISECONDS, 
                new LinkedBlockingDeque<Runnable>(2));
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            threadPoolExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程: " + Thread.currentThread().getName() + "正在执行 task: " + taskId);
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("此时等待队列中有 " + threadPoolExecutor.getQueue().size() + " 个元素");
        }
        threadPoolExecutor.shutdown();
    }

修改最大线程数为3, 并提交6次任务给线程池(提交的任务线程(6) > 最大线程数(3)+核心线程数(2)), 执行效果如下:

此时等待队列中有 0 个元素
线程: pool-1-thread-1正在执行 task: 0
此时等待队列中有 0 个元素
线程: pool-1-thread-2正在执行 task: 1
此时等待队列中有 1 个元素
此时等待队列中有 2 个元素
此时等待队列中有 2 个元素
线程: pool-1-thread-3正在执行 task: 4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.eegets.rxjavademo.threadpool.LessThanCoreCount$1@511d50c0 rejected from java.util.concurrent.ThreadPoolExecutor@60e53b93[Running, pool size = 3, active threads = 3, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.eegets.rxjavademo.threadpool.LessThanCoreCount.main(LessThanCoreCount.java:98)
线程: pool-1-thread-1正在执行 task: 2
线程: pool-1-thread-2正在执行 task: 3

程序会报异常RejectedExecutionException, 拒绝策略是线程池的一种保护机制,目的就是当这种无节制的线程资源申请发生时,拒绝新的任务来保护线程池,默认拒绝策略会直接报异常,但是JDK中一共提供了4种保护策略, 如下:

CgoCgV6n2y6AVJ4mAAMbNVvmTTM118.png

实际上拒绝策略都是实现自接口 RejectedExecutionException,开发者也可以通过实现此接口,定制自己的拒绝策略。

文章来源: https://kaiwu.lagou.com/course/courseInfo.htm?courseId=67#/detail/pc?id=1865

相关文章

网友评论

    本文标题:线程池工作原理分析

    本文链接:https://www.haomeiwen.com/subject/jgtffktx.html