- 线程的频繁创建在高并发及大数据量时是非常消耗资源的
- 因此 Java 提供了线程池,Java5 在 java.util.concurrent 中添加了 Exectuor 异步执行框架
- 通过线程池减少线程创建和销毁的次数,重复利用,根据系统情况调整线程池数量,防止创建过多线程
- java.util.concurrent 中包含了几个比较重要的类:
类名 | 描述 |
---|---|
Executor | 线程池接口 |
ExecutorService | 在基础 Executor 线程池接口上生命了生命周期管理方法、任务执行状况跟踪方法 |
ScheduledExecutorService | 一个定时调度任务的接口 |
ScheduledThreadPoolExecutor | ScheduledExecutorService 的实现,实现了可定时调度任务的线程池 |
ThreadPoolExecutor | 线程池,可以通过调用 Executors 以下静态工厂方法来创建线程池并返回一个 ExecutorService 对象 |
ThreadPoolExecutor 构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) //后两个参数为可选参数
- corePoolSize:核心线程数,如果运行的线程少于 corePoolSize 则创建新线程来执行新任务,即使线程池中的其他线程是空闲的
-
maximumPoolSize:最大线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:
- 当前线程数 >= corePoolSize(且任务队列已满时,线程会创建新线程来处理任务)
- 当前线程数 = maxPoolSize(且任务队列已满时,线程池会拒绝处理任务而抛出异常)
- keepAliveTime:如果线程数多于 corePoolSize 则这些多余的线程的空闲时间超过 keepAliveTime 时将被终止
- unit:keepAliveTime 参数的时间单位
-
workQueue:保存任务的阻塞队列,与线程池的大小有关:
- 当运行的线程数少于 corePoolSize 时,新任务直接创建新线程来执行任务而无需再进队列
- 当运行的线程数等于或多于 corePoolSize 新任务添加时则选加入队列,不直接创建线程
- 当队列满时,新任务就会创建新线程
- threadFactory:使用 ThreadFactory 创建新线程,默认使用 defaultThreadFactory 创建线程
- handle:定义处理被拒绝任务的策略,默认使用 ThreadPoolExecutor.AbortPolicy 任务被拒绝时将抛出 RejectExecutorException
Executors
- Executors 提供了一系列静态工厂方法用于创建各种线程池,根据具体应用场景而选择不同的线程池
newFixedThreadPool
- 创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程
- 如果线程池中的某个线程由于异常而结束时,线程池就会再添加一个新线程
方法定义
//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
使用例子
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5);
IntStream.range(0, 6).forEach(i -> executorService.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
}
//输出结果为:
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-3
//finished: pool-1-thread-4
//finished: pool-1-thread-5
//finished: pool-1-thread-1
newSingleThreadExecutor
- 创建一个单线程的 Executor,它只会用唯一的线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
- 如果该线程因为异常而结束就新建一条线程来继续执行后续的任务
方法定义
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
//corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
使用例子
public class Main {
private static int num = 100;
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
IntStream.range(0, 100).forEach(i -> executorService.execute(() -> {
String runnableName = "Runnable"+i+":";
System.out.println(runnableName + num--);
}));
try {
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (!executorService.isTerminated()) {
executorService.shutdownNow();
}
}
System.out.println("执行结束");
}
}
/** 输出结果为:
Runnable0:100
Runnable1:99
Runnable2:98
.....
Runnable98:2
Runnable99:1
执行结束
**/
newScheduledThreadPool
- 创建一个可延迟执行或定期执行的线程池
方法定义
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
使用例子
public class Main {
public static void main(String[] args) {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
IntStream.range(0, 2).forEach(i -> executorService.scheduleAtFixedRate(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
},1000, 2000, TimeUnit.MILLISECONDS));
}
}
//输出结果为:
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-1
newCachedThreadPool
- 创建可缓存的线程池,如果线程池中的线程在 60 秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一个线程
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public class Main {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
IntStream.range(0, 6).forEach(i -> executorService.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
}));
}
}
//输出结果为:
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//线程池会缓存线程,尽可能的利用线程资源
ExecutorService 常用方法
- shutdown:方法等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭
- shutdownNow:方法将强制终止所有运行中的任务并不再允许提交新任务
- awaitTermination:阻塞,直到所有任务在关闭请求之后完成执行,或发生超时,或当前线程中断
- isTerminated:当所有任务关闭后返回 true,除非 shutdown、shutdownNow 被调用,否则永远不为 true
- submit:提交可执行的任务(Runnable、Thread、Callable)并执行,返回一个 Future 对象
网友评论