概述
引入
我们之前记录了Executor
接口,它是用来将子任务的创建和启动两个操作进行解耦而出现的,我们在上一节中的“使用思路“构建了n个子线程用来不停的执行到来的任务。但是我们的例子使用并不方便,比如:
- 我们只提供无返回结果的子任务执行,如果有结果穿出就得自己在子任务中做变量同步
- 只提供了执行接口,并利用守护线程和用户线程的关系完成退出,没有提供标准的线程池的关闭功能
- 只有单独执行,没有批量操作
我们本次介绍的新的接口ExecutorService
也是基于这些方面对Executor
进行了拓展。
摘要
本文主要介绍ExecutorService
的各个接口的设计意图,并对上一节我们设计的例子进行了进一步的完善。
类介绍
类定位
ExecutorService
是线程池框架的一个重要的借口,它定义了一个作为线程池的实现类必须要实现的几个方法。
注意
在实现ExecutorService
接口时要注意线程安全,此类是有可能同时被多个线程操作的,比如同时加入任务,同时调用关闭等等。
源码解读
提交任务
/**
* 提交一个有返回值的任务,并返回一个可以检测此任务状态和取消此任务的 Future 。
* 此函数只是提交任务,具体是直接执行还是排队等着就看各自实例的实现了。【有点像 Executors.execute()】
*
*
* @param task 任务
* @param <T> 返回值类型
* @return 代表此任务的 Future
* @throws RejectedExecutionException 拒绝执行此任务时抛出
* @throws NullPointerException 入参为空时抛出
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个没有返回值的任务,和完成此任务后返回的值,其他和上面的一样
*
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 返回的结果是 null ,其他的和上面一样
*
*/
Future<?> submit(Runnable task);
批量提交任务
/**
* 批量提交任务并进行执行、等待。
* 注意这里不同上面单次提交任务,这里提交后要等待任务全部执行完成后才能返回,
* 如果你主线程中有其他的并行操作而且追求调用效率,可以自己用循环调用 submit ,如果图方便的话还是可以直接
* 调用这个的。
* 入参的集合结构不能变,入参的集合结构不能变,入参的集合结构不能变,重要的话说三次
*
* @param tasks 任务集合
* @param <T> 任务集返回值类型
* @return 一串代表任务的 Future ,和入参集合的遍历顺序相同
* @throws InterruptedException 等待执行结果时遇到了线程中断时抛出,未完成的任务会被取消
* @throws NullPointerException 入参集合为 null 或者入参集合中有元素为 null时抛出
* @throws RejectedExecutionException 如果有任务被拒绝执行时抛出
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 批量执行任务、并限时等待。
* 时间到达时未完成的任务会被取消。其他的和上面函数相同
*
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 批量执行任务,返回第一个完成执行的任务,并取消其他的任务。
*
* 其他的和上面的 invokeAll() 一样
*
* @throws ExecutionException 如果提交的所有任务都没有执行成功
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 增加了限时要求。
*
*
* @throws TimeoutException 在给定时间内没有一个执行成功时抛出
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
状态检测
/**
* 判断线程池是否被关闭【不接受新任务】
*/
boolean isShutdown();
/**
* 判断线程池是否已经终结【不接受新任务而且现有的所有任务都结束,可以安全回收此实例的那种】
*/
boolean isTerminated();
关闭
/**
* 开始进行有序的关闭,不允许再提交任务,现有的任务会继续执行。
*
* 此方法会立刻返回,你需要调用 awaitTermination() 等待现有任务执行完
*
*
* @throws SecurityException 【后面再详细了解吧】
*/
void shutdown();
/**
* 尝试终止所有正在执行的任务,停止等待的任务,然后把等待的任务列表返回。
*
* 终止正在执行的任务我们仅做了线程中断,至于子线程什么时候反应,要多久反应,
* 此方法不会阻塞等待,如果有需要就用 awaitTerminantion() 阻塞主线程等待
*
* 如果有子线程对中断不反应,继续执行他的任务,我们也没办法,我们只能告诉它“该停了”,
* 其他的也没什么用了。
*
* @return 提交后未执行的任务
* @throws SecurityException
*/
List<Runnable> shutdownNow();
使用示例
示例代码——MyExecutorService
package com.gateway.concurrent.pool;
import java.util.*;
import java.util.concurrent.*;
public class MyExecutorService implements ExecutorService {
private final Thread[] threads = new Thread[POOL_SIZE];
private final BlockingQueue<Runnable> tasks = new LinkedBlockingDeque<>();
private volatile boolean ifAcceptNewTask = true;
private volatile boolean ifTryToStopThread = false;
private final static int POOL_SIZE = 5;
private final static long SLEEP_UNIT = 1000;
public MyExecutorService() {
for (int i=0;i<POOL_SIZE;i++){
threads[i] = new Thread(()->{
while (true){
// 取任务
try {
Runnable runnable = null;
runnable = tasks.take();
runnable.run();
} catch (InterruptedException e) {
// 取任务时被中断
e.printStackTrace();
if (ifTryToStopThread){// 要停止线程了
System.out.println("此线程收到结束中断,退出执行");
break;
}
}
}
});
threads[i].start();
}
}
@Override
public void shutdown() {
// 不接受新任务
this.ifAcceptNewTask = false;
}
@Override
public List<Runnable> shutdownNow() {
this.ifAcceptNewTask = false;
this.ifTryToStopThread=true;
List<Runnable> unExecutedTasks = new ArrayList<>();
tasks.drainTo(unExecutedTasks);
for (int i =0;i<POOL_SIZE;i++){
if (threads[i].isAlive()){
threads[i].interrupt();
}
}
return unExecutedTasks;
}
@Override
public boolean isShutdown() {
return this.ifAcceptNewTask;
}
@Override
public boolean isTerminated() {
for (int i=0;i<POOL_SIZE;i++){
if (threads[i].isAlive()){
return false;
}
}
return true;
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
Long deadLine = System.currentTimeMillis()+unit.toNanos(timeout);
while (!isTerminated()){
// 超时
if (System.currentTimeMillis() > deadLine){
return false;
}
// 等待
if (System.currentTimeMillis() - deadLine > SLEEP_UNIT){
Thread.currentThread().sleep(SLEEP_UNIT);
}
}
return true;
}
@Override
public <T> Future<T> submit(Callable<T> task) {
// TODO 这里加一下对线程池状态的判断,不要直接加
FutureTask<T> temp = new FutureTask<>(task);
try {
tasks.put(temp);
} catch (InterruptedException e) {
e.printStackTrace();
// 被打断干脆就不等了,其实讲真估计也不会到这一步,
// 毕竟LinkedBlockingQueue是无界到,也不会满
throw new RejectedExecutionException("任务队列已满,阻塞等待时被打断",e);
}
return temp;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return this.submit(()->{
task.run();
return result;
});
}
@Override
public Future<?> submit(Runnable task) {
return this.submit(task,null);
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
if (Objects.isNull(tasks)){
throw new NullPointerException("入参不能为空");
}
// 创建任务
Iterator<Callable<T>> iterator = (Iterator<Callable<T>>) tasks.iterator();
List<Future<T>> result = new ArrayList<>();
while (iterator.hasNext()){
result.add(this.submit(iterator.next()));
}
// 等待任务完成
Iterator<Future<T>> resultIterator = result.iterator();
while (resultIterator.hasNext()){
try {
resultIterator.next().get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return result;
}
// 思路差不多,不写了
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return null;
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
@Override
public void execute(Runnable command) {
if (Objects.isNull(command)){
throw new NullPointerException("入参不能为空");
}
try {
this.tasks.put(command);
} catch (InterruptedException e) {
e.printStackTrace();
// 被打断干脆就不等了,其实讲真估计也不会到这一步,
// 毕竟LinkedBlockingQueue是无界到,也不会满
throw new RejectedExecutionException("任务队列已满,阻塞等待时被打断",e);
}
}
}
示例代码——Main
函数
package com.gateway.concurrent.pool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class Pool1 {
public static void main(String args[]){
MyExecutorService myExecutorService = new MyExecutorService();
List<Future<String>> futureTasks = new ArrayList<>();
for (int i = 0; i < 40; i++) {
futureTasks.add(myExecutorService.submit(new TestCalss("任务" + i)));
}
// 主线程继续做一些别的事
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < futureTasks.size(); i++) {
try {
System.out.println(futureTasks.get(i).get());
} catch (InterruptedException e) {
e.printStackTrace();
System.out.println("等待过程中被打断,重新等待这个任务结束");
i--;
} catch (ExecutionException e) {
e.printStackTrace();
System.out.println("任务执行失败");
}
}
// 关闭线程池
myExecutorService.shutdown();
try {
// 如果可以正常关闭,最好
// 当然,我们自己写到代码,MyExecutorService 是肯定不能正常关闭的
// 我们这里的等待时间是给任务队列中尚未执行的任务的
if (myExecutorService.awaitTermination(1000, TimeUnit.NANOSECONDS)){
return;
}
myExecutorService.shutdownNow();
// 我们这里的等待时间是给线程池中的线程停止正在执行的任务的
if (myExecutorService.awaitTermination(1000, TimeUnit.NANOSECONDS)){
return;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class TestCalss implements Callable<String> {
private String name;
public TestCalss(String name) {
this.name = name;
}
@Override
public String call() throws Exception {
Long sleepTime = (long) (Math.random() * 1000);
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return name + " 完成,用时 " + sleepTime + " 毫秒";
}
}
}
核心逻辑介绍
没啥说的,个人感觉相比于上一个版本,最大的进步就是:
- 可以返回值了
- 可以主动控制线程池的关闭回收了【这次我们没有将线程池中的线程设置为守护线程,你能正常运行完程序就表示你成功关闭了线程池里的五条线程】
使用思路
使用更加简便,我们一般都是作为线程池的使用者的,所以在大多数情况下不是很关系线程池的内部逻辑,我们调用的思路是:
- 创建/获得一个线程池实例
- 任务入线程池
- 获得任务结果
- 完成所有操作后关闭线程池
网友评论