美文网首页
JUC-(4)线程池(下)

JUC-(4)线程池(下)

作者: 一个菜鸟JAVA | 来源:发表于2020-07-07 18:30 被阅读0次

之前介绍了如何创建线程池以及创建线程池时各参数的意义.现在将如果提交任务到线程池中去执行.

提交任务

提交任务到线程池大致分为三类:

  • 提交Runnable类型任务到线程池,没有返回值.
  • 提交Callable类型任务到线程池,返回异步执行结果.
  • 批量提交任务到线程池,返回批量异步执行结果.

提交Runnable类型任务到线程池

public class App {
    public static void main(String[] args) {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        //创建计算任务
        threadPool.execute(new CalTask1(1,2));


    }
}

//计算任务
class CalTask1 implements Runnable{
    private Integer x;
    private Integer y;

    public CalTask1(Integer x, Integer y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName()+":正在计算中");
        try {
            //模拟计算耗时
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }
        System.out.println("结果计算完毕: x + y = "+(x+y));
    }
}

上面代码中,通过调用execute提交执行任务.通过这种方式提交任务到线程池,我们不太好确认任务是否已经执行完毕,并且要获取任务的执行结果也不是太方便.这种方式适合执行那个不要关心任务结果的任务.

提交Callable类型任务到线程池

上面我们说过通过execute这种方式提交任务我们不太好确认线程是否执行完毕.那么现在说的这种方式可以让我们知道任务是否已经执行完毕.那么我们是如何知道任务被执行完成的呢?

Future和Callable

通过submit提交任务到线程池,它会返回一个Future类型的结果.Future用来代表一个异步执行的结果.通过这个我们可以知道任务是否被执行完毕,同时还能获取到任务执行完成之后的结果.关于什么是异步结果举个例子:例如你去商场吃饭,如果饭店人比较多所以你需要排队等待空位.你首先会去前台登记预定,这个时候你关注他们的微信公众号订阅他们的通知(这就是异步结果),然后你就可以去商城别的地方逛逛.等排到你了有了结果,你手机会收到微信通知你就知道轮到你了.
通过Futrue可以获取到任务的结果.这个结果其实就是通过任务实现Callable接口来实现的.相比于Runnable,Callable可以通过返回值来返回任务执行后的结果,然后通过调用Futureget方法来获取任务的返回结果.示例代码如下:

public class App {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        //创建计算任务
        Future<Integer> future = threadPool.submit(new CalTask2(1, 3));

        Integer result = future.get();

        System.out.println("任务执行结果为:"+result);


    }
}

//计算任务
class CalTask2 implements Callable<Integer>{
    private Integer x;
    private Integer y;

    public CalTask2(Integer x, Integer y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public Integer call() throws Exception {
        //模拟计算耗时
        Thread.sleep(1000L);
        return x+y;
    }
}

当主线程调用get时,该方法会导致主线程阻塞.直到任务计算完成,然后该方法返回获取到计算结果.同时该方法也提供时间参数,它会在指定的时间内等待.如果超过等待时间,该方法还未能执行完成,将抛出TimeoutException异常.但是该任务还是会继续执行完成.

public class App {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        //创建计算任务
        Future<Integer> future = threadPool.submit(new CalTask2(1, 3));

        Integer result = future.get(500,TimeUnit.MILLISECONDS);

        System.out.println("任务执行结果为:"+result);


    }
}

//计算任务
class CalTask2 implements Callable<Integer>{
    private Integer x;
    private Integer y;

    public CalTask2(Integer x, Integer y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public Integer call() throws Exception {
        //模拟计算耗时
        Thread.sleep(1000L);
        int i = x + y;
        System.out.println("执行完成");
        return i;
    }
}

修改部分代码,增加get方法超时时间.最后打印结果如下:

Exception in thread "main" java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at com.buydeem.ch6.App.main(App.java:16)
执行完成

结果中主线程抛出超时异常,但是任务还是会执行完成,打印出执行完成.

批量提交任务

当你有一批任务需要完成,则可以使用invokeAll批量提交任务.该方法与submit类似.不同之处在于invokeAll可以提交批量任务,而submit一次只能提交一个任务.同时该方法也支持超时设置.

public class App {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        //创建批量任务
        List<CalTask2> tasks = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            tasks.add(new CalTask2(i,i+1));
        }
        //该方法会阻塞等待所有线程执行完成
        List<Future<Integer>> futures = threadPool.invokeAll(tasks);

        for (Future<Integer> future : futures) {
            System.out.println(future.get());
        }

    }
}

//计算任务
class CalTask2 implements Callable<Integer>{
    private Integer x;
    private Integer y;

    public CalTask2(Integer x, Integer y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public Integer call() throws Exception {
        //模拟计算耗时
        Thread.sleep(1000L);
        int i = x + y;
        System.out.println("执行完成");
        return i;
    }
}

然而在实际开发中,我们并不想等任务全部完成后进行.有的时候我们执行要其中有一个任务完成即可.这时候就可以使用invokeAny来实现.

public class App {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);

        //创建批量任务
        List<CalTask2> tasks = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            tasks.add(new CalTask2(i,i+1));
        }
        //该方法会阻塞等待任意线程执行完成
        Integer result = threadPool.invokeAny(tasks);

        System.out.println(result);

    }
}

//计算任务
class CalTask2 implements Callable<Integer>{
    private Integer x;
    private Integer y;

    public CalTask2(Integer x, Integer y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public Integer call() throws Exception {
        //模拟计算耗时
        Thread.sleep(1000L);
        int i = x + y;
        System.out.println("执行完成");
        return i;
    }
}

上面程序最后的打印结果并不会固定,但是能确定的是会打印出一个数字结果.而执行完成打印次数至少是一次.批量提交到线程池中的任务,只要其中任意一个线程完成就会结束阻塞.同时如果任务还未被线程执行,那么这部分任务也将不会执行.而已经在线程中执行的任务,会继续执行完成.所以打印的执行完成次数并不固定.

关闭线程池

当我们执行完线程池中的任务后,程序并没有被终止.这是因为我们没有关闭线程池.在jdk中提供了两个关闭线程的方法:shutdownshutdownNow.这两个方法都可以用来关闭线程池,但是背后的逻辑并不一样.

  • shutdown:按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务.也就是说如果在调用shutdown之前,任务已经被提交到线程池.那么这部分线程将继续执行完.但是如果再提交行的任务,将执行线程池设置的拒绝策略.

  • shutdownNow:尝试停止所有正在运行中(阻塞和等待都算)的任务,并返回等待执行的任务列表,并将队列中的任务全部移除.该方法并不能保证能够停止正在处理中任务,但是会尽力尝试.它内部就是通过调用Thread.interrupt取消任务,如果程序没有做响应中断处理,任务将会一直运行下去.

public class App {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
//        ExecutorService threadPool = new ThreadPoolExecutor(2, 2,
//                0L, TimeUnit.MILLISECONDS,
//                new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
        //提交多个任务
        for (int i = 0; i < 5; i++) {
            threadPool.submit(new CalTask2(i,i+1));
        }
        //关闭线程池1
        threadPool.shutdown();
        //再次添加任务
        threadPool.submit(new CalTask2(100,200));

    }
}

class CalTask2 implements Callable<Integer>{
    private Integer x;
    private Integer y;

    public CalTask2(Integer x, Integer y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public Integer call() throws Exception {
        //模拟计算耗时
        try {
            Thread.sleep(1000L);
        }catch (InterruptedException e){
            e.printStackTrace();
            System.out.println("中断异常");
            return null;
        }
        int i = x + y;
        System.out.println("执行完成");
        return i;
    }
}

上面的代码的执行结果如下:

Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f31245a rejected from java.util.concurrent.ThreadPoolExecutor@6d6f6e28[Shutting down, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
    at com.buydeem.ch6.App.main(App.java:25)
执行完成
执行完成
执行完成
执行完成
执行完成

当我们调用shutdown之后,再提交任务直接抛出异常.因为通过Executors.newFixedThreadPool(2)创建的线程池,使用的是默认的拒绝策略即抛出异常.如果我们使用注释代码中自己创建的线程池再次运行,执行结果并不会报错.而且我们提交的所有任务并没有中断执行,而是全部执行完毕.

修改上面代码如下:

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        //创建线程池
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
//        ExecutorService threadPool = new ThreadPoolExecutor(2, 2,
//                0L, TimeUnit.MILLISECONDS,
//                new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.DiscardPolicy());
        //提交多个任务
        for (int i = 0; i < 5; i++) {
            threadPool.submit(new CalTask2(i,i+1));
        }
        //关闭线程池
        List<Runnable> list = threadPool.shutdownNow();
        System.out.println("list.size() = " + list.size());

    }
java.lang.InterruptedException: sleep interrupted
list.size() = 3
    at java.lang.Thread.sleep(Native Method)
中断异常
    at com.buydeem.ch6.CalTask2.call(App.java:41)
中断异常
    at com.buydeem.ch6.CalTask2.call(App.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.buydeem.ch6.CalTask2.call(App.java:41)
    at com.buydeem.ch6.CalTask2.call(App.java:28)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

从结果可以出来一共打印出来两个中断异常和两个中断异常,同时还打印出list.size() = 3.因为我们的线程池固定只有2个线程,所有返回未执行的任务数为3.打印的两个中断异常说明调用shutdownNow它会调用线程的中断方法.

取消任务

如果我将任务提交到线程池之后,我想取消任务怎么办?Future为我们提供了一个cancel方法,通过调用该方法我们可以取消任务.
API文档的介绍如下:试图取消对此任务的执行.如果任务已完成,或已取消,或者由于某些其他原因而无法取消,则此尝试将失败.当调用cancel时,如果调用成功,而此任务尚未启动,则此任务将永不运行.这部分还是很好理解.但是它有一个参数mayInterruptIfRunning,这个刚一开始我也没有理解是什么意思.下面通过这段代码来说明:

public class App2 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(5);

        Future<?> future = service.submit(new Runnable() {
            @Override
            public void run() {
                int count = 0;
                while (count < 100000 && !Thread.currentThread().isInterrupted()) {
                    System.out.println("count = " + count);
                    count++;
                }
            }
        });
        //确保任务正在进行中
        Thread.sleep(50);
        //取消任务
        future.cancel(false);
        //关闭线程池
        service.shutdown();
    }
}

如果调用的为future.cancel(false),会将所有结果全部打印完全.如果调用future.cancel(true)大概率是无法全部打印完整的.API对参数mayInterruptIfRunning的说明如下:如果应该中断执行此任务的线程,则为 true;否则允许正在运行的任务运行完成.其实如果线程还未执行或者已近执行完成这个参数并没有影响,真正有影响的在于任务正在运行中.
如果任务正在运行中,调用future.cancel(false)并不会执行线程的中断方法,所以Thread.currentThread().isInterrupted()中断状态一直为false,所以结果会全部打印完整.如果调用为future.cancel(true),Thread.currentThread().isInterrupted()状态会在调用后变成true,所以结果打印不完整.
需要注意的是,如果你程序中并没有对中断信号做处理,调用cancel传入的参数不管是true还是false都不会导致任务中断.但是能响应中断异常.

相关文章

  • JUC-(4)线程池(下)

    之前介绍了如何创建线程池以及创建线程池时各参数的意义.现在将如果提交任务到线程池中去执行. 提交任务 提交任务到线...

  • JUC-(3)线程池(上)

    为什么要使用线程池 如果使用过阿里巴巴编码规约开发插件的人,当你通过new Thread()方式创建一个线程时,会...

  • 大白话聊聊线程池的工作原理和核心参数

    目录 1、为啥要使用线程池 2、线程池的工作原理 3、线程池都用哪些核心参数 4. 有界队列下的线程池的工作流程 ...

  • 线程池学习笔记

    1、线程池的定义 2、Executors创建线程池的方式 3、ThreadPoolExecutor对象 4、线程池...

  • 线程池

    1、为什么要使用线程池2、线程池的工作原理3、线程池参数4、阻塞队列5、饱和策略6、向线程池提交任务7、线程池的状...

  • 9.jdk线程池详解及实际运用

    4.线程池 参数 1)corePoolSize: 线程池的基本大小,当提交一个任务到线程池中,线程池会创建一个线程...

  • Kevin Learn Android:线程池(ThreadPo

    简介 使用 4 种常见线程池 定长线程池(FixedThreadPool) 特点:只有核心线程 & 不会被回收、线...

  • pthread 创建的动态线程池

    组织结构: 1,缓存工作的任务池,任务节点 2,存储工作线程的池,线程节点 3,任务池投递到工作线程的线程 4,工...

  • java中4种常用线程池

    java中4种常用线程池 一、线程池 线程池:说白了,就是一种线程使用模式。线程过多会带来调度开销,进而影响整体性...

  • JDK多任务执行框架

    1、为什么要使用线程池?2、线程池有什么作用?3、说说几种常见的线程池及使用场景。4、线程池都有哪几种工作队列?5...

网友评论

      本文标题:JUC-(4)线程池(下)

      本文链接:https://www.haomeiwen.com/subject/ctyzqktx.html