在Java中一般我们会通过Executors来创建线程池,一般有四种:
- newCachedThreadPool
- newFixedThreadPool
- newSingleThreadExecutor
- newScheduledThreadPool
这四种创建方式其实万变不离其宗,最底层都是使用的ThreadPoolExecutor,我们先来看下ThreadPoolExecutor的构造函数。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
总共有5个参数:
- corePoolSize 是线程池的初始大小,可理解为desiredSize或者最小线程数;
- maximumPoolSize 是最大可申请的线程数;
- keepAliveTime 是当线程数大于corePoolSize时如果现成出于idle空闲状态,那么它将在keepAliveTime时间后被清理;
- TimeUnit 就是keepAliveTime的时间单位,比如秒,微秒等;
- BlockingQueue 是一个阻塞队列,如果熟悉生产者消费者模式或者PV操作那么对于它应该不陌生,这里下面会做进一步介绍;
- ThreadFactory 用于创建线程,一般用默认实现;
- RejectedExecutionHandler 是在线程池无法创建新线程并且队列达到最大限制时的处理策略,默认是
AbortPolicy
,由于这里有多种策略,并且不属于重点内容,不多做介绍;
接下来我们挨个看下Executors里这几种构造函数到底是如何构造线程池的。
newCachedThreadPool
缓存线程池,可以理解为弹性线程池,个人认为比较适用于有特殊时段的高并发请求的场景。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
可以看到在线程池里默认是没有线程的,但却可以申请无限多的线程(只要系统资源够用),如果现成空闲60秒后会被自动回收,最后那个SynchronousQueue是一个阻塞队列,长度永远为0,peek等操作永远返回null,如果调用take或者put的话会被阻塞,直到有相应的生产者或者消费者,这个特性很关键,因为在判断是否需要创建新线程的时候回用到BlockQueue里的offer方法,如下:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
首先判断目前的线程数是否小于corePoolSize,如果小于就直接新建一个,否则判断当前线程池是否仍在运行,这里说下ctl.get()
这个方法,ctl
是一个AtomicInteger
类型,ctl.get()
返回一个int类型,int类型一共4个字节,最高3位存储线程池的状态,后29位代表线程个数,一共有5种运行状态:
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
如果仍在运行状态,则对队列执行offer操作,看能否插入成功,
boolean offer(E e);
成功返回true
,失败返回false
。好,关键的来了,SynchronousQueue的特性是长度永远为0,并且它的offer方法默认是立刻返回,这是什么意思呢?就是说在执行offer的时候如果没有消费者等待则立刻返回false,表示插入失败。
此时会走到最后的if-else
执行addWorker
,addWorker会判断是否达到maximumPoolSize
,没有的话就新建,如果无法再申请资源或达到上限则拒绝操作执行reject(command)
。
这是newCachedThreadPool为何可以不停地申请资源创建线程的原因。
newFixedThreadPool
顾名思义,就是固定大小的线程池,老规矩,先看构造函数:
new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
可以看到corePoolSize
和maximumPoolSize
是一样的,不可伸缩,超时时间可以忽略了,因为不会再新建线程,最后的BlockingQueue用的是LinkedBlockingQueue
,同学们注意了,这个队列和前面提到的SynchronousQueue
有啥区别呢?这很关键,因为当决定是否新建线程还是放到等待队列里时就是看这个queue还能否插入,我们来看下LinkedBlockingQueue
的实现:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
可以看到默认队列长度是最大值,由于offer
方法太复杂所以就不放出来了,但效果是判断是否达到队列最大长度,如果没有那么就把任务先加入队列。
所以这就是区别,在没有可用线程的时候线程池会不断地把任务放到LinkedBlockingQueue
中等待可用线程。
newSingleThreadExecutor
单线程线程池,如果所执行的任务抛出异常或者终止,该线程池会自动新建一个线程保证服务的可用。好处是可以保证任务是串行的,不用考虑并发同步等复杂的问题。
new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
配置和上一个类似,只是线程数限制为1。FinalizableDelegatedExecutorService
对ExecutorService做了一层封装,这里不详述了。
newScheduledThreadPool
用于执行定时任务
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
可以看到这里使用了DelayedWorkQueue
,在提交任务的时候
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
这里使用了DelayedWorkQueue
的特性,在取操作的时候会block直到delay到期。由于本次没有详细研究DelayedWorkQueue
所以不多做介绍,有兴趣的同学可以自己读下源码。
在《Effective Java II》和《Java并发编程实践》里都强调了在执行定时任务的时候最好使用线程池,而不要用Timer,因为线程池可以很好的维护线程状态,确保不会异常退出造成混乱。
总结
在使用线程池的时候最好使用Executors封装好的方法,如果想要定制化可以参考ThreadPoolExecutor
的构造函数,自定义参数,切记选择合适的队列类型。相信在了解了底层实现原理后对线程池的控制会更有自信。
网友评论