ListenableFuture
并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以显著的简化并发。为了简化问题,Guava 使用 ListenableFuture
扩展了JDK 的 Future
接口。
我们强烈建议你在所有代码中始终使用 ListenableFuture
而不是 Future
,因为:
-
大多数
Futures
工具类下的方法都需要它。 -
直接使用比以后迁移到
ListenableFuture
编程更加容易。 -
Guava 提供的通用公共类封装了公共的操作方方法,不需要再提供
Future
和ListenableFuture
的变换方法。
Interface
传统的 Future
表示异步计算的结果:即 可能已经 或 可能尚未完成计算结果 的计算。一个 Future
可以作为正在进行中的计算的句柄,是服务向我们提供结果的承诺。
ListenableFuture
允许你注册在计算完成后或在计算已经完成时立即执行的回调方法(callbacks)。这个简单的改进使其可以有效地支持许多 JDK 的 Future
接口无法支持的操作。
ListenableFuture
添加的基本操作是 addListener(Runnable, Executor)
,它指定当完成此 Future
表示的计算时,指定的 Runnable
将在指定的 Executor
上运行。
Adding Callbacks
大多数用户应该会更喜欢使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor)
. 这个 FutureCallback<V>
接口需要实现两个方法:
-
onSuccess(V)
,在Future
成功的时候执行,根据Future
结果来判断是否执行。 -
onFailure(Throwable)
,在Future
失败的时候执行,根据Future
结果来判断是否执行。
Creation
对应于JDK ExecutorService.submit(Callable)
方法来启动异步计算,Guava 提供了 ListeningExecutorService
接口,该接口在 ExecutorService
返回普通 Future
的所有地方都返回了 ListenableFuture
。要将 ExecutorService
转换为 ListeningExecutorService
,可以使用 MoreExecutors.listeningDecorator(ExecutorService)
。
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(
new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(
explosion,
new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
},
service);
另外,如果您要从基于 FutureTask
的 API 进行转换,则 Guava 提供了 ListenableFutureTask.create(Callable <V>)
和 ListenableFutureTask.create(Runnable,V)
。与 JDK 不同,ListenableFutureTask
不能直接继承。
如果您更喜欢使用抽象的方式来设置 Future
的值,而不是想实现接口中的方法,可以考虑继承抽象类 AbstractFuture <V>
或直接使用 SettableFuture
。
如果你必须将其他 API 提供的 Future
转换为 ListenableFuture
,那么没有什么好的办法,只能使用重量级的 JdkFutureAdapters.listenInPoolThread(Future)
将 Future
转换为 ListenableFuture
。有可能的话,最好修改原始代码直接返回 ListenableFuture
。
Application
使用 ListenableFuture
的最重要原因是可以拥有复杂的异步操作链。
ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
new AsyncFunction<RowKey, QueryResult>() {
public ListenableFuture<QueryResult> apply(RowKey rowKey) {
return dataService.read(rowKey);
}
};
ListenableFuture<QueryResult> queryFuture =
Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
ListenableFuture
可以有效地支持许多其他操作,而 Future
不能单独支持。不同的 Executors
可以执行不同的操作,并且单个 ListenableFuture
可以有多个操作在等待它。
只要一个操作开始,其他的一些操作也会立即开始执行 —— “fan-out” —— ListenableFuture
能够满足这样的场景:它将触发所有请求的回调。进一步的,它同时可以满足 “fan-in” 场景,在其它的 Futures
全部计算完成后立即触发 ListenableFuture
获取计算结果:有关示例,可以参考 Futures.allAsList
的实现。
Method | Description | See also |
---|---|---|
transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor) *
|
返回一个新的 ListenableFuture ,其结果是将给定的 AsyncFunction 应用于给定的 ListenableFuture 的结果。 |
transformAsync(ListenableFuture<A>, AsyncFunction<A, B>) |
transform(ListenableFuture<A>, Function<A, B>, Executor) |
返回一个新的 ListenableFuture ,其结果是将给定 Function 应用于给定 ListenableFuture 的结果。 |
transform(ListenableFuture<A>, Function<A, B>) |
allAsList(Iterable<ListenableFuture<V>>) |
返回一个 ListenableFuture ,其值是一个 List 集合,该集合按顺序包含每个输入 Future 的值。如果有任何输入的 Future 失败或被取消,则该 Future 失败或被取消。 |
allAsList(ListenableFuture<V>...) |
successfulAsList(Iterable<ListenableFuture<V>>) |
返回一个 ListenableFuture ,其值是一个 List 集合,该集合按顺序包含每个成功输入 Future 的值。与失败或取消的 Future 相对应的值将会被替换为 null 。 |
successfulAsList(ListenableFuture<V>...) |
-
AsyncFunction <A,B>
提供一种方法,ListenableFuture<B> apply(A input)
。它可用于异步转换值。
List<ListenableFuture<QueryResult>> queries;
// The queries go to all different data centers, but we want to wait until they're all done or failed.
ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);
Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);
Avoid nested Futures(避免内嵌的 Future)
在代码调用通用接口并返回 Future
的情况下,最终可能会出现嵌套 Future
的情况。例如:
executorService.submit(new Callable<ListenableFuture<Foo>() {
@Override
public ListenableFuture<Foo> call() {
return otherExecutorService.submit(otherCallable);
}
});
以上代码将返回一个 ListenableFuture<ListenableFuture<Foo>>
。这段代码是不正确的,因为如果外层的 Future
在 complete 之前调用了 cancel
方法,外层的 cancel
就无法传播给内层的 Future
,导致内层 Future
无法被取消。另外在内层 Future
调用 get
方法或者使用监听器处理结果时,除非特别的小心,否则很容易出现没有显式处理内层 Future
抛出的异常导致异常被外层 Future
忽略掉。为了避免这种情况,所有的 Guava
提供的 Future
处理方法都有一个异步版本安全的解开了这种嵌套:例如 transform(ListenableFuture<A>, Function<A, B>, Executor)
and transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor)
, or ExecutorService.submit(Callable)
and submitAsync(AsyncCallable<A>, Executor)
, 等等。
网友评论