1.Executors
Executors提供的常用线程池:
线程池的构造方法具体如下:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
参数说明:
a.corePoolSize: 线程池中常驻线程数;
b.maximumPoolSize: 线程池能够容纳的最大线程数,当等待队列满了,才增大存活线程,最大为maximumPoolSize;
**c.keepAliveTime: **多余空闲线程存活的时间,如果当线程池中线程数量超过corePoolSize时,当有空闲线程的时间超过keepAliveTime时间时,就会将多余的空闲线程销毁,只保留corePoolSize线程;
d.unit: keepAliveTime的单位;
e.workQueue: 任务队列,被提交但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
**f. threadFactory: **线程工厂,用于创建线程,一般用默认即可;
**g. handler: ** 拒绝策略,当任务太多来不及处理时,如何拒绝任务,主要有AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,NewThreadRunsPolicy等.
1.1 newFixedThreadPool线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool指定常驻线程数,线程池中的线程数是固定的,但是由于使用了阻塞new LinkedBlockingQueue<Runnable>(),默认最大值长度为Integer.MAX_VALUE(即2 147 483 647),现实业务场景中几乎不会用到这么大的容量,因此如果LinkedBlockingQueue没有设置大小,可以将其视为无界队列。newFixedThreadPool使用无界队列,没有拒绝策略,如果线程等待太多会导致LinkedBlockingQueue队列长度十分长,会导致JVM的OMM。
1.2 newSingleThreadExecutor线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newSingleThreadExecutor指定常驻线程数为1,线程池中的线程数是固定的1,,但是由于使用了new LinkedBlockingQueue<Runnable>(),默认最大值长度为Integer.MAX_VALUE(即2 147 483 647),现实业务场景中几乎不会用到这么大的容量,因此如果LinkedBlockingQueue没有设置大小,可以将其视为无界队列。newSingleThreadExecutor使用无界队列,没有拒绝策略,如果线程等待太多会导致LinkedBlockingQueue队列长度十分长,会导致JVM的OMM。
1.3 newCachedThreadPool线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newCachedThreadPool可以动态的调整线程池线程数,核心线程数被设置为0,最大线程数设置为Integer.MAX_VALUE,使用有界阻塞队列SynchronousQueue。由于最大线程数几乎无上限,当主线程提交的速度高于线程池处理的速度的时候,newCachedThreadPool会不断的创建新的线程,极端情况下会导致CPU和内存资源耗尽。
1.4 newScheduledThreadPool周期性线程池
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
newScheduledThreadPool可以指定延迟时长和周期性执行,使用无界队列DelayQueue。
1.5 newWorkStealingPool(jdk8新增,ForkJoinPool的扩展)
/**
* Creates a work-stealing thread pool using all
* {@link Runtime#availableProcessors available processors}
* as its target parallelism level.
* @return the newly created thread pool
* @see #newWorkStealingPool(int)
* @since 1.8
*/
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
具体原理待完成
1.6 阻塞队列:
阻塞队列:
a.阻塞添加 当队列满了的时候插入队列,会使线程阻塞,当队列不再满时,再唤醒线程执行插入队列;
b.阻塞删除 当队列为空的时候执行删除,删除线程将会被阻塞,直到队列不再为空的时候再唤起删除线程。
1.7 线程池的拒绝策略:
1.AbortPolicy 默认策略,即丢弃任务,抛出异常;
- DiscardPolicy 丢弃任务,不抛出异常;
- DiscardOldestPolicy 丢弃队列中等待最久的任务,并执行当前任务;
- CallerRunsPolicy 当队列满时,新线程提交的直接在主线程中执行(不在线程池中执行),相当于submit被阻塞,后续提交的任务被阻塞,只有主线程执行提交的线程之后,后续的线程才会被提交到线程池中。
1.8 线程池submit()和execute()的区别
execute()方法无返回值
submit() 可以有返回值,如使用Future接收返回值。
1.9 线程池可以catch到线程的Exception吗
答案:不能
1.10 线程池中线程的回收
1.当从等待队列中获取不到线程的时候(即返回null),就会触发线程尝试回收,当线程数大于核心线程数,且线程空闲时间超过设置的空闲时间阈值,那么就会使用CAS乐观锁触发销毁线程操作,避免并发操作导致保留的线程数小于核心线程数。
2.当线程池执行了shutdown时,拒绝接受新提交的线程,判断线程池中的线程是否处于空闲状态,如果处于空闲状态则使用CAS回收,否则等待执行完毕再回收。
3.当线程池执行了shutdownNow时,拒绝接受新提交的线程,同时立马关闭线程池(不使用CAS),线程池里的任务不再执行。
线程池个数的选择:
1.计算密集型任务
线程数 = CPU个数 + 1
加1的原因是,不会因为某个线程异常或者暂停而导致CPU在执行周期内轮空;
2.对于IO型或者是阻塞类型
线程数 = CPU个数 * 2
或者
线程数 = CPU个数 * CPU利用率 (0~1之间)* (1 + (等待时间 / 计算时间))
对于这种情况比较麻烦,需要考虑的因素很多,如线程要链接数据库时,那么数据库连接池可用的连接数限制了线程的个数大小。
网友评论