工作中发现有些同学在异步编程方面有些困惑,这里讲解一些应对措施。
回调地狱
回调地狱是说我们在异步编程时常用到回调(callback)的方式进行传递返回值,但是假如我们需要保证几个调用之间的顺序是串行执行的时候就会在回调里写回调,比如这样:
private static ExecutorService executor = Executors.newCachedThreadPool();
private static void first(BiConsumer<Integer, Exception> consumer) {
executor.execute(() -> {
System.out.println("exec first");
consumer.accept(1, null);
});
}
private static void second(Integer firstResult, BiConsumer<Long, Exception> consumer) {
executor.execute(() -> {
System.out.println("exec second");
consumer.accept(2L, null);
});
}
private static void third(Long secondResult, BiConsumer<Double, Exception> consumer) {
executor.execute(() -> {
System.out.println("exec third");
consumer.accept(3d, null);
});
}
// 回调地狱演示
public static void main(String[] args) {
first((integer, e) -> {
if (e != null) {
// handle exception
} else {
second(integer, (aLong, e1) -> {
if (e1 != null) {
// handle exception
} else {
third(aLong, (aDouble, e2) -> {
if (e2 != null) {
// handle exception
} else {
System.out.println(aDouble);
}
});
}
});
}
});
}
写着写着会发现我们的代码呈金字塔形向后延伸,可读性大大下降,这就是所谓的回调地狱;
java8 自带的CompletableFuture
可以很好的应对这种逻辑结构,下面对上面的代码进行改造,首先我们对 callback 类型的代码进行改造,如下所示:
private static CompletableFuture<Integer> first() {
CompletableFuture<Integer> future = new CompletableFuture<>();
first((integer, e) -> {
if (e != null) {
future.completeExceptionally(e);
} else {
future.complete(integer);
}
});
return future;
}
private static CompletableFuture<Long> second(Integer firstResult) {
// 同 first
}
private static CompletableFuture<Double> third(Long secondResult) {
// 同 first
}
将每一步都进行 future 改造后再看看最初的回调式的代码可以怎么写:
public static void main(String[] args) {
first()
.thenCompose(integer -> {
return second(integer);
})
.thenCompose(aLong -> {
return third(aLong);
})
.whenComplete((aDouble, throwable) -> {
System.out.println(aDouble);
System.out.println(throwable); // 所有的异常最终都会到这里处理
});
}
可以看到我们吧原来金字塔形的代码形状拉平了,且异常处理集中在了最后,无需每一步都处理,可读性也大大提升。
处理异步并发
有时候我们的不同调用间并没有互相依赖关系,可以并发发起调用,并且在两个调用都结束时进行下一步,如下所示:
private static void exec1(BiConsumer<Integer, Exception> consumer) {
executor.execute(() -> {
System.out.println("exec 1");
consumer.accept(1, null);
});
}
private static void exec2(BiConsumer<Integer, Exception> consumer) {
executor.execute(() -> {
System.out.println("exec 2");
consumer.accept(1, null);
});
}
public static void main(String[] args) {
exec1((integer, e) -> {
});
exec2((integer, e) -> {
});
}
有些童鞋想起了CountDownLatch
,怎么实现呢
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
exec1((integer, e) -> {
latch.countDown();
});
exec2((integer, e) -> {
latch.countDown();
});
latch.await();
// 继续处理
}
但是CountDownLatch
会阻塞我们当前线程的,如果线程数不多时会大大影响服务吞吐量,而且线程数也不是无限的,并不能简单的通过增加执行线程数来提升吞吐量,那么用CompletableFuture
可以怎么做呢,首先还是要对exec1和 exec2 进行 future 改造,
private CompletableFuture<Integer> exec1() {
CompletableFuture<Integer> future = new CompletableFuture<>();
exec1((integer, e) -> {
future.complete(integer);// 异常处理省略
});
return future;
}
private CompletableFuture<Integer> exec2() {
CompletableFuture<Integer> future = new CompletableFuture<>();
exec2((integer, e) -> {
future.complete(integer);// 异常处理省略
});
return future;
}
接下来进行改造:
public static void main(String[] args) throws InterruptedException {
final CompletableFuture<Integer> future1 = exec1();
final CompletableFuture<Integer> future2 = exec2();
final CompletableFuture<Void> future = CompletableFuture.allOf(future1,future2);
future.whenComplete((unused, throwable) -> {
if (throwable != null) {
// 异常处理
}
final Integer integer1 = future1.join();
final Integer integer2 = future2.join();
// 继续处理
});
}
这样做的好处是不阻塞当前执行线程,有效提升服务吞吐量。
另外如果是批量执行任务而不关心执行顺序的话可以看看CompletionService
,在此就不赘述了。
网友评论