美文网首页
hsf笔记-ListenableFuture

hsf笔记-ListenableFuture

作者: 兴浩 | 来源:发表于2018-08-17 16:08 被阅读30次

    1.Future扩展

    ListenableFuture和SettableFuture类似于Netty的Promise接口

    1.1 ListenableFuture

    ListenableFuture是Guava库里面的概念,其扩展Future,添加了回调监听事件

    public interface ListenableFuture<V> extends Future<V> {
        void addListener(Runnable var1);
    
        void addListener(Runnable var1, Executor var2);
    }
    

    1.2 SettableFuture

    public interface SettableFuture<V> extends ListenableFuture<V> {
        boolean set(V var1);
    
        boolean setException(Throwable var1);
    }
    

    尝试设置值来手动触发回调监听事件

    1.3 测试代码

        @Test
        public void test1() throws Exception {
            final DefaultListenableFuture<String> future= Futures.createSettableFuture();
    
            future.addListener(new Runnable() {
                public void run() {
                    try {
                        System.out.println(future.get());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            future.set("hello");
            
            System.in.read();
        }
    

    测试结果

    hello
    

    DefaultListenableFuture是ListenableFuture和SettableFuture的默认实现,其继承了com.google.common.util.concurrent.AbstractFuture,所以本质是Guava库中的AbstractFuture帮忙实现了该功能

    1.4 Futures辅助类

    Futures辅助类用于创建DefaultListenableFuture和DefaultMapFuture

    public class Futures {
        public Futures() {
        }
    
        public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func) {
            return new DefaultMapFuture(MoreExecutors.directExecutor(), source, func);
        }
    
        public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func, ListenerWrapper listenerWrapper) {
            return new DefaultMapFuture(MoreExecutors.directExecutor(), source, func, listenerWrapper);
        }
    
        public static <T> DefaultListenableFuture<T> createSettableFuture() {
            return new DefaultListenableFuture(MoreExecutors.directExecutor());
        }
    
        public static <T> DefaultListenableFuture<T> createSettableFuture(Executor executor) {
            return new DefaultListenableFuture(executor);
        }
    }
    

    1.5 AbstractListener

    AbstractListener对回调监听事件进行了封装,可以获取到结果值

    public abstract class AbstractListener<T> implements Runnable {
        protected ListenableFuture<T> rpcFuture;
    
        public AbstractListener(ListenableFuture<T> rpcFuture) {
            this.rpcFuture = rpcFuture;
        }
    
        public void run() {
            try {
                T rpcResult = this.rpcFuture.get();
                this.operationComplete(rpcResult);
            } catch (Throwable var2) {
                this.onThrowable(var2);
            }
    
        }
    
        protected abstract void operationComplete(T var1);
    
        protected abstract void onThrowable(Throwable var1);
    }
    

    示例:

        @Test
        public void test2() throws Exception {
            DefaultListenableFuture<String> future= Futures.createSettableFuture();
            future.addListener(new AbstractListener<String>(future) {
                @Override
                protected void operationComplete(String s) {
                    System.out.println(s);
                }
                @Override
                protected void onThrowable(Throwable throwable) {
                }
            });
            future.set("hello");
            System.in.read();
        }
    

    另外还可以使用Guava中com.google.common.util.concurrent.Futures的addCallback方法,效果是相同的

            com.google.common.util.concurrent.Futures.addCallback(future, 
    new FutureCallback<String>() {
                public void onSuccess(String s) {
                    System.out.println(s);
                }
    
                public void onFailure(Throwable throwable) {
    
                }
            });
    

    2.DefaultMapFuture

    DefaultMapFuture添加了一个代理ListenableFuture和类型转换器

    示例代码如下:

    public class DefaultMapFutureTest {
    
        @Test
        public void test1() throws Exception {
            DefaultListenableFuture<String> source=Futures.createSettableFuture();
            ListenableFuture<Integer> future=Futures.map(source, new Func1<String, Integer>() {
                        public Integer call(String s) {
                            System.out.println("call:"+s);
                            return Integer.valueOf(s);
                        }
                    });
    
            future.addListener(new AbstractListener<Integer>(future) {
                @Override
                protected void operationComplete(Integer integer) {
                    System.out.println("operationComplete:"+integer);
                }
    
                @Override
                protected void onThrowable(Throwable throwable) {
                }
            });
            source.set("10");
            System.in.read();
        }
    
    }
    

    执行流程:

    1. source调用set方法
    2. 触发source回调监听事件
    3. 回调监听事件调用构造函数中传入的Func1函数,转换数据类型
    4. 将转换好的数据结果调用future的set方法
      5.触发future回调监听事件

    2.1 WrappedListener

    DefaultMapFuture构造函数还可以传入WrappedListener参数

    WrappedListener

    相关文章

      网友评论

          本文标题:hsf笔记-ListenableFuture

          本文链接:https://www.haomeiwen.com/subject/uxctiftx.html