线程脑图
多线程.pngThreadPool 线程池
线程池的构造参数
1. corePoolSize:核心线程数
2. maximumPoolSize:最大线程数
3. keepAliveTime:当线程数,大于核心线程数时,多出来的线程所存活的时间,超过该时间将自动销毁
4. unit:keepAliveTime的单位(TimeUnit.SECONDS等)
5. workQueue:阻塞队列,用于存放超过核心线程数的Runnable。
6. threadFactory:创建线程的工厂类,实现了ThreadFactory接口。线程池的创建线程的逻辑,就是用了该接口的newThread方法
7. handler:拒绝策略,当超过队列的容量和最大线程数时。应当采取的策略。AbortPolicy,DiscardPolicy,DiscardOldPolicy,CallerRunsPolicy
---------------------------------------------------------------------线程池构造函数源码---------------------------------------------------------------------
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
ThreadPoolExecutor工作原理
1. 如果工作线程数,小于设定的核心线程数则创建核心线程来执行任务
2. 如果核心数达到上限,将Runnable加入创建ThreadPoolExecutor时的阻塞队列(workQueue)
3. 阻塞队列达到容量上限,创建非核心线程,执行任务
4. 工作线程数量达到最大值(maximumPoolSize),执行拒绝策略(handler)
[图片上传失败...(image-aa80be-1595847718289)]
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
//ctl 为 AtomicInteger,用来记录当前线程状态和线程数量
int c = ctl.get();
// 如果工作线程数,小于设定的核心线程数则创建核心线程来执行该任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))//添加任务到核心线程 创建线程在addWorker中通过Work类执行了ThreadFactory的newThread()方法
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//能够添加任务到队列中。若队列中的元素达到队列上限,offer方法将返回false
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//如果当前线程池的状态为不可运行状态,就将刚才的任务从队列中移除,并对该任务执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))//添加任务到非核心线程
reject(command);//如果失败,执行设置好的拒绝策略
}
Executor框架:
ThreadPoolExecutor:
1. newSingleThreadExecutor:创建一个单线程的线程池,只有一个线程,存放任务的队列容量为Integer.Max
2. newFixedThreadPool:创建指定线程数N的线程池,核心线程数和最大线程数都为N,即线程数限制为N,存放任务的队列容量为Integer.Max.
3. newCachedThreadPool:创建一个线程池,该线程池根据需要创建新线程,但是将在可用时重新使用先前构造的线程。最大线程数为Integer.Max。存放任务的队列为,SynchronousQueue,它是一个不存储任务的阻塞队列,如果队列中有任务,则阻塞住请求的线程,直到创建新的线程,或者空闲的线程将这个任务取走执行。该线程池设置了keepAliveTime(60秒),空闲线程超出该规定时间就会自行销毁。
ScheduledThreadPoolExecutor:
可以理解未,能够执行延时任务的一个ThreadPoolExecutor。schedule方法可以设置执行任务时的延时时间。
ForkJoinPool:
ForkJoinPool采用工作窃取算法,将一个大任务根据阈值分割成很多个子任务,最后根据场景是否要合并子任务运算结果;
线程池状态:
阿里推荐的ThreadFactory是什么:
1. ThreadFactory是一个接口,里面有newThread方法。线程池里线程的创建,最终对是调用该方法创建的。
2. 线程池默认的DefaultThreadFactory:从代码中可以看出,线程池中默认的线程工厂类,给每个线程设置了group,name,非守护线程,以及线程优先级。我们可以模仿该类,实现自己的ThreadFactory
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
shutDown和shutDownNow:
- shutDown:停止所有未阻塞的线程。源码interruptIdleWorkers()中的w.tryLock()。能请求到这把锁时,执行interrup()方法中断。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
//Work中的 trylock方法
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
- shutdownNow:停止所有正在执行的任务(interrupt)。返回所有还没有执行的任务,并将还没有执行的任务,从队列中删除。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
拒绝策略:
概念:当队列的任务满了,运行的线程数也达到了最大线程数时,此时如果再来任务的话,就会按照用户的拒绝策略来执行。
1. AbortPolicy: 拒绝任务,直接抛异常
2. DiscardPolicy: 抛弃该请求任务
3. DiscardOldPolicy: 抛弃最久的没被处理的请求任务
4. CallerRunsPolicy: 由当前线程,执行此任务
网友评论