直接上代码
/**
 * Runs the four processing variants ({@code test1} .. {@code test4}) one after another on the
 * same five-element list and prints the elapsed wall-clock time of each call in milliseconds.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // Typed list: the test methods all take List<String>, so avoid the raw type.
    List<String> l = Arrays.asList("1", "2", "3", "5", "4");

    // Primitive long avoids needless boxing/unboxing of the timestamps.
    long start1 = System.currentTimeMillis();
    test1(l);
    System.out.println(System.currentTimeMillis() - start1);

    long start2 = System.currentTimeMillis();
    test2(l);
    System.out.println(System.currentTimeMillis() - start2);

    long start3 = System.currentTimeMillis();
    test3(l);
    System.out.println(System.currentTimeMillis() - start3);

    long start4 = System.currentTimeMillis();
    test4(l);
    System.out.println(System.currentTimeMillis() - start4);
}
/**
 * Plain sequential processing.
 *
 * <p>Maps every element to itself concatenated with itself (e.g. {@code "1"} becomes
 * {@code "11"}) on a sequential stream, sleeping 2000 ms per element (standing in for slow
 * work in this demo).
 *
 * @param list the input strings
 * @return a new list holding the doubled strings in the input's order
 */
public static List<String> test1(List<String> list){
    return list.stream().map(l -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so the caller can observe it.
            // Later sleeps in this pipeline then fail fast instead of blocking again.
            Thread.currentThread().interrupt();
        }
        return l + l;
    }).collect(Collectors.toList());
}
/**
 * Parallel-stream processing.
 *
 * <p>Same mapping as the sequential variant (each element is concatenated with itself after a
 * 2000 ms sleep), but evaluated through {@code parallelStream()} so elements may be processed
 * concurrently.
 *
 * @param list the input strings
 * @return a new list holding the doubled strings in the input's order
 */
public static List<String> test2(List<String> list){
    return list.parallelStream().map(l -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag on the thread that was interrupted.
            Thread.currentThread().interrupt();
        }
        return l + l;
    }).collect(Collectors.toList());
}
/**
 * Asynchronous processing with {@code CompletableFuture} and no explicit executor.
 *
 * <p>First submits one {@code supplyAsync} task per element (each sleeps 2000 ms and then
 * returns the element concatenated with itself) and collects all futures; only afterwards are
 * the futures joined. Submitting everything before joining is what lets the tasks overlap.
 *
 * @param list the input strings
 * @return a new list holding the doubled strings in the input's order
 */
public static List<String> test3(List<String> list){
    List<CompletableFuture<String>> lists = list.stream()
        .map(l -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Do not swallow the interruption: restore the flag on the worker thread.
                Thread.currentThread().interrupt();
            }
            return l + l;
        }))
        .collect(Collectors.toList());
    // Separate pipeline on purpose: joining inside the first stream would serialize the tasks.
    return lists.stream().map(CompletableFuture::join).collect(Collectors.toList());
}
/**
 * Asynchronous processing with {@code CompletableFuture} on a dedicated executor.
 *
 * <p>Creates a fixed pool of 8 daemon threads, submits one {@code supplyAsync} task per element
 * (each sleeps 2000 ms and then returns the element concatenated with itself), joins all
 * futures, and finally shuts the pool down so its threads do not leak across calls.
 *
 * @param list the input strings
 * @return a new list holding the doubled strings in the input's order
 */
public static List<String> test4(List<String> list){
    // Held as ExecutorService (fully qualified to avoid a new import) so it can be shut down.
    java.util.concurrent.ExecutorService executor = Executors.newFixedThreadPool(8, r -> {
        Thread t = new Thread(r);
        // Daemon threads never keep the JVM alive on their own.
        t.setDaemon(true);
        return t;
    });
    try {
        List<CompletableFuture<String>> lists = list.stream()
            .map(l -> CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Do not swallow the interruption: restore the flag on the worker thread.
                    Thread.currentThread().interrupt();
                }
                return l + l;
            }, executor))
            .collect(Collectors.toList());
        // Separate pipeline on purpose: joining inside the first stream would serialize the tasks.
        return lists.stream().map(CompletableFuture::join).collect(Collectors.toList());
    } finally {
        // All submitted tasks have been joined (or a join failed); release the pool's threads.
        executor.shutdown();
    }
}
执行结果如下(单位:毫秒,依次对应 test1 ~ test4):
10088
4011
4003
2003
CompletableFuture
1 )多步处理返回future
// Multi-step pipeline: supplies 2 asynchronously, adds 2 twice via thenApply, then uses
// thenCompose to chain a second async stage that adds 2 again, so f completes with 8.
CompletableFuture<Integer> f = CompletableFuture.supplyAsync(() -> 2).thenApply(future -> future +
2).thenApply(future -> future + 2).thenCompose(f1-> CompletableFuture.supplyAsync(()->f1+2));
2 )处理后消费
// Consumes the result: once the supplied value 2 is available it is passed to System.out.println.
CompletableFuture.supplyAsync(() -> 2).thenAccept(System.out::println);
3 ) 完成后做操作
// Completion callback: receives both the result (s) and the failure (throwable); only s is printed here.
CompletableFuture.supplyAsync(() -> 2).whenComplete((s, throwable) -> System.out.println(s));
4 ) 异常处理 2种方式
// The first stage divides by zero, so the chain is expected to fail with an ArithmeticException.
// NOTE(review): both futures are declared with the raw CompletableFuture type; consider typing them.
CompletableFuture f = CompletableFuture.supplyAsync(() -> 2/0).thenApply(future -> future + 2).thenApply(future -> future + 2).thenCompose(f1-> CompletableFuture.supplyAsync(()->f1+2));
// handle() maps either outcome to a normal value: a fixed message when a throwable is present, "" otherwise.
CompletableFuture exceptionHandler = f.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; });
// Explicitly fails f; presumably this only takes effect if f has not already completed - TODO confirm.
f.completeExceptionally(new RuntimeException("completed exceptionally"));
// The handled future yields a regular value, so this get() returns instead of throwing.
System.out.println(exceptionHandler.get());
// f itself completed exceptionally, so this get() throws.
System.out.println(f.get());
exceptionHandler.get() 可以正常返回,程序继续向下执行;但 f.get() 会抛出 ExecutionException,其后的代码无法继续执行。
参考:https://juejin.im/post/5cac710ce51d456e2c2484d5
https://segmentfault.com/a/1190000013452165?utm_source=index-hottest
网友评论