Java线程池必知必会
知其然更要知其所以然。同学们在使用线程池的过程中,有没有理解我们为什么要这么做,怎么配置线程池才能有更好的效果呢?
为什么要使用线程池
线程池实际上就是一种多线程的使用方式,从设计上来讲,是一个生产消费者模型。有同学说,直接在项目中直接new一个thread,不是非常爽吗,一行代码的事儿,为什么还要专门去开一个线程池来管理,真麻烦。其实不然,线程池带来的好处可是很多的。
降低资源消耗
线程池通过重复使用已创建的线程执行任务,而不必每次都创建新的线程,减少系统资源损耗。
线程实现一般有三种方式。一个是内核线程实现,一个是用户线程实现,另外一种就是用户线程+内核线程混合实现。
java采用的是使用内核线程的高级接口——轻量级进程(LIGHT WEIGHT PROCESS,LWP)每个LWP都对应着一个内核线程,为1:1的关系。简单来说,一个java的线程就对应一个内核线程。
这意味着,如果要对线程进行创建、阻塞、销毁,都会进行系统调用。这种代价是很高的,系统需要进行内核态和用户态的切换。同时,LWP需要消耗一定的内核资源(像线程栈空间等资源),所以数量上也受限制。自然也就不能创建过多的线程。
提高响应速度
上面的部分其实已经告诉我们这一点了,因为在重复使用已经创建好的线程时,避免了线程的创建时间,所以可以提高响应速度。
提高线程的可管理性
线程池本身提供了很多的对线程的管理操作。比如对线程数量、任务数量、线程名称、拒绝策略等等,都可以进行管理。而new Thread()本身很难做到这些。
比如new Thread()的线程执行任务速度非常慢,导致多次创建线程,存在非常多正在执行任务的线程,那么系统资源将被消耗殆尽。线程池可以通过上述的管理方法,将此问题kill掉。
比如在应用程序中,两个服务资源消耗占比不一样,那么可以通过线程池进行更合理的资源分配。
线程池的执行过程是怎么样的
在讨论线程池如何配置之前我们有必要先看一下线程池的创建和执行过程,方便我们对线程池的配置有更深入的理解。
下面是线程池的创建方法,可以看到都是些参数校验、变量赋值,没有其他复杂操作。
public ThreadPoolExecutor(int corePoolSize,// 核心线程数量
int maximumPoolSize,// 线程池中最大的线程数量
long keepAliveTime,// 线程存活时间
TimeUnit unit,// 上面时间的单位
BlockingQueue<Runnable> workQueue,// 任务队列。存储任务
ThreadFactory threadFactory,// 线程工厂,可以用工厂来创建线程
RejectedExecutionHandler handler// 拒绝策略
) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
我们在创建了线程池之后,就会给线程池提交任务,让线程池异步执行。那么这个任务究竟经过了怎样的过程才得道成仙的呢?我们结合线程池的源码进行分析。我在下面的方法中添加了中文注释,同学们可以参考。不想看源码的可以直接看线程池的执行流程图。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 如果当前存在的工作线程数量小于核心线程数量,先尝试添加一个核心工作线程。成功就直接返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果上述添加没成功,则尝试添加任务到工作队列去。添加成功了之后需要重新检查一下当前线程池状态。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池已经不是运行状态,且删除任务成功,那么执行拒绝任务策略、
if (! isRunning(recheck) && remove(command))
reject(command);
// 否则,检查当前的工作线程是不是已经为空了,如果是,需要再次添加线程。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果添加到队列也失败了,那么尝试开启一个新的非核心线程执行任务。如果未成功,则执行拒绝任务策略。
else if (!addWorker(command, false))
reject(command);
}
线程池执行过程
线程池的代码实践
代码分为任务类、线程配置类、测试类。熟练的同学可以跳过~
任务类如下:
@Slf4j
@Data
@AllArgsConstructor
public class Task implements Runnable {
private int taskNumber;
@Override
public void run() {
log.info("开始, {}", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("结束, {}", Thread.currentThread().getName());
}
}
线程池配置如下:
@Configuration
@Slf4j
public class ThreadPoolConfiguration {
/**
* 核心线程数 20
*/
@Value("${custom-core-pool-size}")
private int corePoolSize;
/**
* 最大线程数 30
*/
@Value("${custom-maximum-pool-size}")
private int maximumPoolSize;
/**
* 线程存活时间 60s
*/
@Value("${custom-keep-alive-time}")
private long keepAliveTime;
/**
* 线程队列中的最大数量 100
*/
@Value("${custom-thread-queue-number}")
private int threadQueueNumber;
@Bean
public ThreadPoolExecutor getThreadPoolExecutor() {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
this.getBlockingQueue(), this.getThreadFactory(), this.getRejectedExecutionHandler());
}
@Bean
public BlockingQueue<Runnable> getBlockingQueue() {
return new ArrayBlockingQueue<>(threadQueueNumber);
}
/**
* 线程工厂
*
* @return
*/
@Bean
public ThreadFactory getThreadFactory() {
return new ThreadFactory() {
private AtomicInteger seq = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("测试—thread-" + seq.incrementAndGet());
return thread;
}
};
}
/**
* 拒绝策略
*
* @return
*/
@Bean
public RejectedExecutionHandler getRejectedExecutionHandler() {
return (runnable, executor) -> {
if (!executor.isShutdown()) {
log.warn("数据过多,模拟存入DB,数据:{}", runnable);
}
};
}
}
测试类如下:
@RunWith(SpringRunner.class)
@SpringBootTest
public class ThreadPoolServiceImplTest {
@Resource
private ThreadPoolExecutor threadPoolExecutor;
@Test
public void checkThread() {
for (int i = 0; i < 200; i++) {
threadPoolExecutor.execute(new Task(i));
}
try {
TimeUnit.SECONDS.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
线程池参数怎么配置才合理
线程池有这么多参数,应该如何配置呢?
- 线程数量
一般我们会将任务分为CPU密集任务和IO密集型任务。(CPU密集任务就是计算量比较大的任务,如一些复杂值计算。IO密集型任务就像数据库连接请求发出后,等待数据库返回数据)
CPU密集型任务,适合配置尽量小的线程数,比如配置N+1(N代表CPU数量)个线程,Java中可以通过Runtime.getRuntime().availableProcessors()方法获取cpu个数【使用docker的同学可能需要注意下这个配置】
IO密集型任务,适合分配较多的线程,具体的数量可以进行测试一下。理论上IO等待时间越长,线程数越多,那么CPU资源才能更好的被利用起来。
- 存活时间
这个参数可以根据业务的峰值来进行处理,如果任务比较多,执行时间比较短,可以尝试将线程存活时间适当的放大一些。提高线程的复用率。
此参数一般用来控制非核心线程的存活,但是如果通过调用java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut方法,也可以触发核心线程的超时回收。
- 任务队列
下面是常用的几个队列类型。同学们可以在使用java.util.concurrent.Executors的new_XX_ThreadPool的方法中看到。
-
ArrayBlockingQueue 基于数组的有序有界(FIFO)阻塞队列。应用较少。
-
LinkedBlockingQueue 基于链表的有序无界(FIFO)阻塞队列。吞吐量一般要高于ArrayBlockingQueue。为正常任务的选择,
-
SynchronousQueue 不存储元素的阻塞队列,每个插入操作必须要等待别的线程进行消费操作才可以。否则将一直处于阻塞状态。适用于一些执行周期短的任务。吞吐量一般高于LinkedBlockingQueue。
-
PriorityBlockingQueue 有优先级的无界阻塞队列。比如vip的优先级高些?
- 线程工厂
上面的demo中给大家展示了手写的创建方式。也可以通过guava包下的工具类一行搞定。命名非常重要,我们可以对不同业务的线程池进行不同步的命名,这样方便监控和异常排查。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- 拒绝策略
ThreadPoolExecutor里面定义了几个静态内部类,是JDK的提供的几种实现。如果JDK提供的这几种不能满足需要,还可以像我上面的demo一样,实现自己的拒绝策略。
-
AbortPolicy 直接抛出异常。
-
CallerRunsPolicy 用调用者执行当前的任务。
-
DiscardOldestPolicy 丢弃队列里头部任务,并执行当前任务
-
DiscardPolicy 啥也不干,忽略。方法为空方法体。
线程池使用过程中容易碰到的问题?
-
使用无界队列的时候,设置最大线程数是没有意义的,队列不满是不会触发新建非核心线程的,所以线程数会维持在核心线程数量。这个在流程图和源码解析上我们也可以看到。无界队列也比较容易出现问题。
-
使用线程池尽量不要使用Executors的方式,使用ThreadPoolExecutor的方式能更明确的创建线程,避免资源耗尽的风险。——《码出高效》
-
设置线程的时候按需将线程池划分开,设置合理的名称和参数。
线程池如何进行监控?
我们可以通过线程池提供的几个方法返回的属性,进行线程池的监控。下面举几个例子,类似的方法还有获取当前的核心线程数量以及线程池线程数量等等。
// 获取当前任务数量,因为是动态变化的,所以只能是一个近似值
java.util.concurrent.ThreadPoolExecutor#getTaskCount
// 获取工作队列
java.util.concurrent.ThreadPoolExecutor#getQueue
// 获取当前处理完成的任务数量
java.util.concurrent.ThreadPoolExecutor#getCompletedTaskCount
我们在线程执行的前后也可以通过继承线程池的方式,重写其beforeExecute、afterExecute和terminated方法,用于监控执行前后、关闭前的一些信息。
@Slf4j
public class CustomThreadPool extends ThreadPoolExecutor {
public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public CustomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
log.info("beforeExecute------>thread:{},runnable:{}", t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
log.info("afterExecute------>thread:{},runnable:{}", t, r);
}
@Override
protected void terminated() {
log.info("terminated------>关闭了,线程池的任务数量:{}", getQueue().size());
}
}
网友评论