引用自:http://blog.iluckymeeting.com/2018/01/06/JavaThreadPool/
线程池管理原则
- 降低系统资源消耗
- 提高系统响应速度
- 提高线程的可管理性
- 提高系统的稳定性
Java线程池实现原理
Java通过线程池的工厂类Executors创建线程池,底层最终都是实例化了ThreadPoolExecutor来创建线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
- corePoolSize
线程池中的核心线程数,即使线程当前是空闲状态,也会保留在线程池中不会被回收,除非设置了allowCoreThreadTimeOut参数
- maximumPoolSize
线程池中允许创建的最大线程池数量
- keepAliveTime
线程最大空闲时间,如果线程池中的线程数量大于核心线程数量设置corePoolSize,则此参数起作用。当空闲线程等待任务到来的时间大于这个值,线程将被回收。
- unit
指的时keepAliveTime的单位
- workQueue
如果线程池中已经没有空闲线程了,而线程数量已经达到了maximumPoolSize时,意味着不能再创建新的线程,这时新的任务将被保存到阻塞队列中等待有空闲的线程出现。
- threadFactory
创建线程的工厂类
- handler
当暂存任务的阻塞队列被填满时,新到来的任务既不能在阻塞队列里暂存等待处理,也没有空闲的线程来处理,需要要指定handler来处理这些任务
总结一下Java线程池的原理:
当有任务需要处理时,先在线程池中寻找空闲线程处理,如果没有空闲线程,而此时线程池中的线程数量没有达到corePoolSize,则创建新的线程处理任务;如果此时线程池中的线程数量已经达到了corePoolSize,则将任务放入阻塞队列中等待有线程空闲。当阻塞队列空间被用完,仍然没有空闲线程时,如果线程池中的线程数量小于等于maximumPoolSize,则创建新线程处理任务,这时如果出现了空闲线程,并且空闲时间大于keepAliveTime,则此线程会被回收。当线程池的线程数量达到maximumPoolSize,并且阻塞队列被填满,如果此时有新任务需要处理,新任务会被reject
线程池阻塞队列
等待被处理的任务将被暂存在阻塞队列中,任务需要实现Runnable接口,队列需要实现BlockingQueue接口。jdk提供了以下几种阻塞队列实现:
- ArrayBlockingQueue 基于数组的阻塞队列,元素按FIFO原则进出队列
- DelayQueue 创建于堆上的基于优先级的无界阻塞队列,队头是最先达到delay时间的元素
- LinkedBlockingQueue 基于链表的阻塞队列,元素按FIFO原则进出队列
- LinkedTransferQueue 基于链表的阻塞队列,生产者将阻塞等待消息者读取元素
- PriorityBlockingQueue 创建于堆上的带有优先级的无界阻塞队列
- SynchronousQueue 插入操作会被阻塞,直到收到别的线程取走元素的响应
任务reject策略
当线程池中的线程数已达到最大允许线程数,并且没有空闲线程,阻塞队列也已填满,这时再有新的任务到达到时,新任务会被拒绝。jdk定义了RejectedExecutionHandler接口,并提供了几个不同的实现以提供多种任务丢弃策略。
- AbortPolicy 丢弃并抛出异常
- CallerRunsPolicy 直接在调用方线程中执行,如果executor已被关闭则会被直接丢弃
- DiscardOldestPolicy 将阻塞队列中最老的任务丢弃,然后再次发起处理请求
- DiscardPolicy 直接将任务丢弃
Java线程池采用的默认策略是AbortPolicy,丢弃任务并抛异常,如果需要特别的处理策略,可以自己实现接口RejectedExecutionHandler。
Java线程池的常见使用
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建可以有nThreads个线程的线程池,核心线程数和最大线程数都是nThreads,阻塞队列是基于LinkedBlockingQueue的链表阻塞队列。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个线程数动态变化的线程池,被始线程数0,最大线程数是Integer.MAX_VALUE,当线程空闲超过60秒会被回收,阻塞队列是基于SynchronousQueue的,队列中并不存储元素,如果没有空闲的线程,直接创建新线程处理任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
创建一个线程池,当任务到达处理时间时会由线程池中的线程处理,核心线程数corePoolSize,最大线程数Integer.MAX_VALUE,当线程数超过核心线程数后,一旦线程空闲就会被回收,阻塞队列是基于DelayedWorkQueue,delay时间最小的会排在队头。
ThreadPoolExecutor实现
上面几个线程池的常见用法在底层都是通过ThreadPoolExecutor来实现的,在ThreadPoolExecutor实现中有一个重要的AtomicInteger类型的变量ctl,它存储了线程池的两个重要信息:线程数量、线程池状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
COUNT_BITS变量定义了ctl变量中有29位来表示线程数量,所以线程池的容量CAPACITY是(2^29)-1个。ctl中记录的线程数量,不仅仅是指处理激活状态的可以处理任务的线程,如果一个线程准备被回收,它已不能处理任务,但是并没有完成回收,那么这个线程也会被计算在线程数量之内。
下面介绍一下线程池状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
ctl变量的高3位表示线程池状态,共有以下几种状态:
- RUNNING 可以接收新任务,并处理阻塞队列中的任务,状态码111
- SHUTDOWN 不接收新任务,但是会处理阻塞队列中的任务,状态码000
- STOP 不接收新任务,不处理阻塞队列中的任务,正在处理的任务会被打断执行,状态001
- TIDYING 所有任务都已被处理,线程数为0,进入此状态时会调用terminated()方法,状态码010
- TERMINATED terminated()方法执行完成后进入此状态,状态码011
线程池可能的状态转换有以下几种: - RUNNING -> SHUTDOWN 当shutdown()方法被调用的线程池状态由RUNNING转换为SHUTDOWN
- (RUNNING/SHUTDOWN) -> STOP 当shutdownNow()方法被调用时,线程池状态由RUNNING或SHUTDOWN状态转换为STOP
- SHUTDOWN -> TIDYING 当线程池处理SHUTDOWN状态以后,如果阻塞队列已空,并且线程池中线程数为0,则线程池转为TIDYING状态
- STOP -> TIDYING 当线程池处理STOP状态,并且线程池里线程数为0,则线程池进行TIDYING状态
- TIDYING -> TERMINATED 当terminated()方法被调用完成后进入此状态
任务的执行
任务的执行是通过调用execute方法
void execute(Runnable command);
方法实现如下:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //如果当前线程数小于核心线程数,则创建新线程处理任务
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //如果线程池状态是RUNNING,并且任务被成功存入阻塞队列
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //再次检查线程池状态,如果已不是RUNNING,则将任务由阻塞队列中移除,并将任务reject
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) //如果线程池状态已不是RUNNING或者任务放入队列失败,则创建新线程处理任务,如果失败则reject任务
reject(command);
网友评论