美文网首页
Future,FutureTask,CompletableFut

Future,FutureTask,CompletableFut

作者: 会跳的八爪鱼 | 来源:发表于2023-05-15 21:27 被阅读0次

Runnable,Callable,FutureTask

我们平时创建一个线程的时候,一般都会采用三种,一种是实现Runnable,一种是继承Thread类,还有就是实现Callable。Runnable与Callable实现方式的不同在于,Callable配合Future接口使用可以获取线程的运行状态,阻塞获取线程的结果。而Runnable方式却不行。
通过查看executor.submit()方法可知,Callable在执行过程中会生成一个RunnableFuture对象,而这个对象的既实现了Runnable接口,又实现了Future接口。

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}
......
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
   return new FutureTask<T>(callable);
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
   void run();
}
//FutureTask只是定义了一系列接口,真正完成相应功能的是FutureTask类
public interface Future<V> {
   boolean cancel(boolean mayInterruptIfRunning);
   boolean isCancelled();
   boolean isDone();
   V get() throws InterruptedException, ExecutionException;
   V get(long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;
}

而FutureTask实现了RunnableFuture接口,间接实现了Callable与Future接口。该类充当了Callable与Future之间的桥梁,该类在运行结果之后会将结果写入outcome属性中。当线程没有执行完成,调用 future.get()方法会将线程包装成WaitNode对象放入到waitnode链式结构中,并使用LockSupport.pack()方法阻塞住,直到callable执行结果或者异常才会结束阻塞。
从这里我们可以看出,result = c.call();其实是单线程执行的,所以可以获取到结果。

public class FutureTask<V> implements RunnableFuture<V> {

   /** 此处是我们自定义的callable对象,FutureTask会在执行run()方法时调用它的run() */
   private Callable<V> callable;
   /** 执行结果或异常会被保存到这个对象中,此对象没有被volatile修饰,需要使用cas赋值 */
   private Object outcome;
   /** 运行callable任务的线程 */
   private volatile Thread runner;
   /** 等待线程,当调用get()方法时会添加到阻塞链中 */
   private volatile WaitNode waiters;

   public FutureTask(Callable<V> callable) {
       if (callable == null)
           throw new NullPointerException();
       this.callable = callable;
       this.state = NEW;       // ensure visibility of callable
   }
  /**
   *此处可以看到executor.submit()方法实际上调用的还是callable的call()。
   */
   public void run() {
       // 把当前线程对象赋值给runner
       if (state != NEW ||
           !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                        null, Thread.currentThread()))
           return;
       try {
           Callable<V> c = callable;
           if (c != null && state == NEW) {
               V result;
               boolean ran;
               try {
                   // 单线程执行
                   result = c.call();
                   ran = true;
               } catch (Throwable ex) {
                   result = null;
                   ran = false;
                   setException(ex);
               }
               if (ran)
                   // 设置结果,将结果赋值给outcome
                   set(result);
            }
       } finally {
           runner = null;
           int s = state;
           if (s >= INTERRUPTING)
               handlePossibleCancellationInterrupt(s);
       }
   }

   public V get() throws InterruptedException, ExecutionException {
       int s = state;
       if (s <= COMPLETING)
           s = awaitDone(false, 0L);
       return report(s);
   }
   private int awaitDone(boolean timed, long nanos)
       throws InterruptedException {
       final long deadline = timed ? System.nanoTime() + nanos : 0L;
       WaitNode q = null;
       boolean queued = false;
       for (;;) {
           // 如果线程中断,移除阻塞链中的所有节点。
           if (Thread.interrupted()) {
               removeWaiter(q);
               throw new InterruptedException();
           }

           int s = state;
           if (s > COMPLETING) {
               if (q != null)
                   q.thread = null;
               return s;
           }
           else if (s == COMPLETING) // cannot time out yet
               Thread.yield();
           else if (q == null)
               q = new WaitNode();
           else if (!queued)
               queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                    q.next = waiters, q);
           else if (timed) {
               nanos = deadline - System.nanoTime();
               if (nanos <= 0L) {
                   removeWaiter(q);
                   return state;
               }
               LockSupport.parkNanos(this, nanos);
           }
           else
               LockSupport.park(this);
       }
   }

所以从FutureTask的源码,我们可以通过FutureTask完成Thread获取结果,或者取消的操作

FutureTask task = new FutureTask(new Callable() {
   @Override
   public Object call() throws Exception {
       System.out.println("通过Callable方式执行任务");
       Thread.sleep(3000);
       return "返回任务结果";
   }
});
new Thread(task).start();

Future的局限性:
①调用get()方法时,很容易就会造成长时间阻塞。
②不能并发执行或者组合执行多个任务。
例如,如果程序需要调用订单信息和会员信息,使用FutureTask来完成则是:

ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> f1 = service.submit(() -> {
    //查询订单信息
    TimeUnit.SECONDS.sleep(50);
    return 1;
});
Future<Integer> f2 = service.submit(() -> {
     //查询商品信息
     TimeUnit.SECONDS.sleep(10);
     return 2;
});
int n1 = f1.get();
int n2 = f2.get();
// 根据f1返回执行其他操作
otherMethod(f1);
otherMethod(f2);

如图,在上例中,f2先结束,所以我们知道f1结束后,两个线程都会结束。但是实际环境,可能f1与f2的执行时间无法准确的预估,必须要等到两个线程都结束之后才能接着往下执行。

CompletableFuture

针对这种问题,JUC推出了CompleteFuture
CompleteFuture常用方法,方法详情可以查看Java8 CompletableFuture 用法全解

方法 用途
supplyAsync(Supplier<U> supplier) 创建一个有返回结果的任务
runAsync(Runnable runnable) 创建一个没有返回结果的任务
thenApply(Function<? super T,? extends U> fn) 添加一个新的任务到前一个任务后面,即前一个任务的回调,有返回值
thenAccept(Consumer<? super T> action)/thenRun(Consumer<? super T> action) 作用与thenApply相同,但是没有返回值
whenComplete(BiConsumer<? super T,? super Throwable> action) 作用与thenApply相同,但是参数里面存在异常,可以在自定义BiConsumer中处理异常,没有返回结果
handle(BiFunction<? super T,Throwable,? extends U> fn) 作用与thenApply相同,但是参数里面存在异常,可以在自定义BiConsumer中处理异常,可自定义返回结果
thenCompose() 用来连接两个有依赖关系的任务,结果由第二个任务返回
thenCombine() 合并任务,有返回值
allOf() 当所有的任务都完成后,返回一个新的 CompletableFuture
anyOf() 当任务一个任务完成后,返回一个新的CompletableFuture
exceptionally 任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中

使用场景:调用多个服务接口的场景或者多个任务之间存在依赖关系的场景

//主线程等待多个程序调用执行完成
CompletableFuture.allOf(CompletableFuture.supplyAsync(()->{
   //查询订单信息
   orderApi.call();
   return 1;
   }), CompletableFuture.supplyAsync(()->{
   //查询商品信息
   goodsApi.call();
   return 1;
})).join();
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
    //查询订单信息
    TimeUnit.SECONDS.sleep(50);
     return 1;
}).thenApply((n1) -> {
    otherMethod(n1);
    return n1 + 2;
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
     //查询商品信息
     TimeUnit.SECONDS.sleep(10);
     return 2;
}).thenApply((n2) -> {
    otherMethod(n2);
    return n2 + 2;
});
System.out.println(future.get());
System.out.println(future2.get());

CompletableService

如果多个线程同时执行,但是按照线程的结束顺序依次调用某个方法,则可以使用CompleteService

ExecutorService executorService = Executors.newFixedThreadPool(2);
CompletionService completionService = new ExecutorCompletionService(executorService);
completionService.submit(()->{
   return 1;
});
completionService.submit(()->{
   return 2;
});
for(int i=0;i<2;i++){
   Future<Integer> future = completionService.take();
   otherMethod(future.get());
}

CompleteService结合了ExecutorService与BlockQueue,当线程执行完后,会将结果放在BlockQueue中,接着程序可以依次从BlockQueue中取出结果
通过查看源码可以知道大概的执行流程:

public ExecutorCompletionService(Executor executor) {
   if (executor == null)
       throw new NullPointerException();
   this.executor = executor;
   this.aes = (executor instanceof AbstractExecutorService) ?
       (AbstractExecutorService) executor : null;
   // 创建了一个阻塞队列,用来存储线程执行结果
   this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
......
public Future<V> submit(Callable<V> task) {
   if (task == null) throw new NullPointerException();
   RunnableFuture<V> f = newTaskFor(task);
   /**
    *CompleteService的submit()方法,依旧使用的是我们自己传入的executorSerivce对象,所以它的作用和RunnableFuture相同,充当了ExecutorService与BlockQueue的桥梁
     */ 
   executor.execute(new QueueingFuture(f));
   return f;
}

在submit()方法中,completeService将创建的FutureTask对象包装成了一个QueueingFuture对象,从上面FutureTask的源码可知,callable执行完成后会调用FutureTask的done()方法,这个方法在FutureTask中是空实现,但在QueueingFuture 则是将结果保存到queue中

private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
   super(task, null);
   this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
// FutureTask中线程执行结束运行的方法
private void finishCompletion() {
   ...
   done(); 
   ...
}

ForkJoinPool

ForkJoinPool是一个并行计算框架,可以将大任务分为很多个小任务,然后多线程并行执行多个小任务。它继承自AbstractExecutorService,虽然ExecutorService也可以实现多线程执行,但是它不能实现任务分割并打入队列中。
ForkJoin主要包含ForkJoinPool 并行线程池、ForkJoinTask并行任务、ForkJoinWorkerThread这三部分。其中如果创建ForkJoinPool 对象不传入数字,默认生成cpu核数的线程。

ForkJoinPool可以拆分任务的原理在于主要是因为它采用了一种工作窃取(work-stealing)的机制。所有的工作线程都可以尝试窃取提交到池子里的任务来执行,执行中产生的子任务又可以提交到池子中。
ForkJoinPool中的每个workQueue都维护了一个WorkQueue的任务队列,并且用base、top指向任务队列队尾和队头。work-stealing机制就是工作线程挨个扫描任务队列,如果队列不为空则取队尾的任务并执行。从网上摘到示例图:


ForkJoinPool工作窃取流程

ForkJoinTask抽象类提供了三种实现类,分别是:

  • RecursiveTask 有返回值的并行任务
  • RecursiveAction 无返回值的并行任务

下面是使用ForkJoinPool完成的计算1-10000的累加值:

class CountTask extends RecursiveTask<Integer> {
   private int min;
   private int max;

   CountTask(int min, int max) {
       this.min = min;
       this.max = max;
   }

   @Override
   protected Integer compute() {
       int sum = 0;
       if ((max - min) < 20) {
           for (int i = min; i <= max; i++) {
               sum += i;
           }
       } else {
           int mid = (min + max) / 2;
           CountTask leftTask = new CountTask(min, mid);
           CountTask rightTask = new CountTask(mid + 1, max);
           leftTask.fork();
           rightTask.fork();
           return leftTask.join() + rightTask.join();
       }
       return sum;
   }
}
public static void main(String[] args) throws Exception {
    CountTask countTask = new CountTask(0, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();

    ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask);
    System.out.println(forkJoinTask.get());
    forkJoinPool.awaitTermination(2, TimeUnit.SECONDS);
    forkJoinPool.shutdown();
}

我们所熟知的stream并行流就使用了ForkJoinPool,可以按照下面的程序打印出当前的线程名称

List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7));
list.parallelStream().forEach(i->{
System.err.println(Thread.currentThread().getName()+"---"+i);
});

参照:future介绍
completablefuture
completablefuture详解
forkjoin

相关文章

网友评论

      本文标题:Future,FutureTask,CompletableFut

      本文链接:https://www.haomeiwen.com/subject/xfoeddtx.html