构造同步和异步操作
// 定义线程池。第一个参数是线程池的大小,第2个参数是线程工厂
Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() {
// 线程工厂 ThreadFactory.newThread返回线程变量,参数Runnable定义了线程的业务逻辑
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
List<CompletableFuture<String>> priceFutures = shops.stream()
// 异步方式调用shop.getPrice()方法,第二个参数executor是线程池
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(), executor))
// thenApply 同步调用上一个流的结果,因为Quote::parse是本地方法耗时低,函数参数是个同步方法
.map(future -> future.thenApply(Quote::parse))
// thenCompose 异步调用上一个流的结果,参数函数要异步包装
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(
() -> Shop.applyDiscount(quote), executor)))
.collect(toList());
合并两个独立线程的任务
// 线程2的任务输入不依赖于线程1的任务输出,可以线程1执行的同时,线程2不用等待直接执行
CompletableFuture.supplyAsync(() -> shop.task1(xxx))
.thenCombine(
CompletableFuture.supplyAsync(
() -> service.task2(xxx)),
// 线程1和线程2的输出在第二个参数执行
(result1, result2) -> ...
);
响应事件
CompletableFuture[] futures =
// thenApply方法不会等待上一个流所有结果都出来才开始处理,而是只要有一个结果出来就开始处理
(CompletableFuture[])findPrices2(shops).map(f -> f.thenAccept(
s -> System.out.println(s + "done in " +
((System.nanoTime() - start) / 1_000_000))))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("end");
CompletableFuture.allOf(futures).join(); // 等待所有结果出来才往下走
CompletableFuture.anyOf(futures).join(); // 只要有一个结果出来就往下走
网友评论