java.util.concurrent.ThreadPoolExecutor
先判断设定线程池是否已经全部使用,如果全部使用则判断队列是可用,如果队列也满了判断是否超过规定的最大线程数,如果超过则按照规定的拒绝策略执行。
newSingleThreadExecutor
ExecutorService executorService = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
适用场景:任务少 ,并且不需要并发执行。
newCachedThreadPool
ExecutorService executorService = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
创建一个可以无限扩大的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
适用于服务器负载较轻,执行很多短期异步任务。
newFixedThreadPool
ExecutorService executorService = Executors.newFixedThreadPool(1);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
newScheduledThreadPool
ExecutorService executorService =Executors.newScheduledThreadPool(1);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
参数解析
corePoolSize:核心线程池的大小,如果核心线程池有空闲位置,这是新的任务就会被核心线程池新建一个线程执行,执行完毕后不会销毁线程,线程会进入缓存队列等待再次被运行。
maximunPoolSize:线程池能创建最大的线程数量。如果核心线程池和缓存队列都已经满了,新的任务进来就会创建新的线程来执行。但是数量不能超过maximunPoolSize,否侧会采取拒绝接受任务策略,我们下面会具体分析。
keepAliveTime:非核心线程能够空闲的最长时间,超过时间,线程终止。这个参数默认只有在线程数量超过核心线程池大小时才会起作用。只要线程数量不超过核心线程大小,就不会起作用。
unit:时间单位,和keepAliveTime配合使用。
workQueue:缓存队列,用来存放等待被执行的任务。
threadFactory:线程工厂,用来创建线程,一般有三种选择策略。
- ArrayBlockingQueue;
- LinkedBlockingQueue;
- SynchronousQueue;
handler:拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种策略为。
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
几种方法的使用
submit(),提交一个线程任务,有返回值,适用于需要处理返回着或者异常的业务场景。
execute(),执行一个任务,没有返回值 。
shutdown(),表示不再接受新任务,但不会强行终止已经提交或者正在执行中的任务 。
shutdownNow(),对于尚未执行的任务全部取消,正在执行的任务全部发出interrupt(),停止执行 。
java.util.concurrent.ForkJoinPool
将大任务分解成若干个小任务,当小任务均执行结束后,将任务做一个整合
newWorkStealingPool
ExecutorService executorService = Executors.newWorkStealingPool();
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
使用一个无限队列来保存需要执行的任务,可以传入线程的数量,不传入,则默认使用当前计算机中可用的cpu数量,使用分治法来解决问题,使用fork()和join()来进行调用。
jdk1.8以上版本支持。
创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行。
参数解析
int parallelism:可并行级别,最小为1。Fork/Join框架根据这个级别设定并行执行的线程数,但parallelism不等于线程数。默认构造函数中将改值设为CPU的核数。
ForkJoinWorkerThreadFactory factory:线程创建工厂。它是一个函数式接口,需要实现一个newThread方法。
UncaughtExceptionHandler handler:异常处理器。处理任务中抛出的异常,默认为null。
boolean asyncMode:是否为异步模式,默认为false,即LIFO模式。指定线程任务队列的处理方式,通常有LIFO、FIFO两种模式
几种方法
execute(): 先对任务做非空校验再讲任务压入ForkJoinPool中的执行队列
submit():与execute() 大体相同,返回任务对象本身,返回后可以通过get()方法获取执行结果
invoke():将任务压入队列后会执行join()方法,阻塞当前线程。
shutDown(): 执行该方法后ForkJoinPool不再接受新任务,但队列中未执行的任务和正在执行中的任务仍然可以继续执行。
shutDownNow(): 停止所有未执行的任务并且拒绝新任务,正在执行中的任务可继续执行。
网友评论