美文网首页java多线程
Guava 官方文档:Concurrency(一)

Guava 官方文档:Concurrency(一)

作者: changhr2013 | 来源:发表于2020-03-27 14:24 被阅读0次

    ListenableFuture

    并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以显著的简化并发。为了简化问题,Guava 使用 ListenableFuture 扩展了JDK 的 Future 接口。

    我们强烈建议你在所有代码中始终使用 ListenableFuture 而不是 Future,因为:

    • 大多数 Futures 工具类下的方法都需要它。

    • 直接使用比以后迁移到 ListenableFuture 编程更加容易。

    • Guava 提供的通用公共类封装了公共的操作方方法,不需要再提供 FutureListenableFuture 的变换方法。

    Interface

    传统的 Future 表示异步计算的结果:即 可能已经可能尚未完成计算结果 的计算。一个 Future 可以作为正在进行中的计算的句柄,是服务向我们提供结果的承诺。

    ListenableFuture 允许你注册在计算完成后或在计算已经完成时立即执行的回调方法(callbacks)。这个简单的改进使其可以有效地支持许多 JDK 的 Future 接口无法支持的操作。

    ListenableFuture 添加的基本操作是 addListener(Runnable, Executor),它指定当完成此 Future 表示的计算时,指定的 Runnable 将在指定的 Executor 上运行。

    Adding Callbacks

    大多数用户应该会更喜欢使用 Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor). 这个 FutureCallback<V> 接口需要实现两个方法:

    • onSuccess(V),在 Future 成功的时候执行,根据 Future 结果来判断是否执行。
    • onFailure(Throwable),在 Future 失败的时候执行,根据 Future 结果来判断是否执行。

    Creation

    对应于JDK ExecutorService.submit(Callable) 方法来启动异步计算,Guava 提供了 ListeningExecutorService 接口,该接口在 ExecutorService 返回普通 Future 的所有地方都返回了 ListenableFuture。要将 ExecutorService 转换为 ListeningExecutorService,可以使用 MoreExecutors.listeningDecorator(ExecutorService)

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(
        new Callable<Explosion>() {
          public Explosion call() {
            return pushBigRedButton();
          }
        });
    Futures.addCallback(
        explosion,
        new FutureCallback<Explosion>() {
          // we want this handler to run immediately after we push the big red button!
          public void onSuccess(Explosion explosion) {
            walkAwayFrom(explosion);
          }
          public void onFailure(Throwable thrown) {
            battleArchNemesis(); // escaped the explosion!
          }
        },
        service);
    

    另外,如果您要从基于 FutureTask 的 API 进行转换,则 Guava 提供了 ListenableFutureTask.create(Callable <V>)ListenableFutureTask.create(Runnable,V)。与 JDK 不同,ListenableFutureTask 不能直接继承。

    如果您更喜欢使用抽象的方式来设置 Future 的值,而不是想实现接口中的方法,可以考虑继承抽象类 AbstractFuture <V> 或直接使用 SettableFuture

    如果你必须将其他 API 提供的 Future 转换为 ListenableFuture,那么没有什么好的办法,只能使用重量级的 JdkFutureAdapters.listenInPoolThread(Future)Future 转换为 ListenableFuture。有可能的话,最好修改原始代码直接返回 ListenableFuture

    Application

    使用 ListenableFuture 的最重要原因是可以拥有复杂的异步操作链。

    ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
    AsyncFunction<RowKey, QueryResult> queryFunction =
      new AsyncFunction<RowKey, QueryResult>() {
        public ListenableFuture<QueryResult> apply(RowKey rowKey) {
          return dataService.read(rowKey);
        }
      };
    ListenableFuture<QueryResult> queryFuture =
        Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
    

    ListenableFuture 可以有效地支持许多其他操作,而 Future 不能单独支持。不同的 Executors 可以执行不同的操作,并且单个 ListenableFuture 可以有多个操作在等待它。

    只要一个操作开始,其他的一些操作也会立即开始执行 —— “fan-out” —— ListenableFuture 能够满足这样的场景:它将触发所有请求的回调。进一步的,它同时可以满足 “fan-in” 场景,在其它的 Futures 全部计算完成后立即触发 ListenableFuture 获取计算结果:有关示例,可以参考 Futures.allAsList 的实现。

    Method Description See also
    transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor)* 返回一个新的 ListenableFuture,其结果是将给定的 AsyncFunction 应用于给定的 ListenableFuture 的结果。 transformAsync(ListenableFuture<A>, AsyncFunction<A, B>)
    transform(ListenableFuture<A>, Function<A, B>, Executor) 返回一个新的 ListenableFuture,其结果是将给定 Function 应用于给定 ListenableFuture 的结果。 transform(ListenableFuture<A>, Function<A, B>)
    allAsList(Iterable<ListenableFuture<V>>) 返回一个 ListenableFuture,其值是一个 List 集合,该集合按顺序包含每个输入 Future 的值。如果有任何输入的 Future 失败或被取消,则该 Future 失败或被取消。 allAsList(ListenableFuture<V>...)
    successfulAsList(Iterable<ListenableFuture<V>>) 返回一个 ListenableFuture,其值是一个 List 集合,该集合按顺序包含每个成功输入 Future 的值。与失败或取消的 Future 相对应的值将会被替换为 null successfulAsList(ListenableFuture<V>...)
    • AsyncFunction <A,B> 提供一种方法,ListenableFuture<B> apply(A input)。它可用于异步转换值。
    List<ListenableFuture<QueryResult>> queries;
    // The queries go to all different data centers, but we want to wait until they're all done or failed.
    
    ListenableFuture<List<QueryResult>> successfulQueries = Futures.successfulAsList(queries);
    
    Futures.addCallback(successfulQueries, callbackOnSuccessfulQueries);
    

    Avoid nested Futures(避免内嵌的 Future)

    在代码调用通用接口并返回 Future 的情况下,最终可能会出现嵌套 Future 的情况。例如:

    executorService.submit(new Callable<ListenableFuture<Foo>() {
      @Override
      public ListenableFuture<Foo> call() {
        return otherExecutorService.submit(otherCallable);
      }
    });
    

    以上代码将返回一个 ListenableFuture<ListenableFuture<Foo>>。这段代码是不正确的,因为如果外层的 Future 在 complete 之前调用了 cancel 方法,外层的 cancel 就无法传播给内层的 Future,导致内层 Future 无法被取消。另外在内层 Future 调用 get 方法或者使用监听器处理结果时,除非特别的小心,否则很容易出现没有显式处理内层 Future 抛出的异常导致异常被外层 Future 忽略掉。为了避免这种情况,所有的 Guava 提供的 Future 处理方法都有一个异步版本安全的解开了这种嵌套:例如 transform(ListenableFuture<A>, Function<A, B>, Executor) and transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor), or ExecutorService.submit(Callable) and submitAsync(AsyncCallable<A>, Executor), 等等。

    相关文章

      网友评论

        本文标题:Guava 官方文档:Concurrency(一)

        本文链接:https://www.haomeiwen.com/subject/sicluhtx.html