一般而言,对于并发多线程程序,需要频繁的创建线程,而许多线程很短时间内都可以执行完毕,此时会出现一个问题,就是线程的频繁创建于销毁所占用的时间和资源甚至超过了线程本身执行所需的资源。线程池的出现就是为了解决这个问题,解决的关键在于线程的复用。顾名思义,线程池就是存放线程的一个池子,需要用到线程时从池子中取线程,用完后归还,并不大量的创建与销毁。
我们先来学习最核心的类:ThreadPoolExecutor
先看他的构造方法:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
每个构造都有很多参数,我们依次来认识这些参数的意义:
- corePoolSize:表示线程池中线程的个数,初始时并没有线程,每到来一个任务便创建一个,当创建的数目超过这个数时,便进入缓存队列
- maximumPoolSize: 表示线程池中最多可创建的线程数,若不做限制,则线程池与一般的来一个任务便创建一个线程没有任何区别
- keepAliveTime:表示线程没有任务执行时存活的时间,这个限制主要用于超过corePoolSize的那一部分线程
- unit:时间单位,都是TimeUnit的一些静态参数
- workQueue:阻塞队列,用来存储等待执行的任务。BlockingQueue是一个接口,常用的实现类下面介绍
- ThreadFactory:线程工厂,主要定义一些创建线程的动作
- RejectedExecutionHandler:当拒绝处理任务时的策略。
一些方法重要的方法:
- 1.execute(Runnable command) :提交一个任务到线程池去执行
- 2.submit() :提交一个任务去执行,并能返回结果,借助Future来获取结果,具体实现在父类中
- 3.shutdown():有序的关闭之前提交的任务,并不再接受新任务,但该方法会等待正在进行的任务结束
- 4.List<Runnable> shutdownNow():停止所有正在执行的任务,并将所有正在等待的任务移除等待队列并作为一个list返回
- 5.boolean prestartCoreThread():初始化一个核心线程
- 6.int prestartAllCoreThreads():初始化所有核心线程
- 7.一些set方法
void setCorePoolSize(int corePoolSize)
void setKeepAliveTime(long time, TimeUnit unit)
void setMaximumPoolSize(int maximumPoolSize)
void setRejectedExecutionHandler(RejectedExecutionHandler handler)
void setThreadFactory(ThreadFactory threadFactory)
- 8.一些get方法
int getActiveCount()
long getCompletedTaskCount() //该线程池已经完成的任务数
int getCorePoolSize()
long getKeepAliveTime(TimeUnit unit)
int getLargestPoolSize() //获取同时存在的最大线程数
int getMaximumPoolSize()
int getPoolSize() //当前线程池中线程数
BlockingQueue<Runnable> getQueue()
RejectedExecutionHandler getRejectedExecutionHandler()
long getTaskCount() //获取曾经计划完成的任务数
ThreadFactory getThreadFactory()
-
9.缓存队列介绍及其策略
1.ArrayBlockingQueue:基于数组的一种队列,需指定大小
2.LinkedBlockingQueue:基于链表的一种队列,默认大小Integer.MAX_VALUE
3.synchronousQueue:不缓存任务,而是直接创建线程执行
4.DelayQueue -
10.任务拒绝策略
当前池子已满而且线程数目达到最大值,接下来到来的任务就会采取拒绝策略,
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
示例:
public class MyTask implements Runnable {
public String tag;
private SimpleDateFormat format;
MyTask(String tag, SimpleDateFormat format){
this.tag = tag;
this.format = format;
}
private String getTime(){
return format.format(new Date());
}
@Override
public void run() {
System.out.println("开始执行:" + tag + " " +getTime());
try {
Thread.currentThread().sleep(2000);
} catch (Exception e) {
System.out.println(tag + ": " + e.getMessage() + " " + getTime());
}
System.out.println("执行完毕:" + tag + " " + getTime());
}
}
public class ThreadPool {
public static final int CORE_SIZE = 3;
public static final int MAXI_SIZE = 6;
public static final int QUEUE_SIZE = 3;
public static final int ALIVE_TIME = 100;
public static final int SIZE = 9;
public static void main(String[] args) {
SimpleDateFormat format = new SimpleDateFormat("dd-MM-ss");
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_SIZE,MAXI_SIZE,ALIVE_TIME, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(QUEUE_SIZE),new ThreadPoolExecutor.DiscardPolicy(){
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
super.rejectedExecution(r, e);
System.out.println("已抛弃线程:" + ((MyTask)r).tag + " " +format.format(new Date()));
}
});
for (int i = 0;i<SIZE;i++){
MyTask task = new MyTask("task" + i,format);
executor.execute(task);
System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+
executor.getQueue().size());
}
executor.shutdown();
}
}
当SIZE为9时,打印log如下:
线程池中线程数目:1,队列中等待执行的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:1
线程池中线程数目:3,队列中等待执行的任务数目:2
线程池中线程数目:3,队列中等待执行的任务数目:3
线程池中线程数目:4,队列中等待执行的任务数目:3
线程池中线程数目:5,队列中等待执行的任务数目:3
线程池中线程数目:6,队列中等待执行的任务数目:3
开始执行:task1 12-04-45
开始执行:task6 12-04-45
开始执行:task8 12-04-45
开始执行:task0 12-04-45
开始执行:task2 12-04-45
开始执行:task7 12-04-45
执行完毕:task1 12-04-47
执行完毕:task6 12-04-47
开始执行:task3 12-04-47
执行完毕:task8 12-04-47
开始执行:task4 12-04-47
开始执行:task5 12-04-47
执行完毕:task0 12-04-47
执行完毕:task2 12-04-47
执行完毕:task7 12-04-47
执行完毕:task4 12-04-49
执行完毕:task3 12-04-49
执行完毕:task5 12-04-49
可以清楚的看到,任务先添加到线程池中,corePoolSize满之后,进入缓存队列,当缓存对列也满之后,开始创建新进程,直到达到maximumPoolSize。同时也注意到shutdown是可以等待任务完成的,可以用shutdownNow做对比。
当SIZE为10时,打印log如下:
线程池中线程数目:1,队列中等待执行的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:1
线程池中线程数目:3,队列中等待执行的任务数目:2
线程池中线程数目:3,队列中等待执行的任务数目:3
线程池中线程数目:4,队列中等待执行的任务数目:3
线程池中线程数目:5,队列中等待执行的任务数目:3
线程池中线程数目:6,队列中等待执行的任务数目:3
已抛弃线程:task9 12-04-44
线程池中线程数目:6,队列中等待执行的任务数目:3
开始执行:task1 12-04-44
开始执行:task6 12-04-44
开始执行:task8 12-04-44
开始执行:task0 12-04-44
开始执行:task2 12-04-44
开始执行:task7 12-04-44
执行完毕:task1 12-04-46
开始执行:task3 12-04-46
执行完毕:task6 12-04-46
开始执行:task4 12-04-46
执行完毕:task8 12-04-46
开始执行:task5 12-04-46
执行完毕:task0 12-04-46
执行完毕:task2 12-04-46
执行完毕:task7 12-04-46
执行完毕:task3 12-04-48
执行完毕:task5 12-04-48
执行完毕:task4 12-04-48
由于我们采用的DiscardPolicy策略,第10个线程最后被抛弃了。
可以看到ThreadPoolExecutor虽然做了很多简化,但用的时候依然需要配置很多东西,所以官方文档也建议在没有什么特殊要求的情况下,可以使用Executors类所提供的几个静态方法来快速建立和使用线程池
However, programmers are urged to use the more convenient Executors factory methods Executors.newCachedThreadPool() , Executors.newFixedThreadPool(int) and Executors.newSingleThreadExecutor() , that preconfigure settings for the most common usage scenarios.
接下来我们就来学习一下Executors:
这个类没有可用的构造方法,都是一些静态方法:
-
1.newCachedThreadPool()
创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE。虽然容量可以很大,但是他也实现了复用。在有任务来临时,先查看池中有没有以前建立的线程,如果有就服用, 如果没有,就建一个新的线程加入池中。主要用于生存周期较短的异步任务。而且默认空闲线程的生成周期为60s,超时将被回收,需要时再重新建立。
我们可以看一下这个方法的实现:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
可见只是对ThreadPoolExecutor的封装,核心线程数为0,最大进程数为Integer.MAX_VALUE,也就是说新来的任务在没有空闲线程时,都创建新线程,空闲线程空闲60s后回收。
-
2.newFixedThreadPool(int)
创建一种定长线程池,任意时间都只有固定数目的线程活动,超过这个数目就等待,直到有空闲线程出现。多用于一些固定且稳定的并发任务具体实现如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
可见其核心进程和最大进程数是一样的,也就是能存在的最大线程数就是我们所指定的数目,且空闲进程存活时间为0,但也有复用机制,因为核心进程和最大进程数是一样的,由于使用的缓存队列为LinkedBlockingQueue,所以理论上有无限任务进行等待。
-
3.newSingleThreadExecutor()
单例型线程池,有且仅有一个工作线程。实现如下:
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
FinalizableDelegatedExecutorService只是一个包装类,具体还是依赖ThreadPoolExecutor。可见基本与newFixedThreadPool一样,只不过核心线程数和最大线程数都指定为1.
-
4.newScheduledThreadPool(int)
创建一个可安排执行周期任务的线程池。实现如下:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
可见是基于ScheduledThreadPoolExecutor实现的,这个类是继承于ThreadPoolExecutor。构造方法实现如下:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
核心线程数由我们指定,最大线程数为Integer.MAX_VALUE,空闲线程存活时间为0。这个类主要用于实现线程的调度。通过源码我们发现。他首先重写了execute方法(submit方法也一样):
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}
schedule主要是延迟执行一个任务。我们主要使用下面两个方法:
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay主要是指定两次任务的间隔,如下面代码就是在起始延迟1s后,每次任务执行完毕后延迟1s,再开始下一次任务:
SimpleDateFormat format = new SimpleDateFormat("dd-MM-ss-SSSS");
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
MyTask task = new MyTask("task1" ,format);
System.out.println("start schedule" + format.format(new Date()));
service.scheduleWithFixedDelay(task,1000,1000,TimeUnit.MILLISECONDS);
打印log如下:
start schedule12-04-09-0957
开始执行:task1 12-04-10-0982
执行完毕:task1 12-04-12-0983
开始执行:task1 12-04-13-0984
执行完毕:task1 12-04-15-0984
开始执行:task1 12-04-16-0985
执行完毕:task1 12-04-18-0986
scheduleAtFixedRate主要是指定两次任务开始的间隔时间,若间隔时间小于任务执行时间,则在上一次任务结束后立即开始下一次,示例如下:
SimpleDateFormat format = new SimpleDateFormat("dd-MM-ss-SSSS");
ScheduledExecutorService service = Executors.newScheduledThreadPool(5);
MyTask task = new MyTask("task1" ,format);
System.out.println("start schedule" + format.format(new Date()));
service.scheduleAtFixedRate(task,1000,1000,TimeUnit.MILLISECONDS);
打印log如下:
start schedule12-04-38-0516
开始执行:task1 12-04-39-0532
执行完毕:task1 12-04-41-0532
开始执行:task1 12-04-41-0532
执行完毕:task1 12-04-43-0572
开始执行:task1 12-04-43-0573
执行完毕:task1 12-04-45-0573
开始执行:task1 12-04-45-0573
执行完毕:task1 12-04-47-0580
开始执行:task1 12-04-47-0580
执行完毕:task1 12-04-49-0580
开始执行:task1 12-04-49-0580
注意两个方法都是无限循环的,实际运用要加上结束代码
-
5.newSingleThreadScheduledExecutor()
创建只有一个线程的ScheduledThreadPoolExecutor
网友评论