书接上文,<a href="http://www.jianshu.com/p/aa5884bcd032">Java线程池</a>。
接下来记录一下线程池的工作机制和原理
线程池的两个核心队列:
- 线程等待池,即线程队列BlockingQueue。
- 任务处理池(PoolWorker),即正在工作的Thread列表(HashSet<Worker>)。
线程池的核心参数:
- 核心池大小(corePoolSize),即固定大小,设定好之后,线程池的稳定峰值,达到这个值之后池的线程数大小不会释放。
- 最大处理线程池数(maximumPoolSize),当线程池里面的线程数超过corePoolSize,小于maximumPoolSize时会动态创建与回收线程池里面的线程池资源。
线程池的运行机制:
举个栗子。假如有一个工厂,工厂里面有10个人,每个工人同时只能做一件事情。因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务时,如果还来任务,就把任务进行排队等待。
如果说新任务数目增长的速度远远大于工作做任务的速度,那么此时工厂的主管可能就需要采取补救措施了,比如重新招4个工人进来;然后就将任务分配给这4个刚招进来的工人处理。
如果说这14个工人做任务的速度还是不够,此时工厂主管就要考虑不再接受新的任务或者抛弃前面的一些任务了。当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管就要考虑辞掉4个临时工了,只保持原来10个工人,比较额外的工人是需要花费的。
而这个栗子中永远等待干活的10个工人机制就是workerQueue。这个栗子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。也就是说corePoolSize就是线程池的大小,maximumPoolSize在我看来就是一种线程池任务超过负荷的一种补救措施,即任务量突然过大时的一种补救措施。再看看下面图好好理解一下。工人永远在等待干活,就像workerQueue永远在循环干活一样,除非,整个线程池停止了。
线程池里面的线程的时序图如下图所示:
线程的时序图自定义线程池与ExecutorService
自定义线程池需要用到ThreadFactory,本节将通过创建一个线程的例子对ExecutorService及其参数进行详细讲解。
1.认识ExecutorService家族
ExecutorService家族成员如下所示:
ExecutorService家族使用startUML画的,我是UML菜鸟,所以凑合着看下。
上图中主要元素说明如下:
Executor:线程池的顶级接口,但是严格意义上讲Executor并不是一个线程池,而只是一个执行线程的工具。
ExecutorService:真正线程池接口。这个接口继承了Executor接口,并声明了一些方法:
submit、invokeAll、invokeAny以及shutDown等。
AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法。
ThreadPoolExecutor:ExecutorService的默认实现,继承了类AbstractExecutorService。
ScheduledExecutorService:与Timer/TimerTask类似,解决那些需要任务重复执行的问题。
ScheduledThreadPoolExecutor:继承ThreadPoolExecutor的ScheduledExecutorService接口实现,周期性任务调度的类实现。
Executors是个线程工厂类,方便我们快速地创建线程池。
2.利用ThreadFactory创建一个线程
java.util.concurrent.ThreadFactory提供了一个创建线程的工厂的接口。
ThreadFactory源码如下:
public interface ThreadFactory{
@override
public Thread newThread(Runnable r);
}
我们可以看到上面的接口类中有一个newThread()的方法,为此我们自己手动定义一个线程工厂类,有木有激动啊,呵呵,下面我们就手动写一个自己的线程工厂类吧!
MyThreadFactory.java
public class MyThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r){
return new Thread(r);
}
}
上面已经创建好了我们自己的线程工厂类,但是啥都没有做,就是直接new了一个Thread就返回回去了,我们一般在创建线程的时候,都需要定义其线程的名字,因为我们在定义了线程的名字之后就能在出现问题的时候根据监视工具来查找错误的来源,所以我们来看下官方实现的ThreadFactory吧!
这个类在java.util.concurrent.Executors类中的静态类中DefaultThreadFactory
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory{
private static final AtomicInteger poolNumber=new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber=new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory(){
SecurityManager s=System.getSecurityManager();
group=(s!=null)?s.getThreadGroup():Thread.currentThread().getThreadGroup();
namePrefix="pool-"+poolNumber.getAndIncrement()+"-thread-";
}
public Thread newThread(Runnable r){
Thread t=new Thread(group,r,namePrefix+threadNumber.getAndIncrement(),0);
if((t.isDaemon())
t.setDaemon(false);
if(t.getPriority()!=Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
3.了解线程池的拒绝策略(RejectExecutionHandler)
当调用ThreadPoolExecutor的execute方法时,而此时线程池处于一个饱和的状态,并且任务队列也已经满了那么就需要做丢弃处理,RejectExecutionHandler就是这样的一个处理接口类。
RejectExecutionHandler.java
public interface RejectedExecutionHandler {
/**
* Method that may be invoked by a {@link ThreadPoolExecutor} when
* {@link ThreadPoolExecutor#execute execute} cannot accept a
* task. This may occur when no more threads or queue slots are
* available because their bounds would be exceeded, or upon
* shutdown of the Executor.
*
* <p>In the absence of other alternatives, the method may throw
* an unchecked {@link RejectedExecutionException}, which will be
* propagated to the caller of {@code execute}.
*
* @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
* @throws RejectedExecutionException if there is no remedy
*/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
在JDK里面有4中拒绝策略,如下图所示:
线程池拒绝策略- AbortPolicy:一言不合就抛异常(默认使用策略)。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,直接丢弃。
来看下源码吧:
AbortPolicy : 一言不合就抛异常的
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy:调用者所在线程来运行任务
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
DiscardOldestPolicy :丢弃队列里面最近的一个任务,并执行当前任务
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
DiscardPolicy : 不处理,直接丢弃
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
思考问题:
为什么有任务拒绝的情况发生呢:
这里先假设有一个前提:线程池里面有一个任务队列,用于缓存所有待处理的任务,正在处理的任务将从任务队列中移除。因此,在任务队列长度有限的情况下,就会出现现任务的拒绝情况,需要一种策略来处理发生这种已满无法加入的情况。另外,在线程池关闭的时候,也需要对任务加入队列操作进行额外的协调处理。
4.ThreadPoolExecutor详解
ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要想透彻的了解Java线程池,必须先了解这个大BOSS,下面来看下其源码:
4种构造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,RejectedExecutionHandler handler) {
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
通过源码我们清楚的看到,最终构造函数调用了最后一个构造函数,后面的那个构造函数才是真正的构造函数,接下来研究一下参数。
- int corePoolSize:核心池大小,这个参数跟后面讲的线程池原理有很大的关系。在创建了线程池之后,默认情况下,线程池中并没有任何线程,而是等待所有的任务到来之时才进行创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法 ,从这个两个方法的名字可以知道是预创建线程的意思,即在没有任务来临之前先创建好corePoolSize个线程或者一个线程。默认情况下,在创建好线程池之后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数量达到corePoolSize后,就会把达到的任务放到缓存队列中去。
- int maximumPoolSize:线程池最大线程数量,这是个非常重要的参数,它表示在线程池中最多能创建线程的数量;在corePoolSize和maximumPoolSize的线程数会被自动释放,而小于corePoolSize的则不会。
- long keepAliveTime:表示线程没有执行任务时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会生效,直到线程池数量不大于corePoolSize,即只有当线程池数量大于corePoolSize数量,超出这个数量的线程一旦到达keepAliveTime就会终止。但是如果调用了allowCoreThreadTimeout(boolean)方法,即使线程池的线程数量不大于corePoolSize,线程也会在keepAliveTime之后就终止,知道线程池的数量为0为止。
- TimeUnit unit:参数keepAliveTime的时间单位,一个时间单位枚举类。
- BlockingQueue workQueue:一个阻塞队列,用来存储等待执行任务的队列,这个参数选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列就是(ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue;)。
- ThreadFactory ThreadFactory:线程工厂,主要用来创建线程;可以是一个自定义的线程工厂,默认就是Executors.defaultThreadFactory()。用来在线程池中创建线程。
- RejectedExecutionHandler handler:表示当拒绝处理任务时的策略,也是可以自定义的,默认是我们前面的4种取值:
- ThreadPoolExecutor.AbortPolicy(默认的,一言不合即抛异常的)
- ThreadPoolExecutor.DiscardPolicy(一言不合就丢弃任务)
- ThreadPoolExecutor.DiscardOldestPolicy(一言不合就把最近的任务给抛弃,然后执行当前任务)
- ThreadPoolExecutor.CallerRunsPolicy(由调用者所在线程来执行任务)
所以想自定义线程池就可以从上面的几个参数入手。接下来具体看下代码,了解一下实现原理:
// 默认异常处理机制
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
//任务缓存队列,用来存放等待执行的任务
private final BlockingQueue<Runnable> workQueue;
//线程池的主要状态锁,对线程状态(比如线程大小、runState等)的改变都需要这个锁
private final ReentrantLock mainLock = new ReentrantLock();
//用来存放工作集
private final HashSet<Worker> workers = new HashSet<Worker>();
//volatile 可变变量关键字,写的时候用mainLock做锁,读的时候无锁,高性能
private volatile long keepAliveTime;
//是否允许核心线程超时
private volatile boolean allowCoreThreadTimeOut;
//核心线程数量
private volatile int corePoolSize;
//线程最大线程数量
private volatile int maximumPoolSize;
//任务拒绝策略
private volatile RejectedExcutionHandler handler;
结合之前的知识,大概就能猜出里面是怎么实现的了,具体可以参考一下JDK的源代码,这样我们就能做到了解原理又会用了。
5.自定义实现一个简单的Web请求连接池
我们来自定义一个简单的Web请求线程池。模仿Web服务的需求场景说明如下:
- 服务器可容纳的最小请求数是多少。
- 可以动态扩充的请求数大小是多少。
- 多久回收多余线程数即请求数。
- 用户访问量打了怎么处理。
- 线程队列机制采取有优先级的排队的执行机制。
根据上面的场景,看下这个线程池如何编写?
public class MyExecutors extends Executors{
//利用默认线程工厂和PriorityBlockingQueue队列机制,当然了,我们还可以自定义ThreadFactory和继承queue进行自定义扩展
public static ExecutorService newMyWebThreadPool(int minSpareThreads,int maxThreads,int maxIdleTime){
return new ThreadPoolExecutor(minSpareThread,maxThreads,maxIdleTime,TimeUnit.MILLISECONDS,
new PriorityBlockingQueue<Runnable>());
}
}
6.线程池在工作中的错误使用
- (1)分不清楚线程是单例还是多对象。
- (2)线程池数量设置很大。
- (3)注意死锁问题
网友评论