美文网首页
Java线程-Callable、Future、FutureTas

Java线程-Callable、Future、FutureTas

作者: 骑着乌龟去看海 | 来源:发表于2018-08-18 09:37 被阅读87次

    一. 前言

      在前面,我们已经了解了创建线程的两种方式:继承Thread或者实现Runnable接口,由于Thread也实现了Runnable接口,实际上上面两种方式是一种方式,不过这种方式有一个问题就是在任务完成之后无法获取执行的结果。而我们今天要学习的Callable就可以解决该问题。

    二. 简介

      Callable和Future是JDK1.5之后引入的,也是用于创建线程,不过在任务执行完成之后可以得到任务的执行结果,我们来简单看下这两个接口及相关的实现。

    1. Callable接口
    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

    Callable接口是一个泛型接口,可以返回结果,并可以抛出异常,只有一个call方法,类似于Runnable的run方法,用于执行任务,但run方法没有返回值,并且不能抛出checked Exception。而call方法是有返回值,且返回的类型就是传进来的V类型。

    而Callable 接口一般是配合ExecutorService线程池来使用的,在ExecutorService中声明了若干个submit方法:

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    

    目前只需要知道Callable一般是和ExecutorService配合来使用的,而有关线程池方面的内容,我们后续自然会学习到。

    2. Future接口

      Future表示的是异步计算的结果,提供了查看任务是否完成,取消任务,获取结果等接口。而针对获取结果接口,只有在任务执行完成之后,才能获取,否则的话,该方法会阻塞直到任务执行完成。取消接口的话,一旦任务执行完成,就不能取消。

    If you would like to use a Future for the sake of cancellability but not provide a usable result, you can declare types of the form Future<?> and return null as a result of the underlying task. (如果出于可取消性的考虑而不想提供可用的结果,那么可以声明 Future<?> 类型并返回 null作为任务的执行结果)

    public interface Future<V> {
    
        boolean cancel(boolean mayInterruptIfRunning);
    
        boolean isCancelled();
    
        boolean isDone();
    
        V get() throws InterruptedException, ExecutionException;
    
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    Future接口提供了如下5个接口:

    1. cancel,尝试取消任务,如果任务已经执行完成,已被取消或者由于其他原因无法取消,则取消失败,返回false;如果该任务尚未start,那么调用cancel后,该任务将取消成功,返回true;如果任务已经在执行中,那么通过参数mayInterruptIfRunning来判断执行该任务的线程是否可用中断,也就是是否取消正在执行但还没有执行完成的任务,如果该值为true,则可以取消;
    2. isCancelled,表示任务是否被取消成功,如果任务在正常完成之前被取消,则返回true;
    3. isDone,表示任务是否已经完成,已经完成返回true;完成可能是由于正常执行完成、抛出异常或取消,在这些情况下,该接口都返回true;也就是,在调用cancel方法返回之后,该方法的调用将返回true;
    1. get,获取线程的执行结果,如果任务尚未执行完成,则该接口将会处于阻塞,直到线程执行完成;而该方法有可能会抛出以下几个异常:
      • CancellationException - if the computation was cancelled
      • ExecutionException - if the computation threw an exception
      • InterruptedException - if the current thread was interrupted while waiting
    2. get,有参数的get方法,该方法也是用于获取执行结果,如果在指定的时间内没有获取到结果,则会抛出超时异常:
      • TimeoutException - if the wait timed out

    也就是说,通过Future我们可以判断任务是否执行完成,取消任务,获取任务执行完成之后的结果。而Future是一个接口,我们来看以下它的实现类FutureTask。

    3. FutureTask

    我们先来看一下FutureTask的实现:

    public class FutureTask<V> implements RunnableFuture<V> {
        public FutureTask(Callable<V> callable)
        public FutureTask(Runnable runnable, V result)
    }
    

    我们可以看到,FutureTask继承自RunnableFuture,我们再来看下RunnableFuture:

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }
    

    可以看到,RunnableFuture同时继承了Runnable接口和Future接口,所以说它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

    4. CompletableFuture

      虽然Future提供了异步执行的能力,但Future对于结果的获取却不方便,只能通过阻塞或者轮询的方式得到结果,所以在JDK 8.0之后引入了CompletableFuture这个类来优化这些问题,CompletableFuture大概有50多个方法,功能十分强大,比如说 将多个异步计算合并为一个,后面的依赖于前面执行的结果等,我们先来看一下它的继承结构:

    public class CompletableFuture<T> implements Future<T>, CompletionStage<T> 
    

    CompletableFuture算是Future的增强类,因为该类在实现Future的同时又实现了CompletionStage 这个接口,而 CompletionStage 接口表示的是阶段,表示某个阶段完成之后要做什么,该接口提供了多种方法,接下来我们来学习下。

    4.1 生成 CompletableFuture

    我们可用借助CompletableFuture 的几个静态方法来生成 CompletableFuture 对象:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                           Executor executor)
    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                       Executor executor)
    

    以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

    也可以通过completedFuture返回一个已经计算好的CompletedFuture的对象:

    public static <U> CompletableFuture<U> completedFuture(U value)
    
    4.2 CompletionStage 接口

      我们来看下CompletionStage 的接口。CompletionStage 的接口虽然很多,但相似的方法一般都会分为三类,一个是使用当前线程执行,一个是使用共享的线程池,另一个是使用自定义的(参数传递)Executor来执行,类似于下面这种:

    thenXXX() :使用当前线程继续执行接下来的内容
    thenXXXAsync() :使用共享线程池执行接下来的内容
    thenXXXAsync(with Executor) :使用参数中提供的Executor来执行接下来的内容
    
      1. thenApply,返回一个新的CompletionStage,当该阶段执行完成后,该阶段的结果作为参数进行下一步执行,比如将CompletableFuture的返回结果String类型的结果转换为Integer:
    completableFuture.thenApply(s -> Integer.parseInt(s));
    
      1. thenAccept,对结果的消费处理,只能消费一次,该方法没有返回值:
    completableFuture.thenAccept(s -> System.out.println("->" + s));
    
      1. thenRun,该阶段执行完成之后,执行下一个任务,比如说如果你想在CompletableFuture完成之后执行一个Runnable任务,就可以使用该操作,一旦CompletableFuture完成,就会执行指定的Runnable任务,下面是使用示例:
    completableFuture.thenRun(() -> System.out.println("then running"));
    
      1. thenCombine,对两个CompletableFuture的结果进行组合然后返回,比如将当前CompletableFuture 的结果和等待的 CompletableFuture 的结果进行一些组合然后返回:
    completableFuture.thenCombine(completableFuture1, (s1, s2) -> "Combine:" + s1 + " : " + s2);
    
      1. thenAcceptBoth,等待两个CompletableFuture结束,然后消费结果,所以该方法自然没有返回值。只有当两个任务都完成之后才执行:
    completableFuture.thenAcceptBoth(completableFuture, 
        (s1, s2) -> System.out.println("Consume:" + s1 + ":" + s2));
    
      1. runAfterBoth,相比于AcceptBoth,runAfterBoth虽然也会等待两个CompletableFuture都结束再运行,但是不再依赖两个CompletableFuture的运行结果:
    completableFuture.runAfterBoth(stringCompletableFutureV1, 
        (Runnable) () -> System.out.println("do some jobs here"));
    
      1. applyToEither,两个CompletableFuture 有竞争关系,哪个CompletableFuture先结束,就会对哪个的返回值做处理,然后第二个CompletableFuture的结果将不会再处理:
    completableFuture.applyToEither(stringCompletableFutureV1, 
        (Function<String, Integer>) Integer::parseInt);
    
      1. acceptEither,同样,对两个CompletableFuture 先结束的进行消费:
    completableFuture.acceptEither(stringCompletableFutureV1, 
        (Consumer<String>) s -> System.out.println("->" + s));
    
      1. runAfterEither,同样,对两个CompletableFuture先结束的 再执行新的任务:
    completableFuture.runAfterEither(stringCompletableFutureV1, 
        (Runnable) () -> System.out.println("other job"));
    
      1. thenCompose,表示使用CompletableFuture的结果来产生一个全新的CompletableFuture,then的含义是等待当前的CompletableFuture运行结束,然后再运行我们指定的Supplier:
    completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + ":" + s));
    
      1. exceptionally,对当前的CompletableFuture抛出的异常进行一些处理,类似于fallback,比如说返回异常内容来代替当前抛出的异常,当然你也可以在该方法里面直接抛出当前CompletableFuture抛出的异常:
    completableFuture.exceptionally(throwable -> "fallback");
    
      1. whenComplete,当前CompletableFuture将会把运行结果作为参数传递给指定的function,然后我们可以通过参数来判断当前CompletableFuture是否成功完成还是抛出了异常,然后进行后续处理:
    completableFuture.whenComplete((s, throwable) -> System.out.println(s));
    
      1. handle,和whenComplete相比,该类型不仅可以获取到当前CompletableFuture是成功了还是抛出了异常,还可以对结果进行转换(如果当前CompletableFuture成功完成了):
    completableFuture.handle((s, throwable) -> {
        if (throwable == null) {
            return Integer.parseInt(s);
        }
        return -1;
    });
    
    4.3 CompletableFuture实现类

      CompletableFuture中,除了实现上述方法以及Future接口的实现方法之外,还有一些方法,比如除了常规的阻塞get及超时get方法之外,还有一个getNow方法:

    public T getNow(T valueIfAbsent) {
        Object r;
        return ((r = result) == null) ? valueIfAbsent : reportJoin(r);
    }
    

    result字段代表任务的执行结果,所以首先判断是否为null,为null则表示任务还没有执行结束,直接返回参数中传递的默认值;如果result不为null,表示说明任务已经执行结束,然后直接返回;如果要了解源码的执行流程,可以参考本章后面引用的链接地址。

    另外两个常用的方法,allOf,anyOf方法,在CompletionStage接口的方法中操作的CompletionStage对象,要么是一个,要么是两个,而这两个静态方法可以组合多个对象。

    1. allOf,allOf可以接收多个CompletableFuture参数,该方法会等待所有的CompletableFuture都执行完成之后组合完成,再执行接下来的操作;
    2. anyOf,anyOf和allOf类似,也是组合多个,但只要有一个CompletableFuture完成之后便执行后续的操作;

    这部分内容参考自:
    一字马胡 - 深度学习Java Future(一)
    一字马胡 - 深度学习Java Future(二)

    3. 运行示例

    接下来,我们通过几个简单的例子来看一下这几个接口的使用。

    3.1 Callable和Future

    我们先来看一下Callable和Future的使用:

    public class CallableTest {
        public static void main(String[] args) {
            try {
                callable();
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public static void callable() throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newCachedThreadPool();
            Future<String> future = executor.submit(new Callable<String>() {
                public String call() throws InterruptedException {
                    System.out.println("child thread start --");
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println("child thread end --");
                    return "hello callable";
                }
            });
            executor.shutdown();
            String result = future.get();
            System.out.println("return value: " + result);
        }
    }
    

    其中,匿名类Callable可以替换为lambda表达式:

    public static void callable() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<String> future = executor.submit(() -> {
            System.out.println("child thread start --");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("child thread end --");
            return "hello callable";
        });
        executor.shutdown();
        String result = future.get();
        System.out.println("return value: " + result);
    }
    

    output:

    child thread start --
    child thread end --
    return value:hello callable
    
    3.2 Callable和FutureTask

    至于FutureTask,我们可以通过FutureTask的构造方法来构造对象,然后通过ExecutorService的execute方法来执行:

    public static void callable() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask<String> future = new FutureTask<>(() -> {
            System.out.println("child thread start --");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("child thread end --");
            return "hello callable";
        });
        executor.execute(future);
        executor.shutdown();
        String result = future.get();
        System.out.println("return value: " + result);
    }
    

    当然也可以借助Thread类而不是ExecutorService来实现:

    public static void callable() throws ExecutionException, InterruptedException {
        FutureTask<String> future = new FutureTask<>(() -> {
            System.out.println("child thread start --");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("child thread end --");
            return "hello callable";
        });
        new Thread(future).start();
        String result = future.get();
        System.out.println("return value: " + result);
    }
    

    本文参考自:
    海子-Java并发编程:Callable、Future和FutureTask
    https://docs.oracle.com/javase/8/docs/api/
    IBM - 通过实例理解 JDK8 的 CompletableFuture
    鸟窝-Java CompletableFuture 详解
    《Java并发编程实战》

    相关文章

      网友评论

          本文标题:Java线程-Callable、Future、FutureTas

          本文链接:https://www.haomeiwen.com/subject/scgxiftx.html