Runnable,Callable,FutureTask
我们平时创建一个线程的时候,一般都会采用三种,一种是实现Runnable,一种是继承Thread类,还有就是实现Callable。Runnable与Callable实现方式的不同在于,Callable配合Future接口使用可以获取线程的运行状态,阻塞获取线程的结果。而Runnable方式却不行。
通过查看executor.submit()方法可知,Callable在执行过程中会生成一个RunnableFuture对象,而这个对象的既实现了Runnable接口,又实现了Future接口。
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } ...... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
//FutureTask只是定义了一系列接口,真正完成相应功能的是FutureTask类 public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
而FutureTask实现了RunnableFuture接口,间接实现了Callable与Future接口。该类充当了Callable与Future之间的桥梁,该类在运行结果之后会将结果写入
outcome
属性中。当线程没有执行完成,调用future.get()
方法会将线程包装成WaitNode对象
放入到waitnode
链式结构中,并使用LockSupport.pack()
方法阻塞住,直到callable执行结果或者异常才会结束阻塞。
从这里我们可以看出,result = c.call();
其实是单线程执行的,所以可以获取到结果。public class FutureTask<V> implements RunnableFuture<V> { /** 此处是我们自定义的callable对象,FutureTask会在执行run()方法时调用它的run() */ private Callable<V> callable; /** 执行结果或异常会被保存到这个对象中,此对象没有被volatile修饰,需要使用cas赋值 */ private Object outcome; /** 运行callable任务的线程 */ private volatile Thread runner; /** 等待线程,当调用get()方法时会添加到阻塞链中 */ private volatile WaitNode waiters; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** *此处可以看到executor.submit()方法实际上调用的还是callable的call()。 */ public void run() { // 把当前线程对象赋值给runner if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 单线程执行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) // 设置结果,将结果赋值给outcome set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 如果线程中断,移除阻塞链中的所有节点。 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
所以从FutureTask的源码,我们可以通过FutureTask完成Thread获取结果,或者取消的操作
FutureTask task = new FutureTask(new Callable() { @Override public Object call() throws Exception { System.out.println("通过Callable方式执行任务"); Thread.sleep(3000); return "返回任务结果"; } }); new Thread(task).start();
Future的局限性:
①调用get()方法时,很容易就会造成长时间阻塞。
②不能并发执行或者组合执行多个任务。
例如,如果程序需要调用订单信息和会员信息,使用FutureTask来完成则是:
ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> f1 = service.submit(() -> { //查询订单信息 TimeUnit.SECONDS.sleep(50); return 1; }); Future<Integer> f2 = service.submit(() -> { //查询商品信息 TimeUnit.SECONDS.sleep(10); return 2; }); int n1 = f1.get(); int n2 = f2.get(); // 根据f1返回执行其他操作 otherMethod(f1); otherMethod(f2);
如图,在上例中,f2先结束,所以我们知道f1结束后,两个线程都会结束。但是实际环境,可能f1与f2的执行时间无法准确的预估,必须要等到两个线程都结束之后才能接着往下执行。
CompletableFuture
针对这种问题,JUC推出了CompleteFuture
CompleteFuture常用方法,方法详情可以查看Java8 CompletableFuture 用法全解
方法 | 用途 |
---|---|
supplyAsync(Supplier<U> supplier) | 创建一个有返回结果的任务 |
runAsync(Runnable runnable) | 创建一个没有返回结果的任务 |
thenApply(Function<? super T,? extends U> fn) | 添加一个新的任务到前一个任务后面,即前一个任务的回调,有返回值 |
thenAccept(Consumer<? super T> action)/thenRun(Consumer<? super T> action) | 作用与thenApply相同,但是没有返回值 |
whenComplete(BiConsumer<? super T,? super Throwable> action) | 作用与thenApply相同,但是参数里面存在异常,可以在自定义BiConsumer中处理异常,没有返回结果 |
handle(BiFunction<? super T,Throwable,? extends U> fn) | 作用与thenApply相同,但是参数里面存在异常,可以在自定义BiConsumer中处理异常,可自定义返回结果 |
thenCompose() | 用来连接两个有依赖关系的任务,结果由第二个任务返回 |
thenCombine() | 合并任务,有返回值 |
allOf() | 当所有的任务都完成后,返回一个新的 CompletableFuture |
anyOf() | 当任务一个任务完成后,返回一个新的CompletableFuture |
exceptionally | 任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中 |
使用场景:调用多个服务接口的场景或者多个任务之间存在依赖关系的场景
//主线程等待多个程序调用执行完成 CompletableFuture.allOf(CompletableFuture.supplyAsync(()->{ //查询订单信息 orderApi.call(); return 1; }), CompletableFuture.supplyAsync(()->{ //查询商品信息 goodsApi.call(); return 1; })).join();
CompletableFuture future = CompletableFuture.supplyAsync(() -> { //查询订单信息 TimeUnit.SECONDS.sleep(50); return 1; }).thenApply((n1) -> { otherMethod(n1); return n1 + 2; }); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { //查询商品信息 TimeUnit.SECONDS.sleep(10); return 2; }).thenApply((n2) -> { otherMethod(n2); return n2 + 2; }); System.out.println(future.get()); System.out.println(future2.get());
CompletableService
如果多个线程同时执行,但是按照线程的结束顺序依次调用某个方法,则可以使用CompleteService
ExecutorService executorService = Executors.newFixedThreadPool(2); CompletionService completionService = new ExecutorCompletionService(executorService); completionService.submit(()->{ return 1; }); completionService.submit(()->{ return 2; }); for(int i=0;i<2;i++){ Future<Integer> future = completionService.take(); otherMethod(future.get()); }
CompleteService结合了ExecutorService与BlockQueue,当线程执行完后,会将结果放在BlockQueue中,接着程序可以依次从BlockQueue中取出结果
通过查看源码可以知道大概的执行流程:
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; // 创建了一个阻塞队列,用来存储线程执行结果 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); } ...... public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); /** *CompleteService的submit()方法,依旧使用的是我们自己传入的executorSerivce对象,所以它的作用和RunnableFuture相同,充当了ExecutorService与BlockQueue的桥梁 */ executor.execute(new QueueingFuture(f)); return f; }
在submit()方法中,completeService将创建的FutureTask对象包装成了一个QueueingFuture对象,从上面FutureTask的源码可知,callable执行完成后会调用FutureTask的done()方法,这个方法在FutureTask中是空实现,但在QueueingFuture 则是将结果保存到queue中
private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future<V> task; } // FutureTask中线程执行结束运行的方法 private void finishCompletion() { ... done(); ... }
ForkJoinPool
ForkJoinPool是一个并行计算框架,可以将大任务分为很多个小任务,然后多线程并行执行多个小任务。它继承自AbstractExecutorService,虽然ExecutorService也可以实现多线程执行,但是它不能实现任务分割并打入队列中。
ForkJoin主要包含ForkJoinPool 并行线程池、ForkJoinTask并行任务、ForkJoinWorkerThread这三部分。其中如果创建ForkJoinPool 对象不传入数字,默认生成cpu核数的线程。
ForkJoinPool可以拆分任务的原理在于主要是因为它采用了一种工作窃取(work-stealing)的机制。所有的工作线程都可以尝试窃取提交到池子里的任务来执行,执行中产生的子任务又可以提交到池子中。
ForkJoinPool中的每个workQueue都维护了一个WorkQueue的任务队列,并且用base、top指向任务队列队尾和队头。work-stealing机制就是工作线程挨个扫描任务队列,如果队列不为空则取队尾的任务并执行。从网上摘到示例图:
ForkJoinPool工作窃取流程
ForkJoinTask抽象类提供了三种实现类,分别是:
- RecursiveTask 有返回值的并行任务
- RecursiveAction 无返回值的并行任务
下面是使用ForkJoinPool完成的计算1-10000的累加值:
class CountTask extends RecursiveTask<Integer> { private int min; private int max; CountTask(int min, int max) { this.min = min; this.max = max; } @Override protected Integer compute() { int sum = 0; if ((max - min) < 20) { for (int i = min; i <= max; i++) { sum += i; } } else { int mid = (min + max) / 2; CountTask leftTask = new CountTask(min, mid); CountTask rightTask = new CountTask(mid + 1, max); leftTask.fork(); rightTask.fork(); return leftTask.join() + rightTask.join(); } return sum; } } public static void main(String[] args) throws Exception { CountTask countTask = new CountTask(0, 10000); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(countTask); System.out.println(forkJoinTask.get()); forkJoinPool.awaitTermination(2, TimeUnit.SECONDS); forkJoinPool.shutdown(); }
我们所熟知的stream并行流就使用了ForkJoinPool,可以按照下面的程序打印出当前的线程名称
List<Integer> list = new ArrayList<>(Arrays.asList(1,2,3,4,5,6,7)); list.parallelStream().forEach(i->{ System.err.println(Thread.currentThread().getName()+"---"+i); });
网友评论