线程池:是生产者-消费者模型的典型应用。应用程序将任务交给线程池,线程池将其放入到BlockingQueue中。线程池中的已经开启的worker线程会循环监听BlockingQueue中的任务来进行消费。
1. 简述:线程池的设计理念
使用Thread开启线程:
- 执行start()方法。线程才会真正异步的执行。
- 执行run()方法,线程同步的执行。
当然,一般我们会执行start方法。
若使用Thread开启线程:每一次子线程均需要经历创建和销毁的生命周期,性能不好。
为了解决这个问题,JDK设计出线程池。
1.1 生产消费者模型
JDK的BlockingQueue,天生实现的是生产者-消费者模型,即队列满了put队列会被阻塞;队列空了后get方法会被阻塞。
work线程将开启while循环的监听
线程池中Worker线程开启后。使用while循环消费BlockingQueue中的Runnable。若while循环时得到的任务为空,那么便去关闭Worker线程。这也就是最大线程数和超过空闲时间的核心线程数能自动回收的原因。
image.png核心线程最大空闲时间:
核心线程数的空闲时间KeepAliveTime
。其实就是阻塞队列获取Running的最大等待时间。当配置了该参数且超过最大等待时间,那么返回一个null任务。而worker线程的run方法中的while循环得到null时,便会结束这个worker线程。
poll获取到的元素为空,那么便会结束监听,worker线程的run方法执行完毕。
- get:向阻塞队列获取元素,当阻塞队列空了之后,put时会被阻塞。
- poll(1,TimeUnit.MILLISECONDS):如果队列为空,阻塞一段时间后依旧为null,则返回null。
当返回null时,由“work线程将开启while循环的监听”可值,将结束worker线程的监听操作。
生产者和拒绝策略:
生产者向BlockingQueue中放入消息时,并没有使用put方法,而是使用的offer方法。
- put:向阻塞队列填充元素,当阻塞队列满了之后,put时会被阻塞。
- offer:向阻塞队列填充元素,当阻塞队列满了之后,offer会返回false。
拒绝策略:因为offer方法返回false,说明阻塞队列已经满了,若最大线程数大于核心线程数,那么继续开启Worker线程。若开启的是最大线程数的Worker线程,那么就会执行配置的拒绝策略。
image.png本质上就是一个内存级别的RabbitMQ
1.2 线程池的原理简述
-
线程池里面的线程为消费者线程(worker线程)【消费者循环去处理任务】,默认情况下,必须有任务的到来才会触发创建消费者。也可以使用
prestartAllCoreThreads
方法,在线程池初始化时,将所有的消费者全部创建出来。 -
消费者监听的是
BlockingQueue
队列(默认实现生产消费者模式),消费者调用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
方法拉取任务,若是在指定时间内获取不到任务,便会将自己注销掉(也就是结束监听)。 -
生产者向BlockingQueue存入任务时,使用的是offer方法,即队列满了之后返回false,然后判断最大线程数是否在开启Worker线程以及任务的拒绝策略。
1.3 线程池任务执行流程
- 当线程池中线程数量小于corePoolSize,每来一个任务,就会创建一个线程执行这个任务。
- 当前线程池线程数量大于等于corePoolSize,则每来一个任务。会尝试将其添加到任务缓存队列中,若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行。
- 当前线程池线程数量等于maximumPoolSize,则会采取任务拒绝策略进行处理。
- 当前线程池线程数量大于corePoolSize,如果某线程空闲时间超过keepAliveTime,线程将会被终止,直到不大于corePoolSize数量。如果允许为核心池(corePoolSize)中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
1.4 拒绝策略:一个也不放弃
有时候,我们不想丢弃任务,且将生产者(主线程)阻塞,那么如何实现拒绝策略?
上文我们得知,生产者放入queue时使用的是offer方法,当阻塞队列满了之后,直接返回false。我们可以采用put的方式,将主线程阻塞。不在向BlockingQueue中填充任务。
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(1, 1, 100
, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(50),((r, executor) -> {
try {
//阻塞主线程
executor.getQueue().put(r);
} catch (InterruptedException e) {
}
}));
详细源码分析
JAVA并发(12)— Lock实现生产者消费者
JAVA并发(13)— ThreadPoolExecutor的实现原理(源码分析)
网友评论