CompletableFuture常见用法,CompletableFuture使用示例,CompletableFuture等待所有异步任务返回,包括常见的用法,结合自定义线程池使用。结合实际业务Demo代码运行效果如下,可直接运行查看效果,具体请参考如下代码
1.创建CompletableFuture对象
方法名 功能描述
completedFuture(U value) 返回一个已经计算好的CompletableFuture
runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值
runAsync(Runnable runnable, Executor executor) 使用指定的线程池执行任务,没有返回值
supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值
supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的线程池执行任务,有返回值
各种使用示例如下(可直接复制运行):
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建CompletableFuture对象
// 方法名 功能描述
// completedFuture(U value) 返回一个已经计算好的CompletableFuture
// runAsync(Runnable runnable) 使用ForkJoinPool.commonPool()作为线程池执行任务,没有返回值
// runAsync(Runnable runnable, Executor executor) 使用指定的线程池执行任务,没有返回值
// supplyAsync(Supplier<U> supplier) 使用ForkJoinPool.commonPool()作为线程池执行任务,有返回值
// supplyAsync(Supplier<U> supplier, Executor executor) 使用指定的线程池执行任务,有返回值
CompletableFuture<Integer> intFuture = CompletableFuture.completedFuture(100);
// 100
System.out.println(intFuture.get());
CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> System.out.println("hello"));
// null
System.out.println(voidFuture.get());
CompletableFuture<String> stringFuture = CompletableFuture.supplyAsync(() -> "hello");
// hello
System.out.println("stringFuture="+stringFuture.get());
// 2.计算结果完成时.可以执行的方法
// 方法名
// whenComplete(BiConsumer<? super T,? super Throwable> action)
// whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
// whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "hello";
}).whenComplete((v, e) -> {
// hello
System.out.println(v);
});
// hello
System.out.println(future.get());
// 3.转换,消费,执行
// 方法名 功能描述
// thenApply 获取上一个任务的返回,并返回当前任务的值
// thenAccept 获取上一个任务的返回,单纯消费,没有返回值
// thenRun 上一个任务执行完成后,开始执行thenRun中的任务
CompletableFuture.supplyAsync(() -> {
return "hello ";
}).thenAccept(str -> {
// hello world
System.out.println(str + "world");
}).thenRun(() -> {
// task finish
System.out.println("task finish");
});
// 4.组合(两个任务都完成)
// 方法名 描述
// thenCombine 组合两个future,获取两个future的返回结果,并返回当前任务的返回值
// thenAcceptBoth 组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
// runAfterBoth 组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> {
return "欢迎关注 ";
}).thenApply(t -> {
return t + "微信公众号 ";
}).thenCombine(CompletableFuture.completedFuture("Java"), (t, u) -> {
return t + u;
}).whenComplete((t, e) -> {
// 欢迎关注 微信公众号 Java
System.out.println(t);
});
// 5.组合(只需要一个任务完成)
// 方法名 描述
// applyToEither 两个任务有一个执行完成,获取它的返回值,处理任务并返回当前任务的返回值
// acceptEither 两个任务有一个执行完成,获取它的返回值,处理任务,没有返回值
// runAfterEither 两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "欢迎关注微信公众号";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Java";
});
CompletableFuture<String> future3 = future1.applyToEither(future2, str -> str);
// 欢迎关注微信公众号 Java 随机输出
System.out.println(future3.get());
// 6.多任务组合
// 方法名 描述
// allOf 当所有的CompletableFuture完成后执行计算
// anyOf 任意一个CompletableFuture完成后执行计算
// allOf的使用
CompletableFuture<String> future1A = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "欢迎关注";
});
CompletableFuture<String> future2A = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公众号";
});
CompletableFuture<String> future3A = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java识堂";
});
// 欢迎关注 微信公众号 Java
CompletableFuture.allOf(future1A, future2A, future3A)
.thenApply(v -> {
List<Object> collect = Stream.of(future1, future2, future3A)
.map(CompletableFuture::join)
.collect(Collectors.toList());
return collect;
}
)
.thenAccept(System.out::print);
// anyOf的使用
CompletableFuture<String> future1C = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "欢迎关注";
});
CompletableFuture<String> future2C = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "微信公众号";
});
CompletableFuture<String> future3C = CompletableFuture.supplyAsync(() -> {
sleepRandom();
return "Java";
});
CompletableFuture<Object> resultFuture = CompletableFuture.anyOf(future1C, future2C, future3C);
// 欢迎关注 微信公众号 Java 随机输出
System.out.println(resultFuture.get());
// 7.异常处理
// exceptionally 捕获异常,进行处理
CompletableFuture<Integer> futureC = CompletableFuture.supplyAsync(() -> {
return 100 / 0;
}).thenApply(num -> {
return num + 10;
}).exceptionally(throwable -> {
return 0;
});
// 0
System.out.println(future.get());
// 当然有一些接口能捕获异常
CompletableFuture futureAAA = CompletableFuture.supplyAsync(() -> {
String str = null;
return str.length();
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("正常结果为" + v);
} else {
// 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
System.out.println("发生异常了" + e.toString());
}
});
// 8.集合业务使用示例,假设stringList为业务执行集合
ExecutorService executor = new ThreadPoolExecutor(10, 16, 10, TimeUnit.MICROSECONDS, new ArrayBlockingQueue<>(1000));
List<String> stringList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
stringList.add("a" + i);
}
CompletableFuture<Void> all = null;
CompletableFuture<Void> all1 = null;
// 开始我们的业务处理
for (String personName : stringList) {
CompletableFuture<Object> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {
return null;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("正常结果为" + v);
} else {
// 发生异常了java.util.concurrent.CompletionException: java.lang.NullPointerException
System.out.println("发生异常了" + e.toString());
}
});
all = CompletableFuture.allOf(stringCompletableFuture);
CompletableFuture<String> stringCompletableFuture1 = CompletableFuture.supplyAsync(() -> {
// 模拟业务逻辑,say hello world
System.out.println(personName + ": Hello World!");
return "task finished!";
});
all1 = CompletableFuture.allOf(stringCompletableFuture1);
}
// 开始等待所有任务执行完成
all.join();
all1.join();
// 使用JDK 1.8的特性,stream()和Lambda表达式: (参数) -> {表达式}
long start = System.currentTimeMillis();
if (CollectionUtils.isEmpty(stringList)) {
return;
}
final CompletableFuture[] completableFutures = stringList.stream().
map(t -> CompletableFuture
.supplyAsync(() -> pause(t), executor)
.whenComplete((result, th) -> {
System.out.println("hello" + result);
})).toArray(CompletableFuture[]::new);
// 开始等待所有任务执行完成
System.out.println("start block");
CompletableFuture.allOf(completableFutures).join();
System.out.println("block finish, consume time:" + (System.currentTimeMillis() - start));
stringList.forEach(name -> CompletableFuture.supplyAsync(() -> {
// 封装了业务逻辑
System.out.println("name = " + name);
return "success";
}).exceptionally(e -> {
System.out.println(e);
return "false";
}));
// 关闭线程池
executor.shutdown();
}
private static void sleepRandom() {
System.out.println("测试方法");
}
public static String pause(String name) {
try {
Thread.sleep(300);
} catch (Exception e) {
e.printStackTrace();
}
return name;
}
}
简约写法CompletableFuture执行任务并返回
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* @author lisanwei24282
*/
public class CompletableFutureProblem {
public void batchOperation() {
/* 集合任务,统一处理 */
List<String> stringList = new ArrayList<>();
stringList.add("task1");
stringList.add("task2");
/* 任务提交汇总 */
List<CompletableFuture<String>> futures = new ArrayList<>();
stringList.parallelStream().forEach(str -> {
/* 调用业务方法 */
CompletableFuture<String> response = restApiCall(str);
futures.add(response);
});
/* 等待所有返回 */
CompletableFuture<Void> result = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
CompletableFuture<List<String>> convertedResult = result.thenApply(v ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
/* 获取返回数据结果 */
try {
List<String> finishedTask = convertedResult.get();
System.out.println(finishedTask.toString());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
/* 假设执行具体业务逻辑功能 */
public CompletableFuture<String> restApiCall(String str) {
return CompletableFuture.supplyAsync(() -> {
return "Complete-" + str;
});
}
public static void main(String[] args) {
CompletableFutureProblem problem = new CompletableFutureProblem();
problem.batchOperation();
}
}
网友评论