一、线程池介绍
一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。
二、线程池的结构
核心线程数
线程池正常运行,活跃的线程数量
最大线程数
线程满负荷运行的线程数量,最大线程数=核心线程数+救急线程数量
急救线程的存活时间
当任务量减少,空闲的救急线程的存活时间
工作队列
存放核心线程暂时没有执行到的任务
拒绝策略
当线程池满负荷运行,并且工作队列也放满的时候,对接下来放入线程池的任务做的一个处理的策略
三、JDK线程池的运行流程
image.png创建一个线程池后,一般都会是循环的将任务放到线程池中。
1.线程池会直接创建核心线程去执行任务。
2.当核心线程数达到给定的值时候,再接收的任务会放到队列中
3.当队列中存放的任务数量已经满了的时候,线程池开启救急线程去执行任务
4.线程池里面的工作线程达到了最大线程数,并且队列放满了,并且还有新的任务放进来的时候,就会执行拒绝策略。
四、JDK四种线程池
1、固定线程数的线程池(newFixedThreadPool)
这种线程池里面的线程被设计成存放固定数量的线程,具体线程数可以考虑为CPU核数*N(N可大可小,取决于并发的线程数,计算机可用的硬件资源等)。可以通过下面的代码来获取当前计算机的CPU的核数。
int processors = Runtime.getRuntime().availableProcessors();
FixedThreadPool 是通过 java.util.concurrent.Executors 创建的 ThreadPoolExecutor 实例。这个实例会复用 固定数量的线程处理一个共享的无边界队列 。任何时间点,最多有 nThreads 个线程会处于活动状态执行任务。如果当所有线程都是活动时,有多的任务被提交过来,那么它会一致在队列中等待直到有线程可用。如果任何线程在执行过程中因为错误而中止,新的线程会替代它的位置来执行后续的任务。所有线程都会一致存于线程池中,直到显式的执行 ExecutorService.shutdown() 关闭。由于阻塞队列使用了LinkedBlockingQueue,是一个无界队列,因此永远不可能拒绝任务。LinkedBlockingQueue在入队列和出队列时使用的是不同的Lock,意味着他们之间不存在互斥关系,在多CPU情况下,他们能正在在同一时刻既消费,又生产,真正做到并行。因此这种线程池不会拒绝任务,而且不会开辟新的线程,也不会因为线程的长时间不使用而销毁线程。这是典型的生产者----消费者问题,这种线程池适合用在稳定且固定的并发场景,比如服务器。下面代码给出一个固定线程数的DEMO,每个核绑定了5个线程。
2、缓存的线程池(newCachedThreadPool)
核心池大小为0,线程池最大线程数目为最大整型,这意味着所有的任务一提交就会加入到阻塞队列中。当线程池中的线程60s没有执行任务就终止,阻塞队列为SynchronousQueue。SynchronousQueue的take操作需要put操作等待,put操作需要take操作等待,否则会阻塞(线程池的阻塞队列不能存储,所以当目前线程处理忙碌状态时,所以开辟新的线程来处理请求),线程进入wait set。总结下来:①这是一个可以无限扩大的线程池;②适合处理执行时间比较小的任务;③线程空闲时间超过60s就会被杀死,所以长时间处于空闲状态的时候,这种线程池几乎不占用资源;④阻塞队列没有存储空间,只要请求到来,就必须找到一条空闲线程去处理这个请求,找不到则在线程池新开辟一条线程。如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。
3、单个线程的线程池(newSingleThreadExecutor)
SingleThreadExecutor是使用单个worker线程的Executor,作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。对于newSingleThreadExecutor()来说,也是当线程运行时抛出异常的时候会有新的线程加入线程池替他完成接下来的任务。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,所以这个比较适合那些需要按序执行任务的场景。比如:一些不太重要的收尾,日志等工作可以放到单线程的线程中去执行。日志记录一般情况会比较慢(数据量大一般可能不写入数据库),顺序执行会拖慢整个接口,堆积更多请求,还可能会对数据库造成影响(事务在开启中),所以日志记录完全可以扔到单线程的线程中去,一条条的处理,也可以认为是一个单消费者的生产者消费者模式。
4、固定个数的线程池(newScheduledThreadPool)
相比于第一个固定个数的线程池强大在 ①可以执行延时任务,②也可以执行带有返回值的任务
五、JDK线程池的拒绝策略介绍
所有拒绝策略都实现了接口 RejectedExecutionHandler
/**
* A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
*
* @since 1.5
* @author Doug Lea
*/
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
CallerRunsPolicy策略
当线程数线程池的最大线程数并且阻塞队列已满的情况下,后到的数据会执行拒绝策略,让调用线程(提交任务的线程)直接执行此任务
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler
AbortPolicy策略(默认的拒绝策略)
拒绝新任务的加入,并抛出 RejectedExecutionException 的异常
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler
DiscardPolicy策略
拒绝的任务的处理程序,它会自动丢弃拒绝的任务。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler
DiscardOldestPolicy策略
将已满的队列中最开始的那个任务与当前提交的任务替换掉
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler
六、创建线程池的方式
方式一:创建jdk提供的线程池
创建固定大小线程池 newFixedThreadPool()
ExecutorService threadPool = Executors.newFixedThreadPool(5);
public class Test1 {
public static void main(String[] args) {
int loopNumber = 10;
ExecutorService threadPool = Executors.newFixedThreadPool(5);
for (int i = 0; i < loopNumber; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("当前线程"+Thread.currentThread().getName());
}
});
}
threadPool.shutdown();
}
}
用以上的方式可以创建jdk自带的线程池。
方式二:自定义创建线程池
上面的这种方式其实看源码最终还是调用了ThreadPoolExecutor这个类的构造方法。
下面是jdk的源码。
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default thread factory.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:救急线程的存活时间
unit:存活时间的单位(纳秒,毫秒,微秒,秒,分钟,小时,天)
workQueue:工作队列
handler:拒绝策略(默认的拒绝策略(抛弃新加入的任务):new AbortPolicy();)
两种方式比较(推荐方式二)
说明:Executors 返回的线程池对象的弊端如下:
1)FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2)CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM。
七、线程池注意事项
线程池开启后执行完记得要关闭
如果不及时关闭线程池,会造成cpu资源的浪费,并会造成堆溢出,抛出OOM异常。
方式一:executor.shutdown();
执行此函数后线程池不再接收新任务,并等待所有任务执行完毕后销毁线程。此函数不会等待销毁完毕
方式二:executor.shutdownNow();
立即结束所有线程,不管是否正在运行,返回未执行完毕的任务列表
线程池的submit和execute方法区别
- 接收的参数不一样;
submit()可以接收 callable 和 runnable,execute()只能接收 runnable - submit()有返回值,而execute()没有;
例如,有个validation的task,希望该task执行完后告诉我它的执行结果,是成功还是失败,然后继续下面的操作。 - submit()可以进行Exception处理;
例如,如果task里会抛出checked或者unchecked exception,而你又希望外面的调用者能够感知这些exception并做出及时的处理,那么就需要用到submit,通过对Future.get()进行抛出异常的捕获,然后对其进行处理。
网友评论