美文网首页
Future,FutureTask,CompletableFut

Future,FutureTask,CompletableFut

作者: 会跳的八爪鱼 | 来源:发表于2023-05-15 21:27 被阅读0次

    Runnable,Callable,FutureTask

    我们平时创建一个线程的时候,一般都会采用三种,一种是实现Runnable,一种是继承Thread类,还有就是实现Callable。Runnable与Callable实现方式的不同在于,Callable配合Future接口使用可以获取线程的运行状态,阻塞获取线程的结果。而Runnable方式却不行。
    通过查看executor.submit()方法可知,Callable在执行过程中会生成一个RunnableFuture对象,而这个对象的既实现了Runnable接口,又实现了Future接口。

    public <T> Future<T> submit(Callable<T> task) {
      if (task == null) throw new NullPointerException();
      RunnableFuture<T> ftask = newTaskFor(task);
      execute(ftask);
      return ftask;
    }
    ......
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
       return new FutureTask<T>(callable);
    }
    
    public interface RunnableFuture<V> extends Runnable, Future<V> {
       void run();
    }
    
    //FutureTask只是定义了一系列接口,真正完成相应功能的是FutureTask类
    public interface Future<V> {
       boolean cancel(boolean mayInterruptIfRunning);
       boolean isCancelled();
       boolean isDone();
       V get() throws InterruptedException, ExecutionException;
       V get(long timeout, TimeUnit unit)
           throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    而FutureTask实现了RunnableFuture接口,间接实现了Callable与Future接口。该类充当了Callable与Future之间的桥梁,该类在运行结果之后会将结果写入outcome属性中。当线程没有执行完成,调用 future.get()方法会将线程包装成WaitNode对象放入到waitnode链式结构中,并使用LockSupport.pack()方法阻塞住,直到callable执行结果或者异常才会结束阻塞。
    从这里我们可以看出,result = c.call();其实是单线程执行的,所以可以获取到结果。

    public class FutureTask<V> implements RunnableFuture<V> {
    
       /** 此处是我们自定义的callable对象,FutureTask会在执行run()方法时调用它的run() */
       private Callable<V> callable;
       /** 执行结果或异常会被保存到这个对象中,此对象没有被volatile修饰,需要使用cas赋值 */
       private Object outcome;
       /** 运行callable任务的线程 */
       private volatile Thread runner;
       /** 等待线程,当调用get()方法时会添加到阻塞链中 */
       private volatile WaitNode waiters;
    
       public FutureTask(Callable<V> callable) {
           if (callable == null)
               throw new NullPointerException();
           this.callable = callable;
           this.state = NEW;       // ensure visibility of callable
       }
      /**
       *此处可以看到executor.submit()方法实际上调用的还是callable的call()。
       */
       public void run() {
           // 把当前线程对象赋值给runner
           if (state != NEW ||
               !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                            null, Thread.currentThread()))
               return;
           try {
               Callable<V> c = callable;
               if (c != null && state == NEW) {
                   V result;
                   boolean ran;
                   try {
                       // 单线程执行
                       result = c.call();
                       ran = true;
                   } catch (Throwable ex) {
                       result = null;
                       ran = false;
                       setException(ex);
                   }
                   if (ran)
                       // 设置结果,将结果赋值给outcome
                       set(result);
                }
           } finally {
               runner = null;
               int s = state;
               if (s >= INTERRUPTING)
                   handlePossibleCancellationInterrupt(s);
           }
       }
    
       public V get() throws InterruptedException, ExecutionException {
           int s = state;
           if (s <= COMPLETING)
               s = awaitDone(false, 0L);
           return report(s);
       }
       private int awaitDone(boolean timed, long nanos)
           throws InterruptedException {
           final long deadline = timed ? System.nanoTime() + nanos : 0L;
           WaitNode q = null;
           boolean queued = false;
           for (;;) {
               // 如果线程中断,移除阻塞链中的所有节点。
               if (Thread.interrupted()) {
                   removeWaiter(q);
                   throw new InterruptedException();
               }
    
               int s = state;
               if (s > COMPLETING) {
                   if (q != null)
                       q.thread = null;
                   return s;
               }
               else if (s == COMPLETING) // cannot time out yet
                   Thread.yield();
               else if (q == null)
                   q = new WaitNode();
               else if (!queued)
                   queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                        q.next = waiters, q);
               else if (timed) {
                   nanos = deadline - System.nanoTime();
                   if (nanos <= 0L) {
                       removeWaiter(q);
                       return state;
                   }
                   LockSupport.parkNanos(this, nanos);
               }
               else
                   LockSupport.park(this);
           }
       }
    

    所以从FutureTask的源码,我们可以通过FutureTask完成Thread获取结果,或者取消的操作

    FutureTask task = new FutureTask(new Callable() {
       @Override
       public Object call() throws Exception {
           System.out.println("通过Callable方式执行任务");
           Thread.sleep(3000);
           return "返回任务结果";
       }
    });
    new Thread(task).start();
    

    Future的局限性:
    ①调用get()方法时,很容易就会造成长时间阻塞。
    ②不能并发执行或者组合执行多个任务。
    例如,如果程序需要调用订单信息和会员信息,使用FutureTask来完成则是:

    ExecutorService service = Executors.newFixedThreadPool(2);
    Future<Integer> f1 = service.submit(() -> {
        //查询订单信息
        TimeUnit.SECONDS.sleep(50);
        return 1;
    });
    Future<Integer> f2 = service.submit(() -> {
         //查询商品信息
         TimeUnit.SECONDS.sleep(10);
         return 2;
    });
    int n1 = f1.get();
    int n2 = f2.get();
    // 根据f1返回执行其他操作
    otherMethod(f1);
    otherMethod(f2);
    

    如图,在上例中,f2先结束,所以我们知道f1结束后,两个线程都会结束。但是实际环境,可能f1与f2的执行时间无法准确的预估,必须要等到两个线程都结束之后才能接着往下执行。

    CompletableFuture

    针对这种问题,JUC推出了CompleteFuture
    CompleteFuture常用方法,方法详情可以查看Java8 CompletableFuture 用法全解

    方法 用途
    supplyAsync(Supplier<U> supplier) 创建一个有返回结果的任务
    runAsync(Runnable runnable) 创建一个没有返回结果的任务
    thenApply(Function<? super T,? extends U> fn) 添加一个新的任务到前一个任务后面,即前一个任务的回调,有返回值
    thenAccept(Consumer<? super T> action)/thenRun(Consumer<? super T> action) 作用与thenApply相同,但是没有返回值
    whenComplete(BiConsumer<? super T,? super Throwable> action) 作用与thenApply相同,但是参数里面存在异常,可以在自定义BiConsumer中处理异常,没有返回结果
    handle(BiFunction<? super T,Throwable,? extends U> fn) 作用与thenApply相同,但是参数里面存在异常,可以在自定义BiConsumer中处理异常,可自定义返回结果
    thenCompose() 用来连接两个有依赖关系的任务,结果由第二个任务返回
    thenCombine() 合并任务,有返回值
    allOf() 当所有的任务都完成后,返回一个新的 CompletableFuture
    anyOf() 当任务一个任务完成后,返回一个新的CompletableFuture
    exceptionally 任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中

    使用场景:调用多个服务接口的场景或者多个任务之间存在依赖关系的场景

    //主线程等待多个程序调用执行完成
    CompletableFuture.allOf(CompletableFuture.supplyAsync(()->{
       //查询订单信息
       orderApi.call();
       return 1;
       }), CompletableFuture.supplyAsync(()->{
       //查询商品信息
       goodsApi.call();
       return 1;
    })).join();
    
    CompletableFuture future = CompletableFuture.supplyAsync(() -> {
        //查询订单信息
        TimeUnit.SECONDS.sleep(50);
         return 1;
    }).thenApply((n1) -> {
        otherMethod(n1);
        return n1 + 2;
    });
    CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
         //查询商品信息
         TimeUnit.SECONDS.sleep(10);
         return 2;
    }).thenApply((n2) -> {
        otherMethod(n2);
        return n2 + 2;
    });
    System.out.println(future.get());
    System.out.println(future2.get());
    

    CompletableService

    如果多个线程同时执行,但是按照线程的结束顺序依次调用某个方法,则可以使用CompleteService

    ExecutorService executorService = Executors.newFixedThreadPool(2);
    CompletionService completionService = new ExecutorCompletionService(executorService);
    completionService.submit(()->{
       return 1;
    });
    completionService.submit(()->{
       return 2;
    });
    for(int i=0;i<2;i++){
       Future<Integer> future = completionService.take();
       otherMethod(future.get());
    }
    

    CompleteService结合了ExecutorService与BlockQueue,当线程执行完后,会将结果放在BlockQueue中,接着程序可以依次从BlockQueue中取出结果
    通过查看源码可以知道大概的执行流程:

    public ExecutorCompletionService(Executor executor) {
       if (executor == null)
           throw new NullPointerException();
       this.executor = executor;
       this.aes = (executor instanceof AbstractExecutorService) ?
           (AbstractExecutorService) executor : null;
       // 创建了一个阻塞队列,用来存储线程执行结果
       this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    ......
    public Future<V> submit(Callable<V> task) {
       if (task == null) throw new NullPointerException();
       RunnableFuture<V> f = newTaskFor(task);
       /**
        *CompleteService的submit()方法,依旧使用的是我们自己传入的executorSerivce对象,所以它的作用和RunnableFuture相同,充当了ExecutorService与BlockQueue的桥梁
         */ 
       executor.execute(new QueueingFuture(f));
       return f;
    }
    

    在submit()方法中,completeService将创建的FutureTask对象包装成了一个QueueingFuture对象,从上面FutureTask的源码可知,callable执行完成后会调用FutureTask的done()方法,这个方法在FutureTask中是空实现,但在QueueingFuture 则是将结果保存到queue中

    private class QueueingFuture extends FutureTask<Void> {
    QueueingFuture(RunnableFuture<V> task) {
       super(task, null);
       this.task = task;
    }
    protected void done() { completionQueue.add(task); }
    private final Future<V> task;
    }
    // FutureTask中线程执行结束运行的方法
    private void finishCompletion() {
       ...
       done(); 
       ...
    }
    

    ForkJoinPool

    ForkJoinPool是一个并行计算框架,可以将大任务分为很多个小任务,然后多线程并行执行多个小任务。它继承自AbstractExecutorService,虽然ExecutorService也可以实现多线程执行,但是它不能实现任务分割并打入队列中。
    ForkJoin主要包含ForkJoinPool 并行线程池、ForkJoinTask并行任务、ForkJoinWorkerThread这三部分。其中如果创建ForkJoinPool 对象不传入数字,默认生成cpu核数的线程。

    ForkJoinPool可以拆分任务的原理在于主要是因为它采用了一种工作窃取(work-stealing)的机制。所有的工作线程都可以尝试窃取提交到池子里的任务来执行,执行中产生的子任务又可以提交到池子中。
    ForkJoinPool中的每个workQueue都维护了一个WorkQueue的任务队列,并且用base、top指向任务队列队尾和队头。work-stealing机制就是工作线程挨个扫描任务队列,如果队列不为空则取队尾的任务并执行。从网上摘到示例图:


    ForkJoinPool工作窃取流程

    ForkJoinTask抽象类提供了三种实现类,分别是:

    • RecursiveTask 有返回值的并行任务
    • RecursiveAction 无返回值的并行任务

    下面是使用ForkJoinPool完成的计算1-10000的累加值:

    class CountTask extends RecursiveTask<Integer> {
       private int min;
       private int max;
    
       CountTask(int min, int max) {
           this.min = min;
           this.max = max;
       }
    
       @Override
       protected Integer compute() {
           int sum = 0;
           if ((max - min) < 20) {
               for (int i = min; i <= max; i++) {
                   sum += i;
               }
           } else {
               int mid = (min + max) / 2;
               CountTask leftTask = new CountTask(min, mid);
               CountTask rightTask = new CountTask(mid + 1, max);
               leftTask.fork();
               rightTask.fork();
               return leftTask.join() + rightTask.join();
           }
           return sum;
       }
    }
    public static void main(String[] args) throws Exception {
        CountTask countTask = new CountTask(0, 10000);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
    
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
        System.out.println(forkJoinTask.get());
        forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
        forkJoinPool.shutdown();
    }
    
    

    我们所熟知的stream并行流就使用了ForkJoinPool,可以按照下面的程序打印出当前的线程名称

    List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7));
    list.parallelStream().forEach(i->{
    System.err.println(Thread.currentThread().getName()+"---"+i);
    });
    

    参照:future介绍
    completablefuture
    completablefuture详解
    forkjoin

    相关文章

      网友评论

          本文标题:Future,FutureTask,CompletableFut

          本文链接:https://www.haomeiwen.com/subject/xfoeddtx.html