在开发过程中,当我们需要使用线程的时候就常常会去new一个Thread,但是这样写会有什么问题呢?
假如并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
JAVA的线程池可以解决此问题,线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。
使用线程池的好处
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
线程池状态
@ReachabilitySensitive
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING: 可以接收新任务,和处理阻塞队列任务;
SHUTDOWN: 不接收新任务,但是可以处理阻塞队列任务;
STOP: 既不接收新任务,也不处理阻塞队列任务,直接终止运行中的任务;
TIDYING: 所有任务都已经终止,有效线程数workcout为0,线程池进入TIDYING状态后会调用 terminated() 方法进入TERMINATED 状态;
TERMINATED:terminated()方法执行完成。
ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:核心线程池大小
maximumPoolSize:线程池最大容量大小
keepAliveTime:线程池空闲时,线程存活时间
unit: keepAliveTime的时间单位
workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;
threadFactory:线程工厂
handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
- AbortPolicy:直接抛出异常,这是默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务。
execute方法
ThreadPoolExecutor被初始化后,通过execute方法提交线程任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { //1
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //3
reject(command);
else if (workerCountOf(recheck) == 0) //4
addWorker(null, false);
}
else if (!addWorker(command, false)) //5
reject(command);
}
处理步骤:
- 判断当前的活动线程数是否小于核心线程大小。如果小于,则新建一个线程放入线程池中,并启动该任务;
- 如果当前活动的线程数不小于核心线程池大小,判断当前线程池是否是RUNNING状态。如果是, 则将任务添加到阻塞队列workQueue中;
- 重新获取当前线程,并判断是否处于RUNNING状态,如果不是RUNINING状态,则从阻塞队列中删除该任务,并通过handler使用拒绝策略对该任务进行处理,整个方法返回;
- 如果条件3判断失败 ,判断当前线程数是否为0。如果等于0,则执行addWorker方法。需要注意的是,这里的addWorker方法第一参数为null,表示在线程池中创建一个线程,但不去启动;第二个参数为false,表示将线程池的有限线程数量的上限设置为maximumPoolSize;
- 回到步骤2的判断,步骤2判断失败,调用addWorker方法,如果add失败则直接拒绝任务。
addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //1 core为true,比较核心线程池大小;false,比较最大线程池大小
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); //2 启动线程
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
- 当传入core为false时,检测当前线程数大小是否大于最大线程池大小,如果大于直接返回false。
- 启动线程。
总结执行过程:
图2 ThreadPoolExecutor执行过程
- 调用ThreadPoolExecutor的execute提交线程任务,检查当前线程大小,如果小于核心线程corePoolSize大小,则新创建线程执行任务;
- 如果大于核心线程corePoolSize大小,但是小于最大线程池maximumPoolSize大小,则将线程任务加入到BlockingQueue队列,等待处理;
- 如果大于最大线程池maximumPoolSize大小,则直接reject。
网友评论