美文网首页
0318 guava并发工具

0318 guava并发工具

作者: 李福春carter | 来源:发表于2020-03-19 13:51 被阅读0次
    image.png

    <br />

    <br />并发是一个难题,但是可以通过使用强力简单的抽象来显著的简化,为了简化问题,guava扩展了Future接口,即 ListenableFuture (可以监听的Future)。<br />我强烈建议你在你的所有代码里使用ListenableFuture去替代Future,原因如下:

    • 很多的Futures 类的方法需要它。(Futures工具类使用)
    • 它比后来改造为ListenableFutrue更简单。(早点使用比重构更简单)
    • 工具方法的提供者不需要提供Future和ListenableFuture方法的变体。(不需要兼容两套)

    <br />

    接口

    一个传统的Futrue代表一个异步计算的结果:一个可能完成也可能没有完成输出结果的计算。<br />一个Future可以用在进度计算,或者说是 一个提供给我们结果的服务的承诺。<br />
    <br />一个ListenableFuture允许注册当你在计算完成的时候的回调,或者计算已经完成了。<br />这个简单的增强让高效支持多种操作成为可能。而Future接口并不能支持。<br />
    <br />ListenbleFuture中添加的基本操作是<br />addListener(Runnable , Executor ),<br />它指出了当未来计算完成时,指定的Runnable会在指定的Executor中运行。<br />
    <br />

    增加回调

    很多用户喜欢使用 Futures.addCallback(ListenableFuture,FutureCallback,Executor)方法。<br />FutureCallback实现了下面两个方法:<br />
    <br />

    • onSuccess(v) 当未来成功执行的动作,基于计算结果
    • onFailure(Throwable) 当未来失败执行的动作,基于失败

    创建

    相较于jdk提供的 ExecutorService.submit(Callable)方法来初始化一个异步计算。它返回一个常规的Future,<br />guava提供了ListeningExecutorService接口,它返回ListenableFuture。<br />把ExecutorService转换为ListenableExecutorService<br />使用:MoreExecutors.listeningDecorator(ExecutorService)

    基础用法如下:

    
    /**
     * 说明:使用例子代码
     * @author carter
     * 创建时间: 2020年03月19日 9:54 上午
     **/
    
    @Slf4j
    public class ListenableFutureUtils {
    
        public static void main(String[] args) {
    
    ListeningExecutorService service = MoreExecutors.listeningDecorator(
        Executors.newFixedThreadPool(10));
    
    
            final ListenableFuture<AResult> listenableFuture = service.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return new AResult(30, "male", 1);
    
            });
    
    
            Futures.addCallback(listenableFuture,
                    new FutureCallback<AResult>() {
                        @Override
                        public void onSuccess(AResult aResult) {
                            log.info("计算成功,{}",aResult);
                        }
    
                        @Override
                        public void onFailure(Throwable throwable) {
    
                            log.error("计算错误",throwable);
                            
                        }
                    },service);
    
        }
        
        @Data
        @AllArgsConstructor
        public static class AResult{
            
            private Integer age;
            
            private String sex;
            
            private Integer id;
            
            
        }
        
    }
    

    相对的,如果你想从基于FutureTask的API转换过来,<br />Guava提供了<br />ListenableFutureTask.create(Callable)<br />和<br />ListenableFutureTask.create(Runnable)<br />不同于jdk,ListenableFutureTask并不是直接扩展的。

    如果你喜欢抽象的设置future的值,而不是实现一个方法然后计算值,可以考虑使用AbstractFuture或使用SettableFuture ;

    如果你必须转换Future为ListenableFuture,你别无选择,必须使用 JdkFutureAdapters.listenInPoolThread(Future)来转换Future为ListenableFuture<br />任何时候只要可能,推荐你修改源码让它返回一个 ListenableFuture

    应用

    使用ListenablFuture最重要的原因是可以使用链式异步操作。

    代码如下:

    package com.xxx.demo;
    
    import com.google.common.util.concurrent.AsyncFunction;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    
    /**
     * 说明:异步操作链
     * @author carter
     * 创建时间: 2020年03月19日 10:11 上午
     **/
    
    public class ApplicationUtils {
    
    
        public static void main(String[] args) {
    
            Query query = new Query(30);
    
            ListenableFuture<RowKey> rowKeyFuture = lookUp(query);
    
            AsyncFunction<RowKey, QueryResult> queryFun = rowKey -> readData(rowKey);
    
            final ListenableFuture<QueryResult> queryResultListenableFuture = 
                Futures.transformAsync(rowKeyFuture, queryFun);
    
        }
    
        private static ListenableFuture<QueryResult> readData(RowKey rowKey) {
            return null;
        }
    
        private static ListenableFuture<RowKey> lookUp(Query query) {
            return null;
        }
    
    
        @Data
        @AllArgsConstructor
        public static class RowKey {
    
            private String id;
    
        }
    
        @Data
        @AllArgsConstructor
        public static class Query {
    
            private Integer age;
    
        }
    
    
        @Data
        @AllArgsConstructor
        public static class QueryResult {
    
            private String id;
            private String age;
    
        }
    
    
    }
    
    

    很多其他高效支持的操作ListenableFuture提供,而Future不提供。<br />不同的操作可以被不同的线程池执行,一个简单的ListenableFuture可以有多个操作去等待。

    只要一个操作开始,其他多个操作应该开始,fan-out, 千帆竞发。

    ListenableFuture可以实现这样的操作:它触发了所有请求的回调。

    通过少量的工作,我们可以 fan-in.

    触发一个ListenableFuture 来获得计算结果,当其他的Future结束的时候。

    Futures.allAsList是一个例子。

    方法介绍:

    方法 描述
    transformAsync(ListenableFuture , AsyncFunction , Executor) 返回一个新的ListenableFuture,它的结果是执行异步函数的返回,函数入参是ListenableFuture的返回结果;
    transform(ListenableFuture , Function , Executor) 返回一个新的ListenableFuture,它的结果是执行函数的返回,函数入参是ListenableFuture的返回结果;
    allAsList(Iterable<ListenableFuture>) 返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,任何一个ListenableFuture执行失败或者取消,最后的返回结果取消
    successfullAsList(Iterable<ListenableFuture>) 返回一个ListenableFuture,它的结果是一个list,包含每一个列表中的ListenableFuture的执行结果,成功的是结果,失败或者取消的值使用null替代

    AsyncFunction<A,B> 提供了一个方法 , ListenableFuture<B> apply(A inpunt),它可以用来异步的转换值。

    代码如下:

    package com.xxx.demo;
    
    import com.google.common.collect.Lists;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.List;
    
    /**
     * 说明:成功执行结果汇集
     * @author carter
     * 创建时间: 2020年03月19日 10:34 上午
     **/
    @Slf4j
    public class Test3 {
    
        public static void main(String[] args) {
    
            List<ListenableFuture<QueryResult>> querys = Lists.newLinkedList();
            final ListenableFuture<List<QueryResult>> successfulAsList =
                Futures.successfulAsList(querys);
            
            Futures.addCallback(successfulAsList, new FutureCallback<List<QueryResult>>() {
                @Override
                public void onSuccess(List<QueryResult> queryResults) {
                    log.info("执行结果列表:{}",queryResults);
                }
    
                @Override
                public void onFailure(Throwable throwable) {
                    log.error("执行失败",throwable);
                }
            });
    
    
        }
    
        @Data
        @AllArgsConstructor
        public static class QueryResult{
            
            
          private  Integer age;
            
        }
        
    
    }
    
    

    嵌套的Future

    你的代码调用一个通用接口并返回一个Future,很可能最终返回一个嵌套的Future.

    package com.xxx.demo;
    
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    
    /**
     * 说明:嵌套的ListenableFuture
     * @author carter
     * 创建时间: 2020年03月19日 10:43 上午
     **/
    
    public class Test4 {
    
        public static void main(String[] args) {
    
    
            final ListeningExecutorService executorService = MoreExecutors
                .listeningDecorator(Executors.newFixedThreadPool(2));
            final ListeningExecutorService otherExecutorService = MoreExecutors
                .listeningDecorator(Executors.newFixedThreadPool(2));
    
    
            Callable<Foo> otherCallback =  ()->new Foo("aaa");
    
    
            final ListenableFuture<ListenableFuture<Foo>> submit = 
                    executorService.submit(() -> otherExecutorService.submit(otherCallback));
    
    
        }
        
        @Data
        @AllArgsConstructor
        public static class Foo{
            
            private String name;
        }
        
    }
    
    

    例子最后返回的是: ListenableFuture<ListenableFuture<Foo>> ,<br />这个代码不对,因为当外层的Future 取消的时候,无法传播到内层的Future,<br />这也是一个 使用get()检查别的Future或者Listnener的常规的错误,

    但是,除非特别关注 否则 otherCallback抛出的异常会被压制。<br />为了避免这种情况,所有的guava的Future处理方法(有些从jdk来),有 *Async版本来安全的解开这个嵌套。

    比如:transform,transformAsyn, submit, submitAsync方法。

    深入研究

    原创不易,转载请注明出处。

    相关文章

      网友评论

          本文标题:0318 guava并发工具

          本文链接:https://www.haomeiwen.com/subject/saxlyhtx.html