常见线程池
- Executors是jdk里面提供的创建线程池的工厂类,它默认提供了4种常用的线程池应用,而不必我们去重复构造。
-
newSingleThreadExecutor
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
核心线程数和最大线程数均为1,空闲线程存活0毫秒同样无意思,意味着每次只执行一个线程,多余的先存储到工作队列,一个一个执行,保证了线程的顺序执行。 -
newFixedThreadExecutor(n)
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
当核心线程数和最大线程数固定相等,而空闲存活时间为0毫秒,说明此参数也无意义,工作队列为最大为Integer.MAX_VALUE大小的阻塞队列。当执行任务时,如果线程都很忙,就会丢到工作队列等有空闲线程时再执行,队列满就执行默认的拒绝策略 -
newCacheThreadExecutor(推荐使用)
创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程。
从构造看核心线程数为0,最大线程数为Integer最大值大小,超过0个的空闲线程在60秒后销毁,SynchronousQueue这是一个直接提交的队列,意味着每个新任务都会有线程来执行,如果线程池有可用线程则执行任务,没有的话就创建一个来执行。线程池中的线程数不确定,一般建议执行速度较快较小的线程,不然这个最大线程池边界过大容易造成内存溢出 -
newScheduleThreadExecutor
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
不建议使用 Executors静态工厂构建线程池
线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式。
-
Executors 返回的线程池对象的弊端如下:
-
FixedThreadPool 和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。 -
CachedThreadPool 和ScheduledThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
-
FixedThreadPool 和SingleThreadPool:
-
创建线程池的正确姿势
避免使用Executors创建线程池,主要是避免使用其中的默认实现,那么我们可以自己直接调用ThreadPoolExecutor的构造函数来自己创建线程池。在创建的同时,给BlockQueue指定容量就可以了。
private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
ThreadPoolExecutor线程池
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { }
-
常用参数
corePoolSize:核心线程数量,会一直存在,除非allowCoreThreadTimeOut设置为true
maximumPoolSize:线程池允许的最大线程池数量
keepAliveTime:线程数量超过corePoolSize,空闲线程的最大超时时间,达到这个时间才进行销毁
unit:超时时间的单位。unit 可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。
workQueue:工作队列,保存未执行的Runnable 任务
threadFactory:创建线程的工厂类
handler:当线程已满,工作队列也满了的时候,会被调用。被用来实现各种拒绝策略。
-
阻塞队列,即workQueue
Java并发包中的阻塞队列一共是7个,当然它们都是线程安全的。
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DealyQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
- 拒绝策略,handler有四个选择
- ThreadPoolExecutor.AbortPolicy()
简单粗暴,直接抛出java.util.concurrent.RejectedExecutionException拒绝异常,这也是默认的拒绝策略。 - ThreadPoolExecutor.CallerRunsPolicy()
如果线程池未关闭,则会在调用者线程中自动重复调用execute()方法 ,直至执行新任务,这会导致主线程提交线程性能变慢。 - ThreadPoolExecutor.DiscardPolicy()
从方法看没做任务操作,即表示不处理新任务,即丢弃。 - ThreadPoolExecutor.DiscardOldestPolicy()
抛弃最老的任务,就是从队列取出最老的任务然后放入新的任务进行执行。
- 提交线程
ExecutorService es = Executors.newFixedThreadPool(3);
es.submit(xxRunnble);
es.execute(xxRunnble);
- execute()
没有返回值,如果不需要知道线程的结果就使用execute方法,性能会好很多。 - submit()
返回一个Future对象,如果想知道线程结果就使用submit提交,而且它能在主线程中通过Future的get方法捕获线程中的异常。
- execute()方法是如何处理
- 获取当前线程池的状态。
- 当前线程数量小于coreSize时创建一个新的线程运行。
- 如果当前线程处于运行状态,并且写入阻塞队列成功。
- 双重检查,再次获取线程状态;如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,并尝试判断线程是否全部执行完毕,同时执行拒绝策略。
- 如果当前线程池为空就新创建一个线程并执行。
- 如果在第三步的判断为非运行状态,尝试新建线程,如果失败则执行拒绝策略。
- 关闭线程池
es.shutdown();
es.shutdownNow();
- shutdown()
不再接受新的任务,之前提交的任务等执行结束再关闭线程池。 - shutdownNow()
不再接受新的任务,试图停止池中的任务再关闭线程池,返回所有未处理的线程list列表。
- 线程池工作流程
- 如果线程池中的线程小于corePoolSize时就会创建新线程直接执行任务。
- 如果线程池中的线程大于corePoolSize时就会暂时把任务存储到工作队列workQueue中等待执行。
- 如果工作队列workQueue也满时,当线程数小于最大线程池数maximumPoolSize时就会创建新线程来处理,而线程数大于等于最大线程池数maximumPoolSize时就会执行拒绝策略。
详细流程,请点击此处查看!
-
FixedThreadPool运行图如下:
执行过程如下:
- 如果当前工作中的线程数量少于corePool的数量,就创建新的线程来执行任务。
- 当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
- 线程执行完1中的任务后会自动从队列中取任务去执行。
注意:LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池。
-
SingleThreadExecutor运行图如下:
执行过程如下:
- 如果当前工作中的线程数量少于corePool的数量,就创建一个新的线程来执行任务。
- 当线程池的工作中的线程数量达到了corePool,则将任务加入LinkedBlockingQueue。
- 线程执行完1中的任务后会从队列中取任务去执行。
注意:LinkedBlockingQueue是无界队列,所以可以一直添加新任务到线程池。由于在线程池中只有一个工作线程,所以任务可以按照添加顺序执行。
-
CachedThreadPool运行图如下:
执行过程如下:
- 首先执行SynchronousQueue.offer(Runnable task)。如果在当前的线程池中有空闲的线程正在执行SynchronousQueue.poll(),那么主线程执行的offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行成功,否则执行步骤2。
- 当线程池为空(初始maximumPool为空)或没有空闲线程时,配对失败,将没有线程执行SynchronousQueue.poll操作。这种情况下,线程池会创建一个新的线程执行任务。
- 在创建完新的线程以后,将会执行poll操作。当步骤2的线程执行完成后,将等待60秒,如果此时主线程提交了一个新任务,那么这个空闲线程将执行新任务,否则被回收。因此长时间不提交任务的CachedThreadPool不会占用系统资源。
SynchronousQueue是一个不存储元素阻塞队列,每次要进行offer操作时必须等待poll操作,否则不能继续添加元素。
总结
线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重复使用线程,线程创建的开销就被分摊到多个任务上,而且由于在请求到达时线程已经存在,所以消除线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。另外,通过适当的调整线程中的线程数目可以防止出现资源不足。
网友评论