美文网首页
FutureTask(1) —— 先认识一下FutureTask

FutureTask(1) —— 先认识一下FutureTask

作者: 若琳丶 | 来源:发表于2019-12-14 19:01 被阅读0次

    一、前言

    最近一直在研究Java并发‘’相关的知识,从Thread到ThreadPoolExecutor,从synchronize到AQS。此次研究一下 FutureTask以及其相关的类的工作方式,与FutureTask相关的类包括:Callable、Runnable、Future和ThreadPoolExecutor等,我们一点一点的来慢慢引入。

    二、Callable 和 Runnable

    2.1 Runnable

    Runnable 是一个接口,它只有一个 run 方法。关于这个接口存在的意义,有一篇文章中说的非常贴切。

    创建线程最重要的是传递一个run()方法, 这个run方法定义了这个线程要做什么事情, 它被抽象成了Runnable接口。

    这句话可以作为理解接口的一种角度,接口的本质是定义规则,以及这些规则包含哪些动作。对于 Runnable 接口,它就定义了一个规则 —— 可运行。

    public interface Runnable {
        public abstract void run();
    }
    
    

    从方法中可以看出,Runnable 中的run方法没有返回值,也无法抛出异常。这就意味着,我们用Runnable,就只能去执行一个没有反馈的任务,任务是否执行完毕,执行过程中是否存在异常,主线程无法感知到,全凭 Runnable 自己去处理。如果我们遇到使用多线程去做结果归纳的需求该怎么办?我们希望主线程在子线程执行完毕后拿到子线程的计算结果,进行归纳。

    2.2 Callable

    Callable 也是一个接口,这个接口所定义的规则是 call 方法。

    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    
    

    对比Callable接口与Runnable接口, 我们可以发现它们最大的不同点在于:

    • Callable有返回值
    • Callable可以抛出异常

    关于有返回值这点,我们并不意外,因为这就是我们的需求,call方法的返回值类型采用的泛型,该类型是我们在创建Callable对象的时候指定的。

    除了有返回值外,相较于Runnable接口,Callable还可以抛出异常,这点看上去好像没啥特别的,但是却有大用处——这意味着如果在任务执行过程中发生了异常,我们可以将它向上抛出给任务的调用者来妥善处理,我们甚至可以利用这个特性来中断一个任务的执行。而Runnable接口的run方法不能抛出异常,只能在方法内部catch住处理,丧失了一定的灵活性。

    好了,既然Callable可以返回运算结果,我们来尝试获取一下:

        public static void main(String[] args) {
            Callable<String> myCallable = () -> {
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                }
                return "done";
            };
            System.out.println("callable 开始执行");
            try {
                String result = myCallable.call();
                System.out.println("Callable 执行的结果是: " + result);
            } catch (Exception e) {
                System.out.println("There is a exception.");
            }
        }
    

    我们成功的调用了call方法,并且拿到了执行结束的结果“done”,看样子像是满足了前面的需求。但是还存在几个问题:

    • call方法是在当前线程中直接调用的, 无法利用多线程。
    • call方法可能是一个特别耗时的操作, 这将导致程序停在myCallable.call()调用处, 无法继续运行, 直到call方法返回。
    • 如果call方法始终不返回, 我们没办法中断它的运行。

    实际上这段代码自始至终只有主线程一条线程去完成这件事的。因此, 理想的操作应当是, 我们将call方法提交给另外一个线程执行, 并在合适的时候, 判断任务是否完成, 然后获取线程的执行结果或者撤销任务, 这种思路的实现就是Future接口。

    三、Future

    Future接口被设计用来代表一个异步操作的执行结果。你可以用它来获取一个操作的执行结果、取消一个操作、判断一个操作是否已经完成或者是否被取消。

    public interface Future<V> {
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
        
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        
        boolean isDone();
    }
    

    Future接口一共定义了5个方法:

    • get():该方法用来获取执行结果, 如果任务还在执行中, 就阻塞等待;
    • get(long timeout, TimeUnit unit):该方法同get方法类似, 所不同的是, 它最多等待指定的时间, 如果指定时间内任务没有完成, 则会抛出TimeoutException异常;
    • cancel(boolean mayInterruptIfRunning):该方法用来尝试取消一个任务的执行, 它的返回值是boolean类型, 表示取消操作是否成功.
    • isCancelled():该方法用于判断任务是否被取消了。如果一个任务在正常执行完成之前被cancel掉了, 则返回true
    • isDone():如果一个任务已经结束, 则返回true。注意, 这里的任务结束包含了以下三种情况:
      1. 任务正常执行完毕
      2. 任务抛出了异常
      3. 任务已经被取消

    3.1 cancel 方法需要注意的地方

    关于cancel方法,这里要补充说几点:
    首先有以下三种情况之一的,cancel操作一定是失败的:

    • 任务已经执行完成了

    • 任务已经被取消过了

    • 任务因为某种原因不能被取消
      其它情况下,cancel操作将返回true。值得注意的是,cancel操作返回true并不代表任务真的就是被取消了,这取决于发动cancel状态时任务所处的状态:

    • 如果发起cancel时任务还没有开始运行,则随后任务就不会被执行;

    • 如果发起cancel时任务已经在运行了,则这时就需要看mayInterruptIfRunning参数了:

      • 如果mayInterruptIfRunning 为true, 则当前在执行的任务会被中断
      • 如果mayInterruptIfRunning 为false, 则可以允许正在执行的任务继续运行,直到它执行完

    总结来说,Future提供了三种功能:

    1. 判断任务是否完成;
    2. 能够中断任务;
    3. 能够获取任务执行结果。

    我们回顾一下我们的需求:

    我们将call方法提交给另外一个线程执行, 并在合适的时候, 判断任务是否完成, 然后获取线程的执行结果或者撤销任务。

    我们需要将 Callable 交给线程池,让线程池去开启一条子线程去执行 call方法,并借助 Future 去操作任务。我们看一下具体用法:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    5,
                    10,
                    200,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(5),
                    new ThreadFactoryBuilder().build(),
                    new ThreadPoolExecutor.AbortPolicy());
    
            Map<String, String> map = Maps.newHashMap();
    
            Future<Map<String, String>> future1 = executor.submit(() -> {
                Thread.sleep(3000);
                Map<String, String> map1 = Maps.newHashMap();
                map1.put("name", "zhangsan");
                System.out.println("子线程执行完毕");
                return map1;
            });
            executor.shutdown();
    
            System.out.println("---main over---");
    
            map.putAll(future1.get());
            System.out.println("result: " + map);
        }
    

    执行中取消:

        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                10,
                200,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(5),
                new ThreadFactoryBuilder().build(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            cancelWhenRunning();
        }
    
        private static void cancelWhenRunning() throws ExecutionException, InterruptedException {
            Map<String, String> map = Maps.newHashMap();
            Future<Map<String, String>> future1 = executor.submit(() -> {
                Thread.sleep(3000);
                Map<String, String> map1 = Maps.newHashMap();
                map1.put("name", "zhangsan");
                System.out.println("子线程执行完毕");
                return map1;
            });
            executor.shutdown();
            System.out.println("---main over---");
    
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
            }
    
            future1.cancel(true);
            map.putAll(future1.get());
            System.out.println("result: " + map);
        }
    

    执行结果:

    ---main over---
    Exception in thread "main" java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:121)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.cancelWhenRunning(FutrueApp2.java:58)
        at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.main(FutrueApp2.java:21)
    

    mayInterruptIfRunning 为 true,子线程没有执行完成,就被取消掉了。取消之后在调用 get 方法则会抛出异常。
    如果mayInterruptIfRunning 为 false,可以发现,取消之后在调用 get 方法依然会抛出异常,但是子线程可以执行结束。

    ---main over---
    Exception in thread "main" java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:121)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.cancelWhenRunning(FutrueApp2.java:58)
        at com.angryjoe.concurrency.treadpool.futrue.FutrueApp2.main(FutrueApp2.java:21)
    子线程执行完毕
    

    四、FutureTask

    FutureTask是Future接口的一个实现类。

    public class FutureTask<V> implements RunnableFuture<V>
    

    FutureTask类实现了RunnableFuture接口,我们看一下RunnableFuture接口的实现:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

    FutureTask实现了该接口,也就是相当于它同时实现了Runnable接口和Future接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
    有的同学可能会对这个接口产生疑惑,既然已经继承了Runnable,该接口自然就继承了run方法,为什么要在该接口的内部再写一个run方法?
    单纯从理论上来说,这里确实是没有必要的,再多写一遍,我觉得大概就是为了看上去直观一点,便于文档或者UML图展示。
    实际上 FutureTask 与 Future 的用法区别不大:

        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,
                10,
                200,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(5),
                new ThreadFactoryBuilder().build(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            futureTask();
        }
    
        private static void futureTask() throws ExecutionException, InterruptedException {
            Map<String, String> map = Maps.newHashMap();
    
            FutureTask<Map<String, String>> mapFutureTask = new FutureTask<>(() -> {
                Thread.sleep(3000);
                Map<String, String> map1 = Maps.newHashMap();
                map1.put("name", "zhangsan");
                System.out.println("子线程执行完毕");
                return map1;
            });
    
            executor.submit(mapFutureTask);
            executor.shutdown();
            System.out.println("---main over---");
    
            map.putAll(mapFutureTask.get());
            System.out.println("result: " + map);
        }
    

    执行结果与 Future 无异。通过看demo可发现两点不同:

    • 在使用 Future 时,我们是直接将 Callable 交给线程池去执行 submit 方法,而且要获取 submit 的返回值 Future
    • 在使用 FutureTask 时,我们先将 Callable 交给 FutureTask,然后再将 FutureTask 交给线程池去执行 submit 方法,此处就不需要 submit 的返回值

    五、总结

    本文层层介绍 Callable、Future、FutureTask的基本特征和使用方法。下一篇将跟随 FutureTask的源码来探究其工作原理。

    PS:当然,我自己是无法写出很浅显易懂富有条理的文章,所以当然有参考~
    https://www.cnblogs.com/dolphin0520/p/3949310.html
    https://segmentfault.com/a/1190000016542779#item-2-1
    这两篇文章讲的非常好,此处推荐阅读原文。
    (侵删)

    如果有解释不清或者缺失的地方,还望在下方留言,大家一起学习,一起交流,一起进步。

    相关文章

      网友评论

          本文标题:FutureTask(1) —— 先认识一下FutureTask

          本文链接:https://www.haomeiwen.com/subject/kkqhnctx.html