一般情况下我们创建一个线程我们会直接继承一个Thread 类或者实现一个Runnable接口,然后通过new Thread().start()去启动一个线程执行相应的操作。可是这样来说的话,我们需要对于每一个操作都需要创建一个线程,比如我们在处理web请求的时候,如果我们针对每一个请求都创建一个线程来处理它的话,而且一次请求处理的时间比较短。这就意味着我们需要频繁的创建和销毁一个线程,这样也就会大大降低系统的效率,因为创建和销毁线程不但需要时间而且会消耗系统资源。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?这个时候我们就需要一个线程池了。
使用线程池有什么好处?
- 降低资源消耗:通过线程复用,可以减少线程的创建和销毁带来的消耗。
- 提高响应速度:一般任务被submit就可以执行。
- 提高线程的可管理性:对提交的线程进行统一的管理监控协调等等。
ThreadPoolExecutor
Java中 的线程池是通过ThreadPoolExecutor实现。它位于java.util.concurrent包下面,通过它我就可以直接使用线程池提供的好处和功能了。来看下它的简单结构类图:
ThreadPoolExecutor类图.png
来看下它的简单结构,从上面的类图中可以看到:它继承了AbstractExecutorService接口,AbstractExecutorService继承了ExecutorService接口,ExecutorService继承了Executor接口。Executor是一个顶层接口。
我们通常创建一个线程池的时候首先使用的就是newThreadPoolExecutor(),来看看ThreadPoolExecutor构造函数做了什么事情?来看看源码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
- corePoolSize:核心线程池的大小。默认情况下,一个线程池被创建后是没有线程的,只有当任务过来的时候才会创建线程。只要任务的数量没有达到corePoolSize的个数限制都会不断创建新的线程。但是当线程池被创建以后并且调用了prestartAllCoreThreads()或者prestartCoreThread()方法后就会直接创建了corePoolSize线程。
- maximumPoolSize:最大线程池的数量。当线程池中的数量已经达到了corePoolSize后,并且有新的任务不断的提交过来,这个时候就会把新提交的任务执行的任务的阻塞队列中去。当队列满了以后,这个时候就会创建新的线程来处理任务,线程池中的线程数量将会超过corePoolSize。而maximumPoolSize就是能够创建的最大的线程数量。
- keepAliveTime:只有当线程池中的线程数量大于corePoolSize之后才会生效。也就是当对于大于corePoolSize的线程,它在没有接受到新的处理任务的空转等待时间是keepAliveTime。超过的时候就会被销毁回收。
- unit:keepAliveTime的单位信息。包括(TimeUnit.DAYS:天;TimeUnit.HOURS:小时;TimeUnit.MINUTES:分钟;TimeUnit.SECONDS:秒;TimeUnit.MILLISECONDS:毫秒;
TimeUnit.MICROSECONDS:微妙;TimeUnit.NANOSECONDS:纳秒。) - workQueue:线程阻塞队列,用来存放待处理任务的线程队列。
- 1.ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
- 2.LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于
- 3.ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
- 4.SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
- 5.PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
- 6.DelayQueue:一个无界的阻塞队列但是只有当等待时间过了之后才能取出保存在队列中的元素。
- 7.LinkedTransferQueue:基于链表实现的无界队列,按照FIFO
- threadFactory:(ThreadFactory)用于创建线程的工厂,所有的线程都是通过这个工厂来创建的。
- handler:(RejectedExecutionHandler)当任务不能被线程池所处理的时候会被调用来进行相应的处理,这是一个接口。
- 1.CallerRunsPolicy:对于线程池无法处理的任务直接使用调用任务的线程来处理
- 2.AbortPolicy:对于线程池无法处理的任务直接拒绝,抛出异常
- 3.DiscardPolicy:对于线程池无法处理的任务直接丢弃不执行。
- 4.DiscardOldestPolicy:对于线程池无法处理的任务直接将阻塞队列里面等待时间最长的任务舍弃,然后把该任务加到队列尾部。
Executor接口
我这边就按照从上到下的顺序来分析ThreadPoolExecutor的实现和功能,首先从顶层的Executor接口来看:
public interface Executor {
/**
* 在将来某一个时候来运行给定的任务,这个任务可以是一个新的线程也可以是一个线程池线程
*/
void execute(Runnable command);
}
很简单就是一个接口,接受一个Runnable参数,用来接受提交的任务,没有返回值。
ExecutorService接口
public interface ExecutorService extends Executor {
/**
* 关闭线程池,不再接受新的任务,但是此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出
*/
void shutdown();
/**
* 这个会尝试停止正在执行的任务,并且暂停等待运行的任务,并且吧没有机会执行的任务列表返回。它是通过通过调用Thread.interrupt()方法来终止线程的,所以ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
*/
List<Runnable> shutdownNow();
/**
返回线程池是否关闭的状态
*/
boolean isShutdown();
/**
* 当所有的任务都完成了才会返回true.
*/
boolean isTerminated();
/**
* 在给定的时间内,都会被阻塞,除非任务完成或者到达超时时间
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个可以有返回值的任务,返回值的结果保存在Future中
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个Runnable任务,并且通过制定的Result 返回结果到Future中
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个Runnable任务
*/
Future<?> submit(Runnable task);
/**
* 执行给定的多个任务集合,并且把他们的执行都成功执行完成之后将状态和结果保存的Future列表中返回。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 相对于上面的一个方法,增加了一个超时时间
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 运行给定的方法,可是只要有一个任务执行成功就会立即返回结果
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 和上面的方法类似,指定了一个超时时间
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
如上ExecutorService丰富了线程池接受任务的形式,还可以提交Callable类型的任务,可以有返回值的任务,还提供了关闭线程池的任务接口定义。
AbstractExecutorService抽象类
public abstract class AbstractExecutorService implements ExecutorService {
/**
* 返回一个给定Runnable任务的FutureTask。
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
/**
* 返回一个给定Callable任务的FutureTask。
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
AbstractExecutorService抽象类增加了newTaskFor方法还有doInokeAny方法,其他的就是对ExecutorService接口中任务提交和任务调用方法的具体实现。newTaskFor主要是用于创建一个FutureTask任务,而submit方法的实现都大同小异,都是首先判断提交的任务是否为空,不为空则调用newTaskFor创建RunnableFuture任务,然后通过调用顶层的execute方法来添加任务。重点看下invokeAny和invokeAll的实现,invokeAny主要是通过调用doInvokeAny来实现业务逻辑;
网友评论