之前介绍了如何创建线程池以及创建线程池时各参数的意义.现在将如果提交任务到线程池中去执行.
提交任务
提交任务到线程池大致分为三类:
- 提交Runnable类型任务到线程池,没有返回值.
- 提交Callable类型任务到线程池,返回异步执行结果.
- 批量提交任务到线程池,返回批量异步执行结果.
提交Runnable类型任务到线程池
public class App {
public static void main(String[] args) {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//创建计算任务
threadPool.execute(new CalTask1(1,2));
}
}
//计算任务
class CalTask1 implements Runnable{
private Integer x;
private Integer y;
public CalTask1(Integer x, Integer y) {
this.x = x;
this.y = y;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+":正在计算中");
try {
//模拟计算耗时
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
System.out.println("结果计算完毕: x + y = "+(x+y));
}
}
上面代码中,通过调用execute
提交执行任务.通过这种方式提交任务到线程池,我们不太好确认任务是否已经执行完毕,并且要获取任务的执行结果也不是太方便.这种方式适合执行那个不要关心任务结果的任务.
提交Callable类型任务到线程池
上面我们说过通过execute
这种方式提交任务我们不太好确认线程是否执行完毕.那么现在说的这种方式可以让我们知道任务是否已经执行完毕.那么我们是如何知道任务被执行完成的呢?
Future和Callable
通过submit
提交任务到线程池,它会返回一个Future
类型的结果.Future
用来代表一个异步执行的结果.通过这个我们可以知道任务是否被执行完毕,同时还能获取到任务执行完成之后的结果.关于什么是异步结果举个例子:例如你去商场吃饭,如果饭店人比较多所以你需要排队等待空位.你首先会去前台登记预定,这个时候你关注他们的微信公众号订阅他们的通知(这就是异步结果)
,然后你就可以去商城别的地方逛逛.等排到你了有了结果,你手机会收到微信通知你就知道轮到你了.
通过Futrue
可以获取到任务的结果.这个结果其实就是通过任务实现Callable
接口来实现的.相比于Runnable
,Callable
可以通过返回值来返回任务执行后的结果,然后通过调用Future
的get
方法来获取任务的返回结果.示例代码如下:
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//创建计算任务
Future<Integer> future = threadPool.submit(new CalTask2(1, 3));
Integer result = future.get();
System.out.println("任务执行结果为:"+result);
}
}
//计算任务
class CalTask2 implements Callable<Integer>{
private Integer x;
private Integer y;
public CalTask2(Integer x, Integer y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
//模拟计算耗时
Thread.sleep(1000L);
return x+y;
}
}
当主线程调用get
时,该方法会导致主线程阻塞.直到任务计算完成,然后该方法返回获取到计算结果.同时该方法也提供时间参数,它会在指定的时间内等待.如果超过等待时间,该方法还未能执行完成,将抛出TimeoutException
异常.但是该任务还是会继续执行完成.
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//创建计算任务
Future<Integer> future = threadPool.submit(new CalTask2(1, 3));
Integer result = future.get(500,TimeUnit.MILLISECONDS);
System.out.println("任务执行结果为:"+result);
}
}
//计算任务
class CalTask2 implements Callable<Integer>{
private Integer x;
private Integer y;
public CalTask2(Integer x, Integer y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
//模拟计算耗时
Thread.sleep(1000L);
int i = x + y;
System.out.println("执行完成");
return i;
}
}
修改部分代码,增加get
方法超时时间.最后打印结果如下:
Exception in thread "main" java.util.concurrent.TimeoutException
at java.util.concurrent.FutureTask.get(FutureTask.java:205)
at com.buydeem.ch6.App.main(App.java:16)
执行完成
结果中主线程抛出超时异常,但是任务还是会执行完成,打印出执行完成
.
批量提交任务
当你有一批任务需要完成,则可以使用invokeAll
批量提交任务.该方法与submit
类似.不同之处在于invokeAll
可以提交批量任务,而submit
一次只能提交一个任务.同时该方法也支持超时设置.
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//创建批量任务
List<CalTask2> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
tasks.add(new CalTask2(i,i+1));
}
//该方法会阻塞等待所有线程执行完成
List<Future<Integer>> futures = threadPool.invokeAll(tasks);
for (Future<Integer> future : futures) {
System.out.println(future.get());
}
}
}
//计算任务
class CalTask2 implements Callable<Integer>{
private Integer x;
private Integer y;
public CalTask2(Integer x, Integer y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
//模拟计算耗时
Thread.sleep(1000L);
int i = x + y;
System.out.println("执行完成");
return i;
}
}
然而在实际开发中,我们并不想等任务全部完成后进行.有的时候我们执行要其中有一个任务完成即可.这时候就可以使用invokeAny
来实现.
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//创建批量任务
List<CalTask2> tasks = new ArrayList<>();
for (int i = 0; i < 5; i++) {
tasks.add(new CalTask2(i,i+1));
}
//该方法会阻塞等待任意线程执行完成
Integer result = threadPool.invokeAny(tasks);
System.out.println(result);
}
}
//计算任务
class CalTask2 implements Callable<Integer>{
private Integer x;
private Integer y;
public CalTask2(Integer x, Integer y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
//模拟计算耗时
Thread.sleep(1000L);
int i = x + y;
System.out.println("执行完成");
return i;
}
}
上面程序最后的打印结果并不会固定,但是能确定的是会打印出一个数字结果.而执行完成
打印次数至少是一次.批量提交到线程池中的任务,只要其中任意一个线程完成就会结束阻塞.同时如果任务还未被线程执行,那么这部分任务也将不会执行.而已经在线程中执行的任务,会继续执行完成.所以打印的执行完成
次数并不固定.
关闭线程池
当我们执行完线程池中的任务后,程序并没有被终止.这是因为我们没有关闭线程池.在jdk中提供了两个关闭线程的方法:shutdown
和shutdownNow
.这两个方法都可以用来关闭线程池,但是背后的逻辑并不一样.
-
shutdown
:按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务.也就是说如果在调用shutdown
之前,任务已经被提交到线程池.那么这部分线程将继续执行完.但是如果再提交行的任务,将执行线程池设置的拒绝策略. -
shutdownNow
:尝试停止所有正在运行中(阻塞和等待都算)的任务,并返回等待执行的任务列表,并将队列中的任务全部移除.该方法并不能保证能够停止正在处理中任务,但是会尽力尝试.它内部就是通过调用Thread.interrupt
取消任务,如果程序没有做响应中断处理,任务将会一直运行下去.
public class App {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// ExecutorService threadPool = new ThreadPoolExecutor(2, 2,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
//提交多个任务
for (int i = 0; i < 5; i++) {
threadPool.submit(new CalTask2(i,i+1));
}
//关闭线程池1
threadPool.shutdown();
//再次添加任务
threadPool.submit(new CalTask2(100,200));
}
}
class CalTask2 implements Callable<Integer>{
private Integer x;
private Integer y;
public CalTask2(Integer x, Integer y) {
this.x = x;
this.y = y;
}
@Override
public Integer call() throws Exception {
//模拟计算耗时
try {
Thread.sleep(1000L);
}catch (InterruptedException e){
e.printStackTrace();
System.out.println("中断异常");
return null;
}
int i = x + y;
System.out.println("执行完成");
return i;
}
}
上面的代码的执行结果如下:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f31245a rejected from java.util.concurrent.ThreadPoolExecutor@6d6f6e28[Shutting down, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
at com.buydeem.ch6.App.main(App.java:25)
执行完成
执行完成
执行完成
执行完成
执行完成
当我们调用shutdown
之后,再提交任务直接抛出异常.因为通过Executors.newFixedThreadPool(2)
创建的线程池,使用的是默认的拒绝策略即抛出异常.如果我们使用注释代码中自己创建的线程池再次运行,执行结果并不会报错.而且我们提交的所有任务并没有中断执行,而是全部执行完毕.
修改上面代码如下:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
//创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(2);
// ExecutorService threadPool = new ThreadPoolExecutor(2, 2,
// 0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
//提交多个任务
for (int i = 0; i < 5; i++) {
threadPool.submit(new CalTask2(i,i+1));
}
//关闭线程池
List<Runnable> list = threadPool.shutdownNow();
System.out.println("list.size() = " + list.size());
}
java.lang.InterruptedException: sleep interrupted
list.size() = 3
at java.lang.Thread.sleep(Native Method)
中断异常
at com.buydeem.ch6.CalTask2.call(App.java:41)
中断异常
at com.buydeem.ch6.CalTask2.call(App.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at com.buydeem.ch6.CalTask2.call(App.java:41)
at com.buydeem.ch6.CalTask2.call(App.java:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
从结果可以出来一共打印出来两个中断异常
和两个中断异常,同时还打印出list.size() = 3
.因为我们的线程池固定只有2个线程,所有返回未执行的任务数为3
.打印的两个中断异常说明调用shutdownNow
它会调用线程的中断方法.
取消任务
如果我将任务提交到线程池之后,我想取消任务怎么办?Future为我们提供了一个cancel方法,通过调用该方法我们可以取消任务.
API文档的介绍如下:试图取消对此任务的执行.如果任务已完成,或已取消,或者由于某些其他原因而无法取消,则此尝试将失败.当调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行.
这部分还是很好理解.但是它有一个参数mayInterruptIfRunning
,这个刚一开始我也没有理解是什么意思.下面通过这段代码来说明:
public class App2 {
public static void main(String[] args) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(5);
Future<?> future = service.submit(new Runnable() {
@Override
public void run() {
int count = 0;
while (count < 100000 && !Thread.currentThread().isInterrupted()) {
System.out.println("count = " + count);
count++;
}
}
});
//确保任务正在进行中
Thread.sleep(50);
//取消任务
future.cancel(false);
//关闭线程池
service.shutdown();
}
}
如果调用的为future.cancel(false)
,会将所有结果全部打印完全.如果调用future.cancel(true)
大概率是无法全部打印完整的.API对参数mayInterruptIfRunning
的说明如下:如果应该中断执行此任务的线程,则为 true;否则允许正在运行的任务运行完成
.其实如果线程还未执行或者已近执行完成这个参数并没有影响,真正有影响的在于任务正在运行中.
如果任务正在运行中,调用future.cancel(false)
并不会执行线程的中断方法,所以Thread.currentThread().isInterrupted()
中断状态一直为false
,所以结果会全部打印完整.如果调用为future.cancel(true)
,Thread.currentThread().isInterrupted()
状态会在调用后变成true
,所以结果打印不完整.
需要注意的是,如果你程序中并没有对中断信号做处理,调用cancel
传入的参数不管是true
还是false
都不会导致任务中断.但是能响应中断异常.
网友评论