上一篇我们讲到 Java 中关于线程的一些小知识,比如开启一个新的线程,线程的生命周期,线程中的变量等。那我们为什么要新开一个线程呢?
1)加快程序的执行速度
2)防止线程阻塞导致线程崩溃
3)在Android 4.0明确必须把耗时操作放在子线程执行,主线程只能执行UI操作。
但我们要知道,线程在系统里其实是个稀缺又宝贵的资源,同时,频繁地开启 / 关闭线程也是很耗费系统资源的,为什么这么说的?
- 开启一个线程的时候就会给系统增加 1 M 的内存,如果我们无限制地开启线程,就可能给系统带来 OOM 的危险。
- 线程开启时,其实是CPU给这个线程分配了时间片,这是因为 CPU 里面存在的一个时间片轮转机制,当CPU给线程分配了时间片,该线程就可以执行(前提是该线程处于一个可执行状态)因此,在多线程的情况下,某一线程没有获得时间片,该线程就需要被挂起,此时还会发生一次上下文切换,而当CPU给该线程分配时间片时,又需要唤醒该线程,此时又会发生一次上下文切换。如果在很多线程的情况下发生上下文切换的时候也是很耗费系统性能的。
- 一个线程的生命周期可以分为 T1:开启线程,T2:执行任务逻辑,T3:关闭线程。在多线程的情况下,如果T2的时间执行的时间相对较少,那就把时间都耗费在频繁的开启 / 关闭线程上了,这显然是不可取的。
因此就提出了 "线程池" 的概念,将一些线程保存到一个容器里面,当需要使用线程时直接从该容器里取,使用完成后再放回容器里。
Java中的线程池
1)我们先来了解一下在 Java 中线程池的 "生命联系":
我们平时使用线程池时经常用到的类是 ThreadPoolExecutor 这个类,进入源码可以发现,他是继承自AbstractExecutorService ,然后进去到 AbstractExecutorService ,发现他实现了 ExecutorService 这个接口,而 ExecutorService 又继承自 Executor 这个接口,因此,实际上我们经常用的 ThreadPoolExecutor 是实现了 Executor这个接口 。
public class ThreadPoolExecutor extends AbstractExecutorService
------------------------------------------------------------
public abstract class AbstractExecutorService implements ExecutorService
-------------------------------------------------------------
public interface ExecutorService extends Executor
2)根据阿里的Java使用手册中指出是不允许直接使用JDK提供的默认线程池,必须使用自定义线程池,在使用线程池时我们经常用到的类是 ThreadPoolExecutor ,他有 6 个方法参数,每个参数是什么意思呢:
//JDK默认的线程池,这里只举例一个,还有其他的默认线程池,在阿里规范手册里都是不允许直接使用的
ExecutorService pool = Executors.newCachedThreadPool()
---------------------------------------------------------------------------------
//自定义线程池
//第一个 int 型参数为corePoolSize
//第二个 int 型参数为maximumPoolSize
//第三个 int 型参数为keepAliveTime
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
-
corePoolSize(核心线程数):当提交一个任务时,线程池会新建一个线程,直到当前线程数等于 corePoolSize ,如果当前线程数等于 corePoolSize 时,再提交的任务,会被保存到阻塞队列中,等待被执行。
如果执行了 线程池 的 prestartAllCoreThreads() 方法,线程池会提前创建并启动所有核心线程。 - maximumPoolSize(最大线程数):线程池中允许的最大线程数,当阻塞队列满了,且继续添加任务时,则继续创建线程,执行任务,前提是当前线程数小于最大线程数。
- keepAliveTime(线程空闲时的存活时间):指的是当线程没有执行任务时,继续存活的时间,默认情况下,这个参数只有在当前线程数大于 corePoolSize 时才会生效。
- TimeUnit(时间单位):keepAliveTime的时间单位。
- workQueue(阻塞队列):阻塞队列必须是 BlockingQueue(阻塞队列)。当线程池中的当前线程数超过它的 corePoolSize 时,线程就会进入阻塞队列进行阻塞等待,通过 workQueue,线程池实现了阻塞功能
在选用 workQueue 时最好使用的是有界队列,如果使用无界队列,会造成以下影响:
1) corePoolSize 满时会加入到阻塞队列进行等待,而选用无界队列时,线程池中的数量将不可能超过 corePoolSize。因此 maximumPoolSize 这个参数无效。
2)由于线程池中的数量将不可能超过 corePoolSize,因此 keepAliveTime 这个参数也无效。
3)更重要一点的是,使用无界队列有可能会耗尽系统资源,而使用有界队列有利于防止系统资源耗尽,同时使队列处于一个合适大小的范围内。
- threadFactory(线程的工厂):通过自定义线程工程可以给每个新建的线程设置别名,以及设置所有线程是否是守护线程等。
-
RejectedExecutionHandler(拒绝策略):指的是当阻塞队列满了,且没有空闲线程时,如果此时继续添加线程,则必须采用一种策略来处理这个线程(线程池提供了 4 种策略):
1)AbortPolicy:直接抛出异常,默认策略;
2)CallerRunsPolicy:用调用者所在的线程来执行任务;(调用者还有空来调用线程池,说明调用者还有空余操作"空间",所以此时把任务交由给调用者执行);
3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
4)DiscardPolicy:直接丢弃任务;
或者也可以根据自己的需要定制自己的 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
线程池的工作机制
1)如果当前线程数 少于 核心线程数,则直接在线程池中创建线程执行任务(注意:执行这一步需要获取全局锁);
2)如果运行的线程大于核心线程数,则将任务加入到 BlockingQueue;
3)如果无法加入到 BlockingQueue(队列已满),则创建新的线程来执行任务;
4)如果创建新线程使当前线程超过线程超过最大线程数 (maximumPoolSize) ,则会调用拒绝策略 (RejectedExecutionHandler) 来处理新线程,调用RejectedExecutionHandler.rejectedExecution()方法。
提交任务
在线程池中提交任务有 execute() 和 submit() 两个方法,那你知道两个方法的区别吗?
- execute():使用这个方法用于提交不需要返回任务结果的任务,所以无法知道任务是否被线程池执行成功。
- submit():使用这个方法用于提交需要返回任务结果的任务,会返回一个 future类型的参数,通过这个 future 可以用来判断任务是否被线程池执行成功,同时可以通过 get() 方法来过去返回值, get() 方法会阻塞线程直到任务成功,而使用 get(long timeout,TimeUnit unit) 这个方法会阻塞一段时间任务时间后立即返回,使用这个方法有可能任务还没执行完成就返回。
关闭线程池
在使用完线程池执行完任务后,要记得关闭线程池,否则线程池一直处于等待状态导致程序会一直运行。而关闭线程池也存在两个方法 shutdown() 和 shutdownNow()。它们的原理是调用线程的 interrupt() 方法来中断线程,而无法响应中断的线程可能永远无法停止。
- shutdown():是将线程池设置为 SHUTDOWN 状态,然后停止线程池中没有正在执行任务的线程。
-
shutdownNow():将线程池状态设置为 STOP 状态,然后尝试停止所有正在执行 或暂停任务的线程,并返回等待执行任务的列表。
只要调用了任意一个关闭方法,isShutdown() 方法就会返回 true,只当所有任务都关闭时后再调用 isTerminaed() 方法返回才是 true。
//工作线程
static class Worker implements Runnable {
private String taskName;
private Random random = new Random();
public Worker(String taskName) {
this.taskName = taskName;
}
public String getTaskName() {
return taskName;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()
+ " process the task : " + taskName);
SleepTools.ms(random.nextInt(100) * 5);
}
}
static class CallWorker implements Callable<String> {
private String taskName;
private Random random = new Random();
public CallWorker(String taskName) {
this.taskName = taskName;
}
public String getTaskName() {
return taskName;
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName()
+ " process the task : " + taskName);
SleepTools.ms(random.nextInt(100) * 5);
return Thread.currentThread().getName() + ":" + random.nextInt(100) * 5;
}
}
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5), new ThreadPoolExecutor.DiscardOldestPolicy());
//系统自带的
// ExecutorService pool = Executors.newCachedThreadPool()
//执行execute
for (int i = 0; i < 6; i++) {
Worker worker = new Worker("Worker_"+i);
pool.execute(worker);
}
//执行submit
for (int i = 0;i<6;i++){
CallWorker callWorker = new CallWorker("CallWorker_"+i);
//获得任务执行完返回的 future
Future<String> future = pool.submit(callWorker);
try {
//获取返回的结果
String result = future.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//使用完线程池后,记得关闭线程池,否则系统一直等待
pool.shutdown();
// pool.shutdownNow();
}
}
致敬学习~
网友评论