个人博客地址 http://dandanlove.com/
再使用线程池之前,我们应该了解为什么需要使用线程池。进行执行任务(task)的时候我们一般情况是new Thread进行执行,如果进行大量的并发任务的时候呢?
大多数并发应用程序是围绕执行任务(task)进行管理的。所谓的任务就是抽象、离散的工作单元(unit of work)。
当我们进行大量并发操作的时候,每任务每线程(thread-per-task)方法存在一些实际的缺陷,尤其是在需要创建大量的线程时会更加突出。
- 线程生命周期的消耗(创建和销毁)
- 资源消耗量(内存占用和管理,竞争CPU资源)
- 稳定性(线程数量超过JVM限制是,可能会出现OutofMemoryError)
在这种情况下为线程管理出现了线程池(ThreadPool),它是管理一个工作者线程的同构池(homogeneous pool)。线程池是与工作队列(work queue)紧密绑定的。
Executor是一个简单的借口,它是基于生产者-消费者模式,用于异步任务执行,而且支持很多不同类型的任务执行策略。
Executor框架使用的是Runnable作为其任务的基本表达形式。Runnable只是相当有限的抽象,但是Run不能反悔一个值或者抛出受检查的异常。Callable恰好可以将Runnable做封装返回其执行结果。
一个Executor执行的任务的生命周期有4个阶段:创建、提交、开始和完成。Future可以描述任务的生命周期,并提供了相关的方法来获得、取消以及验证任务是否已经完成还是被取消。
public interface Executor {
void execute(Runnable var1);
}
@FunctionalInterface
public interface Runnable {
void run();
}
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
public interface Future<V> {
boolean cancel(boolean var1);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException;
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
- corePoolSize:线程池的核心线程数,一般情况下不管有没有任务都会一直在线程池中一直存活,只有在 ThreadPoolExecutor 中的方法allowCoreThreadTimeOut(boolean value) 设置为 true 时,闲置的核心线程会存在超时机制,如果在指定时间没有新任务来时,核心线程也会被终止,而这个时间间隔由第3个属性 keepAliveTime 指定。
- maximumPoolSize:线程池所能容纳的最大线程数,当活动的线程数达到这个值后,后续的新任务将会被阻塞。
- keepAliveTime:控制线程闲置时的超时时长,超过则终止该线程。一般情况下用于非核心线程,只有在 ThreadPoolExecutor 中的方法 allowCoreThreadTimeOut(boolean value) 设置为 true时,也作用于核心线程。
- unit:用于指定 keepAliveTime 参数的时间单位,TimeUnit 是个 enum 枚举类型,常用的有:TimeUnit.HOURS(小时)、TimeUnit.MINUTES(分钟)、TimeUnit.SECONDS(秒) 和 TimeUnit.MILLISECONDS(毫秒)等。
- workQueue:线程池的任务队列,通过线程池的 execute(Runnable command) 方法会将任务 Runnable 存储在队列中(阻塞队列)。
1)ArrayBlockingQueue:
是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2)LinkedBlockingQueue:
一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
3)SynchronousQueue:
一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
4)PriorityBlockingQueue:一个具有优先级的无限阻塞队列。- threadFactory:线程工厂,它是一个接口,用来为线程池创建新线程的。
newCachedThreadPool
newCachedThreadPool是一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute() 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资源。注意,可以使用 ThreadPoolExecutor 构造方法创建具有类似属性但细节不同(例如超时参数)的线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newSingleThreadExecutor
newSingleThreadExecutor 创建是一个单线程池,也就是该线程池只有一个线程在工作,所有的任务是串行执行的,如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它,此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newFixedThreadPool
newFixedThreadPool 创建固定大小的线程池,每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小,线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
newScheduledThreadPool
newScheduledThreadPool 创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
网友评论