Future

作者: Travis_Wu | 来源:发表于2020-12-13 19:58 被阅读0次

    一、为什么需要 Callable?

    • Runnable 的缺陷
      1. 不能返回一个返回值
      2. 不能抛出 checked Exception
    • Callable 接口
      public interface Callable<V> {
           V call() throws Exception;
      }
      
    • Callable 和 Runnable 的不同之处
      1. 方法名,Callable 规定的执行方法是 call(),而 Runnable 规定的执行方法是 run()
      2. 返回值,Callable 的任务执行后有返回值,而 Runnable 的任务执行后是没有返回值的
      3. 抛出异常,call() 方法可抛出异常,而 run() 方法是不能抛出受检查异常的
      4. 和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果

    二、Future 的主要功能是什么

    • Future 的作用
      Future 最主要的作用是,比如当做一定运算的时候,运算过程可能比较耗时,有时会去查数据库,或是繁重的计算,比如压缩、加密等,在这种情况下,如果我们一直在原地等待方法返回,显然是不明智的,整体程序的运行效率会大大降低。我们可以把运算的过程放到子线程去执行,再通过 Future 去控制子线程执行的计算过程,最后获取到计算结果。这样一来就可以把整个程序的运行效率提高,是一种异步的思想。
    • Callable 和 Future 的关系
      Callable 接口相比于 Runnable 的一大优势是可以有返回结果,那这个返回结果怎么获取呢?就可以用 Future 类的 get 方法来获取 。因此,Future 相当于一个存储器,它存储了 Callable 的 call 方法的任务结果。除此之外,我们还可以通过 Future 的 isDone 方法来判断任务是否已经执行完毕了,还可以通过 cancel 方法取消这个任务,或限时获取任务的结果等。
    • Future 的方法和用法
      public interface Future<V> {
          boolean cancel(boolean mayInterruptIfRunning);
          boolean isCancelled();
          boolean isDone();
          V get() throws InterruptedException, ExecutionException;
          V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutEx 
                    ceptio
      }
      
      
      • get() 方法:获取结果
        get 方法最主要的作用就是获取任务执行的结果,该方法在执行时的行为取决于 Callable 任务的状态,可能会发生以下 5 种情况:
        1. 最常见的就是当执行 get 的时候,任务已经执行完毕了,可以立刻返回,获取到任务执行的结果
        2. 任务还没有结果,这是有可能的,比如我们往线程池中放一个任务,线程池中可能积压了很多任务,还没轮到我去执行的时候,就去 get 了,在这种情况下,相当于任务还没开始;还有一种情况是任务正在执行中,但是执行过程比较长,所以我去 get 的时候,它依然在执行的过程中。无论是任务还没开始或在进行中,我们去调用 get 的时候,都会把当前的线程阻塞,直到任务完成再把结果返回回来。
        3. 任务执行过程中抛出异常,一旦这样,我们再去调用 get 的时候,就会抛出 ExecutionException 异常,不管我们执行 call 方法时里面抛出的异常类型是什么,在执行 get 方法时所获得的异常都是 ExecutionException
        4. 任务被取消了,如果任务被取消,我们用 get 方法去获取结果时则会抛出 CancellationException
        5. 任务超时, get 方法有一个重载方法,那就是带延迟参数的,调用了这个带延迟参数的 get 方法之后,如果 call 方法在规定时间内正常顺利完成了任务,那么 get 会正常返回;但是如果到达了指定时间依然没有完成任务,get 方法则会抛出 TimeoutException,代表超时了



          上图中,右侧是一个线程池,线程池中有一些线程来执行任务。重点在图的左侧,可以看到有一个 submit 方法,该方法往线程池中提交了一个 Task,这个 Task 实现了 Callable 接口,当我们去给线程池提交这个任务的时候,调用 submit 方法会立刻返回一个 Future 类型的对象,这个对象目前内容是空的,其中还不包含计算结果,因为此时计算还没有完成。
          当计算一旦完成时,也就是当我们可以获取结果的时候,线程池便会把这个结果填入到之前返回的 Future 中去(也就是 f 对象),而不是在此时新建一个新的 Future。这时就可以利用 Future 的 get 方法来获取到任务的执行结果了。

        /**
         * 描述:     演示一个 Future 的使用方法
         * main 方法新建了一个 10 个线程的线程池
         * 用 submit 方法把一个任务提交进去
         * 这个任务实现了 Callable 接口
         * 它所做的内容就是先休眠三秒钟,然后返回一个随机数
         * 接下来我们就直接把 future.get 结果打印出来
         * 其结果是正常打印出一个随机数,比如 100192 等
         * 这段代码对应了我们刚才那个图示的讲解
         */
        public class OneFuture {
            public static void main(String[] args) {
                ExecutorService service 
                                   = Executors.newFixedThreadPool(10);
                Future<Integer> future 
                                   = service.submit(new CallableTask());
                try {
                    System.out.println(future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                service.shutdown();
            }
        
            static class CallableTask implements Callable<Integer> {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(3000);
                    return new Random().nextInt();
                }
            }
        }
        
      • isDone() 方法:判断是否执行完毕
        需要注意的是,这个方法如果返回 true 则代表执行完成了;如果返回 false 则代表还没完成。但这里如果返回 true,并不代表这个任务是成功执行的,比如说任务执行到一半抛出了异常。那么在这种情况下,对于这个 isDone 方法而言,它其实也是会返回 true 的,因为对它来说,虽然有异常发生了,但是这个任务在未来也不会再被执行,它确实已经执行完毕了。所以 isDone 方法在返回 true 的时候,不代表这个任务是成功执行的,只代表它执行完毕了。
      • cancel 方法:取消任务的执行
        cancel 方法是必须传入一个参数,该参数叫作 mayInterruptIfRunning,如果传入的参数是 true,执行任务的线程就会收到一个中断的信号,正在执行的任务可能会有一些处理中断的逻辑,进而停止,这个比较好理解。如果传入的是 false 则就代表不中断正在运行的任务,也就是说,本次 cancel 不会有任何效果,同时 cancel 方法会返回 false
        1. 当任务还没有开始执行时,一旦调用 cancel,这个任务就会被正常取消,未来也不会被执行,那么 cancel 方法返回 true
        2. 如果任务已经完成,或者之前已经被取消过了,那么执行 cancel 方法则代表取消失败,返回 false
        3. 这个任务正在执行,这个时候执行 cancel 方法是不会直接取消这个任务的,而是会根据我们传入的参数做判断
      • isCancelled() 方法:判断是否被取消
        判断是否被取消,它和 cancel 方法配合使用,比较简单
    • 用 FutureTask 来创建 Future
      除了用线程池的 submit 方法会返回一个 future 对象之外,同样还可以用 FutureTask 来获取 Future 类和任务的结果
      public class FutureTask<V> implements RunnableFuture<V>{}
      可以看到,它实现了一个接口,这个接口叫作 RunnableFuture。我们再来看一下 RunnableFuture 接口的代码实现
      public interface RunnableFuture<V> extends Runnable, Future<V> { void run();}
      可以看出,它是 extends Runnable 和 Future 这两个接口的,它们的关系如下图所示:

      既然 RunnableFuture 继承了 Runnable 接口和 Future 接口,而 FutureTask 又实现了 RunnableFuture 接口,所以 FutureTask 既可以作为 Runnable 被线程执行,又可以作为 Future 得到 Callable 的返回值。
      典型用法是,把 Callable 实例当作 FutureTask 构造函数的参数,生成 FutureTask 的对象,然后把这个对象当作一个 Runnable 对象,放到线程池中或另起线程去执行,最后还可以通过 FutureTask 获取任务执行的结果。
      /**
       * 描述:     演示 FutureTask 的用法
       * 首先创建了一个实现了 Callable 接口的 Task
       * 然后把这个 Task 实例传入到 FutureTask 的构造函数中去
       * 创建了一个 FutureTask 实例
       * 把这个实例当作一个 Runnable 放到 new Thread() 中去执行
       * 最后再用 FutureTask 的 get 得到结果,并打印出来。
       * 执行结果是 4950,正是任务里 0+1+2+...+99 的结果
       */
      public class FutureTaskDemo {
          public static void main(String[] args) {
              Task task = new Task();
              FutureTask<Integer> integerFutureTask 
                                                  = new FutureTask<>(task);
              new Thread(integerFutureTask).start();
              try {
                  System.out.println("task运行结 
                                      果:"+integerFutureTask.get());
              } catch (InterruptedException e) {
                  e.printStackTrace();
              } catch (ExecutionException e) {
                  e.printStackTrace();
              }
          }
      }
      
      class Task implements Callable<Integer> {
          @Override
          public Integer call() throws Exception {
              System.out.println("子线程正在计算");
              int sum = 0;
              for (int i = 0; i < 100; i++) {
                  sum += i;
              }
              return sum;
          }
      }
      

    三、Future 的注意点

    • 当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
      对于 Future 而言,第一个注意点就是,当 for 循环批量获取 Future 的结果时容易 block,在调用 get 方法时,应该使用 timeout 来限制。
      假设一共有四个任务需要执行,我们都把它放到线程池中,然后它获取的时候是按照从 1 到 4 的顺序,也就是执行 get() 方法来获取的,代码如下所示:
      /**
       * 用一个 list 来保存 4 个 Future
       * 前两个 Future 所对应的任务是慢任务
       * 后两个 Future 对应的任务是快任务
       * 在提交完这 4 个任务之后
       * 用 for 循环对它们依次执行 get 方法,来获取它们的执行结果,
       * 然后再把这个结果打印出来
       * 执行结果如下:
       * 速度慢的任务
       * 速度慢的任务
       * 速度快的任务
       * 速度快的任务
       */
      public class FutureDemo {
          public static void main(String[] args) {
              //创建线程池
              ExecutorService service 
                                     = Executors.newFixedThreadPool(10);
              //提交任务,并用 Future 接收返回结果
              ArrayList<Future> allFutures = new ArrayList<>();
      
              for (int i = 0; i < 4; i++) {
                  Future<String> future;
                  if (i == 0 || i == 1) {
                      future = service.submit(new SlowTask());
                  } else {
                      future = service.submit(new FastTask());
                  }
                  allFutures.add(future);
              }
      
              for (int i = 0; i < 4; i++) {
                  Future<String> future = allFutures.get(i);
                  try {
                      String result = future.get();
                      System.out.println(result);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  } catch (ExecutionException e) {
                      e.printStackTrace();
                  }
              }
              service.shutdown();
          }
      
          static class SlowTask implements Callable<String> {
              @Override
              public String call() throws Exception {
                  Thread.sleep(5000);
                  return "速度慢的任务";
              }
          }
      
          static class FastTask implements Callable<String> {
              @Override
              public String call() throws Exception {
                  return "速度快的任务";
              }
          }
      }
      

      可以看到,这个执行结果是打印 4 行语句,前面两个是速度慢的任务,后面两个是速度快的任务。虽然结果是正确的,但实际上在执行的时候会先等待 5 秒,然后再很快打印出这 4 行语句。



      这里有一个问题,即第三个的任务量是比较小的,它可以很快返回结果,紧接着第四个任务也会返回结果。但是由于前两个任务速度很慢,所以我们在利用 get 方法执行时,会卡在第一个任务上。也就是说,虽然此时第三个和第四个任务很早就得到结果了,但我们在此时使用这种 for 循环的方式去获取结果,依然无法及时获取到第三个和第四个任务的结果。直到 5 秒后,第一个任务出结果了,我们才能获取到,紧接着也可以获取到第二个任务的结果,然后才轮到第三、第四个任务。
      假设由于网络原因,第一个任务可能长达 1 分钟都没办法返回结果,那么这个时候,我们的主线程会一直卡着,影响了程序的运行效率。
      此时我们就可以用 Future 的带超时参数的 get(long timeout, TimeUnit unit) 方法来解决这个问题。这个方法的作用是,如果在限定的时间内没能返回结果的话,那么便会抛出一个 TimeoutException 异常,随后就可以把这个异常捕获住,或者是再往上抛出去,这样就不会一直卡着了。

    • Future 的生命周期不能后退
      Future 的生命周期不能后退,一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来,也不能让一个已经完成计算的 Future 再次重新执行任务
    • Future 产生新的线程了吗
      有一种说法是,除了继承 Thread 类和实现 Runnable 接口之外,还有第三种产生新线程的方式,那就是采用 Callable 和 Future,这叫作有返回值的创建线程的方式。这种说法是不正确的。
      其实 Callable 和 Future 本身并不能产生新的线程,它们需要借助其他的比如 Thread 类或者线程池才能执行任务。例如,在把 Callable 提交到线程池后,真正执行 Callable 的其实还是线程池中的线程,而线程池中的线程是由 ThreadFactory 产生的,这里产生的新线程与 Callable、Future 都没有关系,所以 Future 并没有产生新的线程。

    四、如何利用 CompletableFuture 实现“旅游平台”问题

    • 旅游平台问题
      什么是旅游平台问题呢?如果想要搭建一个旅游平台,经常会有这样的需求,那就是用户想同时获取多家航空公司的航班信息。比如,从北京到上海的机票钱是多少?有很多家航空公司都有这样的航班信息,所以应该把所有航空公司的航班、票价等信息都获取到,然后再聚合。由于每个航空公司都有自己的服务器,所以分别去请求它们的服务器就可以了,比如请求国航、海航、东航等,如下图所示:
    • 串行
      一种比较原始的方式是用串行的方式来解决这个问题,
      当每一个请求发出去之后,等它响应回来以后,我们才能去请求下一个网站,这就是串行的方式。
      这样做的效率非常低下,比如航空公司比较多,假设每个航空公司都需要 1 秒钟的话,那么用户肯定等不及,所以这种方式是不可取的。
    • 并行
      我们可以并行地去获取这些机票信息,然后再把机票信息给聚合起来,这样的话,效率会成倍的提高。
    • 有超时的并行获取
      并行虽然提高了效率,但也有一个缺点,那就是会“一直等到所有请求都返回”。如果有一个网站特别慢,那么你不应该被那个网站拖累,比如说某个网站打开需要二十秒,那肯定是等不了这么长时间的,所以我们需要一个功能,那就是有超时的获取。
      同样也在并行的去请求各个网站信息。但是我们规定了一个时间的超时,比如 3 秒钟,那么到 3 秒钟的时候如果都已经返回了那当然最好,把它们收集起来即可;但是如果还有些网站没能及时返回,我们就把这些请求给忽略掉,这样一来用户体验就比较好了,它最多只需要等固定的 3 秒钟就能拿到信息,虽然拿到的可能不是最全的,但是总比一直等更好。
      想要实现这个目标有几种实现方案,我们一个一个的来看看。
      1. 线程池的实现
        public class ThreadPoolDemo {
            ExecutorService threadPool 
                               = Executors.newFixedThreadPool(3);
        
            public static void main(String[] args) 
                              throws InterruptedException {
                ThreadPoolDemo threadPoolDemo 
                               = new ThreadPoolDemo();
                System.out.println(threadPoolDemo.getPrices());
            }
        
            private Set<Integer> getPrices() 
                              throws InterruptedException {
                Set<Integer> prices 
                    = Collections.synchronizedSet(new HashSet<Integer>());
        
                threadPool.submit(new Task(123, prices));
                threadPool.submit(new Task(456, prices));
                threadPool.submit(new Task(789, prices));
                Thread.sleep(3000);
                return prices;
            }
        
            private class Task implements Runnable {
                Integer productId;
                Set<Integer> prices;
                public Task(Integer productId, Set<Integer> prices) {
                    this.productId = productId;
                    this.prices = prices;
                }
        
                @Override
                public void run() {
                    int price=0;
                    try {
                        Thread.sleep((long) (Math.random() * 4000));
                        price= (int) (Math.random() * 4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    prices.add(price);
                }
            }
        }
        
        • 创建一个固定 3 线程的线程池
        • 新建了一个线程安全的 Set,它是用来存储各个价格信息的,把它命名为 Prices,然后往线程池中去放任务。
        • 在该任务里面,我们用一个随机的时间去模拟各个航空网站的响应时间,然后再去返回一个随机的价格来表示票价,最后把这个票价放到 Set 中。
        • 为了实现超时等待的功能,在这里调用了 Thread 的 sleep 方法来休眠 3 秒钟,这样做的话,它就会在这里等待 3 秒,之后直接返回 prices。
        • 如果前面响应速度快的话,prices 里面最多会有三个值,但是如果每一个响应时间都很慢,那么可能 prices 里面一个值都没有。不论你有多少个,它都会在休眠结束之后,也就是执行完 Thread 的 sleep 之后直接把 prices 返回,并且最终在 main 函数中把这个结果给打印出来。
      2. CountDownLatch
        在这里会有一个优化的空间,比如说网络特别好时,每个航空公司响应速度都特别快,你根本不需要等三秒,有的航空公司可能几百毫秒就返回了,那么我们也不应该让用户等 3 秒。所以需要进行一下这样的改进,看下面这段代码:
        public class CountDownLatchDemo {
            ExecutorService threadPool 
                            = Executors.newFixedThreadPool(3);
        
            public static void main(String[] args) 
                             throws InterruptedException {
        
                CountDownLatchDemo countDownLatchDemo
                            = new CountDownLatchDemo();
        
                System.out.println(countDownLatchDemo.getPrices());
            }
        
            private Set<Integer> getPrices() 
                             throws InterruptedException {
        
                Set<Integer> prices 
                    = Collections.synchronizedSet(new HashSet<Integer>());
        
                CountDownLatch countDownLatch 
                    = new CountDownLatch(3);
        
                threadPool.submit(
                           new Task(123, prices, countDownLatch));
        
                threadPool.submit(
                           new Task(456, prices, countDownLatch));
        
                threadPool.submit(
                           new Task(789, prices, countDownLatch));
        
                countDownLatch.await(3, TimeUnit.SECONDS);
                return prices;
            }
        
            private class Task implements Runnable {
                Integer productId;
                Set<Integer> prices;
                CountDownLatch countDownLatch;
                public Task(Integer productId, Set<Integer> prices,
                        CountDownLatch countDownLatch) {
                    this.productId = productId;
                    this.prices = prices;
                    this.countDownLatch = countDownLatch;
                }
        
                @Override
                public void run() {
                    int price = 0;
                    try {
                        Thread.sleep((long) (Math.random() * 4000));
                        price = (int) (Math.random() * 4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    prices.add(price);
                    countDownLatch.countDown();
                }
            }
        }
        
        • 这段代码使用 CountDownLatch 实现了这个功能,整体思路和之前是一致的
        • 我们新增了一个 CountDownLatch,并且把它传入到了 Task 中。在
        • Task 中,获取完机票信息并且把它添加到 Set 之后,会调用 countDown 方法,相当于把计数减 1。
        • 在执行 countDownLatch.await(3,TimeUnit.SECONDS) 这个函数进行等待时,如果三个任务都非常快速地执行完毕了,那么三个线程都已经执行了 countDown 方法,那么这个 await 方法就会立刻返回,不需要傻等到 3 秒钟。
        • 如果有一个请求特别慢,相当于有一个线程没有执行 countDown 方法,来不及在 3 秒钟之内执行完毕,那么这个带超时参数的 await 方法也会在 3 秒钟到了以后,及时地放弃这一次等待,于是就把 prices 给返回了。
        • 所以这样一来,我们就利用 CountDownLatch 实现了这个需求,也就是说我们最多等 3 秒钟,但如果在 3 秒之内全都返回了,我们也可以快速地去返回,不会傻等,提高了效率。
      3. CompletableFuture
        我们再来看一下用 CompletableFuture 来实现这个功能的用法,代码如下所示:
        public class CompletableFutureDemo {
        
            public static void main(String[] args) throws Exception {
        
                CompletableFutureDemo completableFutureDemo 
                                         = new CompletableFutureDemo();
        
                System.out.println(completableFutureDemo.getPrices());
            }
        
            private Set<Integer> getPrices() {
        
                Set<Integer> prices 
                    = Collections.synchronizedSet(new HashSet<Integer>());
        
                CompletableFuture<Void> task1 
                    = CompletableFuture.runAsync(new Task(123, prices));
        
                CompletableFuture<Void> task2 
                    = CompletableFuture.runAsync(new Task(456, prices));
        
                CompletableFuture<Void> task3 
                    = CompletableFuture.runAsync(new Task(789, prices));
        
                CompletableFuture<Void> allTasks 
                    = CompletableFuture.allOf(task1, task2, task3);
        
                try {
                    allTasks.get(3, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                } catch (ExecutionException e) {
                } catch (TimeoutException e) {
                }
                return prices;
            }
        
            private class Task implements Runnable {
                Integer productId;
                Set<Integer> prices;
                public Task(Integer productId, Set<Integer> prices) {
                    this.productId = productId;
                    this.prices = prices;
                }
        
                @Override
                public void run() {
                    int price = 0;
                    try {
                        Thread.sleep((long) (Math.random() * 4000));
                        price = (int) (Math.random() * 4000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    prices.add(price);
                }
            }
        }
        
        • 这里我们不再使用线程池了,我们看到 getPrices 方法,在这个方法中,我们用了 CompletableFuture 的 runAsync 方法,这个方法会异步的去执行任务。
        • 我们有三个任务,并且在执行这个代码之后会分别返回一个 CompletableFuture 对象: task 1、task 2、task 3
        • 然后执行 CompletableFuture 的 allOf 方法,并且把 task 1、task 2、task 3 传入。
        • 这个方法的作用是把多个 task 汇总,然后可以根据需要去获取到传入参数的这些 task 的返回结果,或者等待它们都执行完毕等。我们就把这个返回值叫作 allTasks,并且在下面调用它的带超时时间的 get 方法,同时传入 3 秒钟的超时参数。
        • 这样一来它的效果就是,如果在 3 秒钟之内这 3 个任务都可以顺利返回,也就是这个任务包括的那三个任务,每一个都执行完毕的话,则这个 get 方法就可以及时正常返回,并且往下执行,相当于执行到 return prices。
        • 在下面的这个 Task 的 run 方法中,该方法如果执行完毕的话,对于 CompletableFuture 而言就意味着这个任务结束,它是以这个作为标记来判断任务是不是执行完毕的。
        • 但是如果有某一个任务没能来得及在 3 秒钟之内返回,那么这个带超时参数的 get 方法便会抛出 TimeoutException 异常,同样会被我们给 catch 住。
        • 这样一来它就实现了这样的效果:会尝试等待所有的任务完成,但是最多只会等 3 秒钟,在此之间,如及时完成则及时返回

    相关文章

      网友评论

        本文标题:Future

        本文链接:https://www.haomeiwen.com/subject/deepgktx.html