场景:
假设这种场景:要同时处理多条数据,而且不阻塞调用方,且所有的结果处理完之后,再执行指定的其他的处理。不阻塞我们可以采用异步处理即可,但是这里面还有就是全部执行完之后还有后处理,而且是所有的流都执行完之后。那么这里completablefuture中有一个这样的函数allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
这个就是用于多个流的合并,但是真正将List<CompletableFuture>转变成一个CompletableFuture还需要再尽心封装一层:
/**
* 全流式处理的融合
* @param futures
* @param <T>
* @return
*/
public <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
return allDoneFuture.thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.<T>toList()));
}
就可以愉快的使用了
测试如下:
**
* 多数据流的异步操作
* 也用于多流输入的情况下的合并,这样可以将多流进行合并的输入
*/
@Test
public void allOf(){
CompletableFuture c1 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
show("111");
return "111";
});
CompletableFuture c2 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
show("222");
return "222";
});
CompletableFuture c3 = CompletableFuture.supplyAsync(()->{
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
show("333");
return "333";
});
CompletableFuture c4 = sequence(Arrays.asList(c1, c2, c3));
c4.thenAccept(System.out::println);
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
网友评论