Guava: ListenableFuture

Author: jiangmo | Published: 2018-04-02 11:02 | Views: 275

    Motivation

    To simplify matters, Guava extends the Future interface of the JDK with ListenableFuture.
    We strongly advise that you always use ListenableFuture instead of Future in all of your code, because:

    • Most Futures methods require it.
    • It's easier than changing to ListenableFuture later.
    • Providers of utility methods won't need to provide Future and ListenableFuture variants of their methods.

    Interface

    A ListenableFuture allows you to register callbacks to be executed once the computation is complete, or if the computation is already complete, immediately. This simple addition makes it possible to efficiently support many operations that the basic Future interface cannot support.

    The basic operation added by ListenableFuture is addListener(Runnable, Executor), which specifies that when the computation represented by this Future is done, the specified Runnable will be run on the specified Executor.
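
    As a minimal sketch of the two cases described above (a listener registered before completion and one registered after), the following uses SettableFuture and MoreExecutors.directExecutor() so that everything runs on the calling thread. The class and method names are illustrative only and do not come from the original article.

```java
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.List;

class AddListenerSketch {
    /** Records the order in which the listeners and the surrounding code run. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        SettableFuture<String> future = SettableFuture.create();

        // Registered before completion: runs when set() completes the future.
        future.addListener(() -> events.add("early listener"), MoreExecutors.directExecutor());
        events.add("before set");
        future.set("done");

        // Registered after completion: runs immediately, inside addListener.
        future.addListener(() -> events.add("late listener"), MoreExecutors.directExecutor());
        events.add("after late registration");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    With a real thread pool as the Executor, the listeners would run on pool threads instead, and their relative order would no longer be fixed.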

    Adding Callbacks

    Most users will prefer to use Futures.addCallback(ListenableFuture<V>, FutureCallback<V>, Executor). Older Guava releases also had an overload without the Executor argument that defaulted to MoreExecutors.directExecutor(); it has since been removed, so pass MoreExecutors.directExecutor() explicitly when the callback is fast and lightweight. A FutureCallback<V> implements two methods:

    • onSuccess(V), the action to perform if the future succeeds, based on its result
    • onFailure(Throwable), the action to perform if the future fails, based on the failure

    Creation

    Corresponding to the JDK ExecutorService.submit(Callable) approach to initiating an asynchronous computation, Guava provides the ListeningExecutorService interface, which returns a ListenableFuture wherever ExecutorService would return a normal Future. To convert an ExecutorService to a ListeningExecutorService, just use MoreExecutors.listeningDecorator(ExecutorService).

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
      public Explosion call() {
        return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback<Explosion>() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
      }
    }, service); // recent Guava versions require an explicit Executor
    

    Alternatively, if you're converting from an API based on FutureTask, Guava offers ListenableFutureTask.create(Callable<V>) and ListenableFutureTask.create(Runnable, V). Unlike the JDK, ListenableFutureTask is not meant to be extended directly.

    If you prefer an abstraction in which you set the value of the future rather than implementing a method to compute the value, consider extending AbstractFuture<V> or using SettableFuture directly.
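
    A minimal sketch of the "set the value" style, assuming a callback-based API that needs to be exposed as a ListenableFuture. The Handler interface and the legacyFetch method are hypothetical stand-ins invented for this illustration; only SettableFuture, Futures.addCallback and MoreExecutors.directExecutor() are Guava APIs.

```java
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.atomic.AtomicReference;

class SettableFutureSketch {
    /** Hypothetical callback-style interface, not part of Guava. */
    interface Handler {
        void onValue(String value);
        void onError(Exception error);
    }

    /** Hypothetical legacy API that reports its outcome through a Handler. */
    static void legacyFetch(boolean succeed, Handler handler) {
        if (succeed) {
            handler.onValue("payload");
        } else {
            handler.onError(new IllegalStateException("boom"));
        }
    }

    /** Bridges the callback API to a ListenableFuture by setting the result explicitly. */
    static ListenableFuture<String> fetch(boolean succeed) {
        SettableFuture<String> future = SettableFuture.create();
        legacyFetch(succeed, new Handler() {
            public void onValue(String value) { future.set(value); }
            public void onError(Exception error) { future.setException(error); }
        });
        return future;
    }

    /** The future is already complete here, so the callback runs immediately on this thread. */
    static String describe(ListenableFuture<String> future) {
        AtomicReference<String> outcome = new AtomicReference<>();
        Futures.addCallback(future, new FutureCallback<String>() {
            public void onSuccess(String result) { outcome.set("success: " + result); }
            public void onFailure(Throwable t) { outcome.set("failure: " + t.getMessage()); }
        }, MoreExecutors.directExecutor());
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(describe(fetch(true)));
        System.out.println(describe(fetch(false)));
    }
}
```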

    If you must convert a Future provided by another API to a ListenableFuture, you may have no choice but to use the heavyweight JdkFutureAdapters.listenInPoolThread(Future).
    Whenever possible, it is preferred to modify the original code to return a ListenableFuture.

    Application

    The most important reason to use ListenableFuture is that it becomes possible to have complex chains of asynchronous operations.

    • An AsyncFunction<A, B> provides one method, ListenableFuture<B> apply(A input). It can be used to asynchronously transform a value.

    ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
    AsyncFunction<RowKey, QueryResult> queryFunction =
      new AsyncFunction<RowKey, QueryResult>() {
        public ListenableFuture<QueryResult> apply(RowKey rowKey) {
          return dataService.read(rowKey);
        }
      };
    ListenableFuture<QueryResult> queryFuture =
        Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
    

    Many other operations can be supported efficiently with a ListenableFuture that cannot be supported with a Future alone. Different operations may be executed by different executors, and a single ListenableFuture can have multiple actions waiting upon it.

    When several operations should begin as soon as another operation starts -- "fan-out" -- ListenableFuture just works: it triggers all of the requested callbacks. With slightly more work, we can "fan-in," or trigger a ListenableFuture to get computed as soon as several other futures have all finished: see the implementation of Futures.allAsList for an example.
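
    To illustrate fan-in, here is a small sketch that combines several futures with Futures.allAsList and, for comparison, Futures.successfulAsList. It assumes already-completed inputs (Futures.immediateFuture and Futures.immediateFailedFuture) purely to keep the result deterministic; in real code the inputs would come from a ListeningExecutorService. The class and method names are illustrative.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.util.List;

class FanInSketch {
    // Already-completed inputs; an assumption made only to keep the sketch deterministic.
    static final ListenableFuture<Integer> ONE = Futures.immediateFuture(1);
    static final ListenableFuture<Integer> TWO = Futures.immediateFuture(2);
    static final ListenableFuture<Integer> FAILED =
            Futures.immediateFailedFuture(new IllegalStateException("boom"));

    /** allAsList completes with every value, in input order, once all inputs succeed. */
    static List<Integer> allSucceed() {
        ListenableFuture<List<Integer>> combined = Futures.allAsList(ONE, TWO);
        return Futures.getUnchecked(combined);
    }

    /** allAsList fails as a whole if any input fails. */
    static boolean allAsListFailsWhenAnyInputFails() {
        try {
            Futures.getUnchecked(Futures.allAsList(ONE, FAILED));
            return false;
        } catch (UncheckedExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    /** successfulAsList keeps going and substitutes null for failed inputs. */
    static List<Integer> successfulOnly() {
        return Futures.getUnchecked(Futures.successfulAsList(ONE, FAILED, TWO));
    }

    public static void main(String[] args) {
        System.out.println(allSucceed());
        System.out.println(allAsListFailsWhenAnyInputFails());
        System.out.println(successfulOnly());
    }
}
```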

    Avoid nested Futures

    In cases where code calls a generic interface and returns a Future, it's possible to end up with nested Futures. For example:

    executorService.submit(new Callable<ListenableFuture<Foo>>() {
      @Override
      public ListenableFuture<Foo> call() {
        return otherExecutorService.submit(otherCallable);
      }
    });
    

    would return a ListenableFuture<ListenableFuture<Foo>>.

    This code is incorrect, because if a cancel on the outer future races with the completion of the outer future, that cancellation will not be propagated to the inner future.
    It's also a common error to check for failure of the other future using get() or a listener, but unless special care is taken, an exception thrown from otherCallable would be suppressed.
    To avoid this, all of Guava's future-handling methods (and some from the JDK) have Async versions that safely unwrap this nesting - transform(ListenableFuture<A>, Function<A, B>, Executor) and transformAsync(ListenableFuture<A>, AsyncFunction<A, B>, Executor), or ExecutorService.submit(Callable) and submitAsync(AsyncCallable<A>, Executor), etc.
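
    A minimal sketch of the flattening described above, using Futures.submitAsync in place of a nested submit and Futures.transformAsync for a follow-up step. The two single-thread executors and the string values are assumptions chosen for illustration.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.Executors;

class FlattenSketch {
    static String run() {
        ListeningExecutorService outer =
                MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        ListeningExecutorService inner =
                MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        try {
            // outer.submit(() -> inner.submit(...)) would yield a nested
            // ListenableFuture<ListenableFuture<String>>; submitAsync unwraps it.
            ListenableFuture<String> flat =
                    Futures.submitAsync(() -> inner.submit(() -> "foo"), outer);

            // transformAsync likewise unwraps the future returned by the function.
            ListenableFuture<String> chained =
                    Futures.transformAsync(flat, s -> inner.submit(() -> s + "bar"), outer);

            return Futures.getUnchecked(chained);
        } finally {
            outer.shutdown();
            inner.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Because the returned futures are flat, cancelling them or observing their failure needs no manual unwrapping of an inner future.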

    CheckedFuture

    Guava also provides a CheckedFuture<V, X extends Exception> interface. A CheckedFuture is a ListenableFuture that includes versions of the get methods that can throw a checked exception. This makes it easier to create a future that executes logic which can throw an exception. To convert a ListenableFuture to a CheckedFuture, use Futures.makeChecked(ListenableFuture<V>, Function<Exception, X>).

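    Main classes

    • MoreExecutors -- Executors
      A final utility class that provides many static methods. For example, listeningDecorator wraps an ExecutorService in a ListeningExecutorService, whose submit method returns a ListenableFuture.
    • ListeningExecutorService -- ExecutorService
      An interface that extends ExecutorService and overrides its submit methods to return a ListenableFuture.
    • ListenableFuture -- Future
      An interface that extends Future with the addListener method, which registers a listener on the given executor. The order in which listeners run is not guaranteed, but each listener is guaranteed to be invoked as soon as the computation completes.
    • FutureCallback (no JDK counterpart)
      An interface with onSuccess and onFailure methods, used to receive the result of an asynchronous computation through a callback.
    • Futures
      A class that provides many useful static utility methods.
    • ListenableFutureTask -- FutureTask
      A class that extends FutureTask and implements ListenableFuture, adding the addListener method.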

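    Limitations of Future

    Future has limitations. In practice, when you need to download a large number of images or videos, you can download them on multiple threads. After submitting the download tasks you collect the results from multiple Futures, but because Future.get() blocks, you end up calling it on each Future in turn, which is inefficient: if the first download happens to be slow, it holds up the handling of all the others.
    Future's main purpose is to retrieve a task's result and to control the asynchronous task. But as the example above shows, Future alone is awkward when you need the results of a batch of tasks:

    • There is no good way to tell which task finishes first. (CompletionService solves this: its blocking take() method returns completed tasks one at a time, in completion order.)
    • Future.get() blocks, so careless use wastes threads. (Guava's ListeningExecutorService and ListenableFuture solve this.)
    • It cannot prevent the same task from being submitted twice. (This requires FutureTask, the most common implementation of Future.)

    In practice, Guava's ListenableFuture is recommended for asynchronous, non-blocking code: multiple tasks run asynchronously and results are delivered through callbacks, with no need to poll task status.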
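
    The first limitation can be sketched with the JDK alone. In the example below, a CountDownLatch stands in for a slow download (an assumption made so the outcome is deterministic): the task submitted first cannot finish until the second one has been consumed. Calling get() on the futures in submission order would block on the slow task, whereas CompletionService.take() hands back whichever task completes first. The class name and string values are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderSketch {
    /** Returns task results in the order in which the tasks completed. */
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> completion = new ExecutorCompletionService<>(pool);
        CountDownLatch release = new CountDownLatch(1); // simulates a slow download
        try {
            completion.submit(() -> { release.await(); return "slow"; }); // submitted first
            completion.submit(() -> "fast");                              // submitted second

            List<String> order = new ArrayList<>();
            order.add(completion.take().get()); // only the fast task can be done here
            release.countDown();                // now let the slow task finish
            order.add(completion.take().get());
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

    take() still blocks the consuming thread, which is why the callback style of ListenableFuture is the recommended route to fully non-blocking code.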

    Test Code

    Using callbacks

    public static void testRateLimiter() {
            ListeningExecutorService executorService = MoreExecutors
                    .listeningDecorator(Executors.newCachedThreadPool());
    
            RateLimiter limiter = RateLimiter.create(5.0); // at most 5 tasks submitted per second
            List<ListenableFuture<Integer>> listfutures = Lists.newArrayList();
            ListenableFuture<Integer> tmp = null;
            for (int i = 0; i < 10; i++) {
                limiter.acquire(); // blocks until the RateLimiter grants a permit
                tmp = executorService.submit(new Task(i));
                tmp.addListener(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("add Listener");
                    }
                }, executorService);
    
                Futures.addCallback(tmp, new FutureCallback<Integer>() {
                    @Override
                    public void onSuccess(Integer result) {
                        System.out.println("suc"+result);
                    }
    
                    @Override
                    public void onFailure(Throwable t) {
                        System.out.println("fail"+t.toString());
                    }
                }, MoreExecutors.directExecutor()); // recent Guava versions require an explicit Executor
    
                listfutures.add(tmp);
    
            }
    
            listfutures.forEach(e-> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("e = " + e.get());
                    System.out.println("e = " + e.get());
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                } catch (ExecutionException e1) {
                    e1.printStackTrace();
                }
            });
            executorService.shutdown(); // let the JVM exit once all submitted tasks have finished
        }
    
        static class Task implements Callable<Integer> {
            private int number;
            public Task(int i){
                this.number = i;
            }
            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(2);
                System.out.println("call execute.." + number);
                return number;
            }
        }
    

    Using chained futures

    What if you need several callbacks chained together?

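    Method             Description
    transform          Adds a callback function that is applied to the result.
    allAsList          Returns a ListenableFuture whose result is a List containing the value of each input ListenableFuture. If any input fails or is cancelled, the returned future fails or is cancelled.
    successfulAsList   Returns a ListenableFuture whose result contains the values of all successful inputs, in their original order. The value of any input that failed or was cancelled is replaced with null.
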
    public static void testLinkedFutureLisener() {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
    
            final ListeningExecutorService poolService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
            // NOTE: this Task is a Callable<String> variant that the article does not show.
            // Judging by the output below, it logs "<name> doing" / "<name>done" and returns its name.
            ListenableFuture<String> futureBase = poolService.submit(new Task("task1"));
            Futures.addCallback(futureBase, new FutureCallback<String>() {
                @Override
                public void onSuccess(String result) {
                    System.out.println("onSuccess result = " + result);
                }
    
                @Override
                public void onFailure(Throwable t) {
                    System.out.println("onFailure result = " + t.toString());
    
                }
            }, MoreExecutors.directExecutor());
    
            // Chain 1: base_1 and base_2 both fan out from futureBase.
            // Futures.transformAsync replaces the old Futures.transform(future, AsyncFunction, executor) overload.
    
            final ListenableFuture<String> base_1 = Futures.transformAsync(futureBase, new AsyncFunction<String, String>() {
                public ListenableFuture<String> apply(final String input) throws Exception {
                    ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                        public String call() throws Exception {
                            System.out.println("base_1 callback thread running...input:"+input);
                            try {
                                Thread.sleep(1000);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            System.out.println("base_1 callback thread done");
    
                            return input + " & base_1 callback result ";
                        }
                    });
                    return temp;
                }
            }, poolService);
    
            ListenableFuture<String> base_2 = Futures.transformAsync(futureBase, new AsyncFunction<String, String>() {
                public ListenableFuture<String> apply(final String input) throws Exception {
                    ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                        public String call() throws Exception {
                            System.out.println("base_2 callback thread running...input:"+input);
                            try {
                                Thread.sleep(2000);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            System.out.println("base_2 callback thread done");
    
                            return input + " & base_2 callback result ";
                        }
                    });
                    return temp;
                }
            }, poolService);
    
            // Chain 2: first runs after base_2 and also waits for the result of base_1.
            ListenableFuture<String> first = Futures.transformAsync(base_2, new AsyncFunction<String, String>() {
                public ListenableFuture<String> apply(final String input) throws Exception {
                    ListenableFuture<String> temp = poolService.submit(new Callable<String>() {
                        public String call() throws Exception {
                            System.out.println("first callback thread running...input:"+input);
                            try {
                                String resBase1 =  base_1.get();
                                System.out.println("resBase1 = " + resBase1);
                                countDownLatch.countDown();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            System.out.println("first callback thread done");
    
                            return input + " & first callback result ";
                        }
                    });
                    return temp;
                }
            }, poolService);
    
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            poolService.shutdown();
        }
    
    // Output (the relative order of the base_1 and base_2 lines may vary between runs):
    task1 doing
    task1done
    onSuccess result = task1
    base_2 callback thread running...input:task1
    base_1 callback thread running...input:task1
    base_1 callback thread done
    base_2 callback thread done
    first callback thread running...input:task1 & base_2 callback result 
    resBase1 = task1 & base_1 callback result 
    first callback thread done
    
    

    Ref:
    https://github.com/google/guava/wiki/ListenableFutureExplained
    https://blog.csdn.net/pistolove/article/details/51232004
