1. The CompletableFuture class
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
}
1.1 CompletableFuture factory methods
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
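Both overloads accept a custom Executor on which the asynchronous task runs. When the Executor argument is omitted, the task normally runs on ForkJoinPool.commonPool().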
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
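allOf() returns a new CompletableFuture<Void> that completes once every given future has completed. It does not start the tasks itself; it is mainly used to wait for several asynchronous tasks that are already running in parallel.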
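A minimal sketch of how the three factory methods fit together, assuming a small fixed thread pool. The class name FactoryMethodsDemo and the helper runAll() are illustrative and not part of the original notes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class FactoryMethodsDemo {

    // Hypothetical helper: starts two tasks on a custom pool and waits for both.
    static String runAll() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            AtomicInteger counter = new AtomicInteger();
            // supplyAsync: the task produces a value
            CompletableFuture<String> supplied =
                    CompletableFuture.supplyAsync(() -> "value", pool);
            // runAsync: the task produces no value, only a side effect
            CompletableFuture<Void> ran =
                    CompletableFuture.runAsync(counter::incrementAndGet, pool);
            // allOf: completes once both futures have completed
            CompletableFuture.allOf(supplied, ran).join();
            return supplied.join() + ":" + counter.get();
        } finally {
            // A custom pool is not shut down automatically
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

Note that allOf() itself carries no result, so the values are read from the individual futures after it completes.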
1.2 The CompletionStage interface
public interface CompletionStage<T> {
// Callbacks invoked when this stage completes
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenRun(Runnable action);
public <U> CompletionStage<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn);
// Combining the results of two stages
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
}
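The methods listed in the interface can be hard to tell apart from signatures alone. The following sketch contrasts thenApply, thenCompose, and thenCombine on CompletableFuture, the standard implementation of CompletionStage. The class and method names (StageChainDemo, applyDemo, and so on) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class StageChainDemo {

    // thenApply: transform the result with a plain function
    static int applyDemo() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(String::length)
                .join();
    }

    // thenCompose: the function returns another stage, and the result is flattened
    static String composeDemo() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"))
                .join();
    }

    // thenCombine: merge the results of two independent stages
    static int combineDemo() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 3);
        return a.thenCombine(b, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(applyDemo());
        System.out.println(composeDemo());
        System.out.println(combineDemo());
    }
}
```

A rough rule of thumb: use thenApply when the next step is synchronous, thenCompose when the next step is itself asynchronous, and thenCombine when two stages can run independently and only their results need to be merged.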
2. Asynchronous computation with supplyAsync()
2.1 The supplyAsync() method
@Test
public void demo() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
try {
// Block until the future completes
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private final ExecutorService executor = new ThreadPoolExecutor(5, 20, 0L,
TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10));
@Test
public void joinDemo() {
// Run on the custom executor; join() blocks like get() but declares no checked exception
CompletableFuture<Long> future = CompletableFuture.supplyAsync(System::currentTimeMillis, executor);
Long time = future.join();
}
2.2 Example
@Test
public void demo() throws Exception {
ArrayList<String> strings = Lists.newArrayList("111", "222");
List<CompletableFuture<String>> futures = strings.stream()
.map(str -> CompletableFuture.supplyAsync(() -> str))
.collect(Collectors.toList());
List<String> collect = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
log.info("result:" + collect);
}
2.3 The thenAccept() callback
@Test
public void demo() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "Hello";
});
// Register a callback that prints the result once the asynchronous task completes
future.thenAccept(result -> {
System.out.println(result);
});
}
thenAccept() registers a callback: when the CompletableFuture completes, the callback receives the result string and prints it.
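thenAccept() itself returns a CompletableFuture<Void> that completes after the callback has run, so a caller that needs the callback's side effect can wait on that returned future. A minimal sketch, assuming a thread-safe list as the side-effect target; ThenAcceptDemo and collect() are illustrative names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class ThenAcceptDemo {

    // Hypothetical helper: records the result in a list from inside the callback.
    static List<String> collect() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> "Hello")
                // The callback consumes the result and returns nothing
                .thenAccept(seen::add);
        // Wait for the callback itself, not only for the supplier
        done.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```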
2.4 Simulating a long-running operation
@Test
public void thenAccept() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("supplyAsync: Hello");
return "Hello";
});
log.info("start");
// Callback
future.thenAccept(str -> log.info(str));
log.info("end");
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
3. Asynchronous tasks with runAsync()
@Test
public void runAsync() {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
log.info("runAsync start...");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("runAsync end...");
});
log.info("callback1...");
// Callback
CompletableFuture<Void> callbackFuture = future.thenRun(() -> {
log.info("thenRun start...");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("thenRun end...");
});
log.info("callback2...");
// future.join();
callbackFuture.join();
}
Output:
09:34:34.881 [main] INFO com.juc.pool.Demo - callback1...
09:34:34.881 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync start...
09:34:34.885 [main] INFO com.juc.pool.Demo - callback2...
09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - runAsync end...
09:34:37.888 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun start...
09:34:39.890 [ForkJoinPool.commonPool-worker-1] INFO com.juc.pool.Demo - thenRun end...
4. allOf()
4.1 Waiting for all tasks
@Test
public void demo() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
log.info("future1 start");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("future1 end");
return "future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
log.info("future2 start");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("future2 end");
return "future2";
});
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2);
CompletableFuture<Void> thenFuture = allFuture.thenAccept(unused -> {
String join1 = future1.join();
String join2 = future2.join();
System.out.println(join1 + ", " + join2);
});
log.info("end....");
// Block until the callback has finished
// thenFuture.join();
try {
thenFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
4.2 Calling join() directly instead
CompletableFuture.allOf(future1, future2).join();
String join1 = future1.join();
String join2 = future2.join();
System.out.println(join1 + ", " + join2);
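Question: how do
CompletableFuture.allOf().join()
and CompletableFuture.allOf().thenAccept().join()
differ, and what do they have in common?
When blocking happens:
Calling join() directly blocks the calling thread immediately, until all tasks have completed;
thenAccept().join() registers the callback first, the callback runs asynchronously once all tasks have completed, and the final join() blocks until the callback has finished.
Result handling:
Calling join() directly involves no result handling, because allOf() yields a CompletableFuture<Void>; the results are read from the individual futures afterwards;
thenAccept().join() runs the callback that processes the results once all tasks have completed.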
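The callback-based pattern can be generalized into a helper that turns a list of futures into a single future of a list. This is a sketch under assumptions: allAsList() is a hypothetical helper, not a JDK method, and AllOfCollectDemo is an illustrative class name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class AllOfCollectDemo {

    // Hypothetical helper: completes with all results once every future is done.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // Runs only after every future has completed, so join() returns at once here
                .thenApply(unused -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "future1"),
                CompletableFuture.supplyAsync(() -> "future2"));
        // The only blocking call is this final join()
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The results keep the order of the input list, regardless of which task finishes first, because they are read from the futures in list order.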
5. join() vs get()
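Similarities:
Both block until the asynchronous operation wrapped by the CompletableFuture completes, and then return its result.
Differences:
Exception handling: get() declares the checked exceptions InterruptedException and ExecutionException, which the caller must catch or declare; join() throws the unchecked CompletionException when the task fails.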
public T get() throws InterruptedException, ExecutionException {
Object r;
return reportGet((r = result) == null ? waitingGet(true) : r);
}
public T join() {
Object r;
return reportJoin((r = result) == null ? waitingGet(false) : r);
}
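The difference in exception handling is easiest to see with a task that fails. In the sketch below, failing() is a hypothetical helper whose supplier always throws; JoinVsGetDemo, viaGet(), and viaJoin() are illustrative names as well.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinVsGetDemo {

    // Hypothetical helper: a future whose task always fails
    static CompletableFuture<String> failing() {
        return CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get(): the compiler forces both checked exceptions to be handled
    static String viaGet() {
        try {
            return failing().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // join(): the failure arrives as an unchecked CompletionException; the catch is optional
    static String viaJoin() {
        try {
            return failing().join();
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(viaGet());
        System.out.println(viaJoin());
    }
}
```

In both cases the original exception is available through getCause(). Because join() needs no try/catch, it fits more naturally inside lambdas and stream pipelines such as map(CompletableFuture::join).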
6. Computation
@Test
public void calculateDemo() {
List<Integer> values = new ArrayList<>();
for (int i = 1; i < 1000001; i += 5000) {
values.add(i);
}
List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
for (Integer value : values) {
futures.add(CompletableFuture.supplyAsync(() -> getList(value)));
}
// List<List<Integer>> collect = futures.stream().map(CompletableFuture::join)
// .collect(Collectors.toList());
List<Integer> list = futures.stream().map(CompletableFuture::join)
.flatMap(Collection::stream)
.collect(Collectors.toList());
System.out.println("collect.size: " + list.size());
}
public static List<Integer> getList(Integer value) {
List<Integer> values = new ArrayList<>();
for (int i = 0; i < 5000; i++) {
values.add(i + value);
}
return values;
}