HSF Notes: ListenableFuture


Author: 兴浩 | Published 2018-08-17 16:08

1. Future extensions

ListenableFuture and SettableFuture are similar to Netty's Promise interface.

1.1 ListenableFuture

ListenableFuture is a concept from the Guava library: it extends Future and adds completion-callback listeners.

public interface ListenableFuture<V> extends Future<V> {
    void addListener(Runnable var1);

    void addListener(Runnable var1, Executor var2);
}

1.2 SettableFuture

public interface SettableFuture<V> extends ListenableFuture<V> {
    boolean set(V var1);

    boolean setException(Throwable var1);
}

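SettableFuture attempts to set a value (or an exception), which manually triggers the registered callback listeners.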
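To make the set-then-notify contract concrete, here is a minimal JDK-only sketch. SimpleSettableFuture is a hypothetical stand-in written for this note, not the HSF class, and the rule that only the first set or setException wins is an assumption modelled on Guava's behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SimpleSettableFutureDemo {

    /** Hypothetical stand-in for a settable, listenable future; not the HSF implementation. */
    static class SimpleSettableFuture<V> implements Future<V> {
        private final CountDownLatch done = new CountDownLatch(1);
        private final List<Runnable> listeners = new ArrayList<>();
        private boolean completed;
        private V value;
        private Throwable failure;

        /** Runs the listener once the future completes, or immediately if it already has. */
        public void addListener(Runnable listener) {
            synchronized (this) {
                if (!completed) {
                    listeners.add(listener);
                    return;
                }
            }
            listener.run();
        }

        public boolean set(V v) { return complete(v, null); }

        public boolean setException(Throwable t) { return complete(null, Objects.requireNonNull(t)); }

        /** Only the first completion is accepted (assumption); listeners run on the calling thread. */
        private boolean complete(V v, Throwable t) {
            List<Runnable> toRun;
            synchronized (this) {
                if (completed) return false;
                completed = true;
                value = v;
                failure = t;
                toRun = new ArrayList<>(listeners);
                listeners.clear();
            }
            done.countDown();
            for (Runnable r : toRun) r.run();
            return true;
        }

        @Override public boolean cancel(boolean mayInterruptIfRunning) { return false; }
        @Override public boolean isCancelled() { return false; }
        @Override public synchronized boolean isDone() { return completed; }

        @Override public V get() throws InterruptedException, ExecutionException {
            done.await();
            return result();
        }

        @Override public V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            if (!done.await(timeout, unit)) throw new TimeoutException();
            return result();
        }

        private synchronized V result() throws ExecutionException {
            if (failure != null) throw new ExecutionException(failure);
            return value;
        }
    }

    public static void main(String[] args) {
        SimpleSettableFuture<String> future = new SimpleSettableFuture<>();
        future.addListener(() -> System.out.println("listener fired, done=" + future.isDone()));
        System.out.println(future.set("hello")); // listener line is printed first, then true
        System.out.println(future.set("again")); // prints false: already completed
    }
}
```

The boolean return value lets callers detect that a result arrived after the future had already been completed.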

1.3 Test code

    @Test
    public void test1() throws Exception {
        final DefaultListenableFuture<String> future= Futures.createSettableFuture();

        future.addListener(new Runnable() {
            public void run() {
                try {
                    System.out.println(future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        });

        future.set("hello");
        
        System.in.read();
    }

Test output

hello

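DefaultListenableFuture is the default implementation of both ListenableFuture and SettableFuture. It extends com.google.common.util.concurrent.AbstractFuture, so in essence the functionality is provided by Guava's AbstractFuture.

1.4 The Futures helper class

The Futures helper class creates DefaultListenableFuture and DefaultMapFuture instances.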

public class Futures {
    public Futures() {
    }

    public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func) {
        return new DefaultMapFuture(MoreExecutors.directExecutor(), source, func);
    }

    public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func, ListenerWrapper listenerWrapper) {
        return new DefaultMapFuture(MoreExecutors.directExecutor(), source, func, listenerWrapper);
    }

    public static <T> DefaultListenableFuture<T> createSettableFuture() {
        return new DefaultListenableFuture(MoreExecutors.directExecutor());
    }

    public static <T> DefaultListenableFuture<T> createSettableFuture(Executor executor) {
        return new DefaultListenableFuture(executor);
    }
}
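Both createSettableFuture overloads hand an Executor to the future; the no-argument one uses Guava's MoreExecutors.directExecutor(), which runs tasks on the calling thread. The source does not show the constructor body, so the sketch below only assumes that this executor is where listeners are dispatched. It uses the JDK's CompletableFuture as a stand-in to show the difference between a direct executor and a pool; the class name and the listenerThread helper are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ListenerExecutorDemo {

    /** Completes a future on the calling thread and returns the name of the thread that ran its listener. */
    static String listenerThread(Executor executor) {
        CompletableFuture<String> future = new CompletableFuture<>();
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        // The listener is dispatched through the supplied executor when the future completes.
        future.whenCompleteAsync((value, error) -> ranOn.complete(Thread.currentThread().getName()), executor);
        future.complete("hello");
        return ranOn.join();
    }

    public static void main(String[] args) {
        // JDK equivalent of a direct executor: run the task on the submitting thread.
        Executor direct = Runnable::run;
        System.out.println(listenerThread(direct).equals(Thread.currentThread().getName())); // prints true

        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-pool"));
        try {
            System.out.println(listenerThread(pool)); // prints listener-pool
        } finally {
            pool.shutdown();
        }
    }
}
```

With a direct executor the listener finishes before the completing call returns; with a pool, the completing thread is not blocked by slow listeners.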

1.5 AbstractListener

AbstractListener wraps the callback listener so that the result value is handed to the callback directly.

public abstract class AbstractListener<T> implements Runnable {
    protected ListenableFuture<T> rpcFuture;

    public AbstractListener(ListenableFuture<T> rpcFuture) {
        this.rpcFuture = rpcFuture;
    }

    public void run() {
        try {
            T rpcResult = this.rpcFuture.get();
            this.operationComplete(rpcResult);
        } catch (Throwable var2) {
            this.onThrowable(var2);
        }

    }

    protected abstract void operationComplete(T var1);

    protected abstract void onThrowable(Throwable var1);
}

Example:

    @Test
    public void test2() throws Exception {
        DefaultListenableFuture<String> future= Futures.createSettableFuture();
        future.addListener(new AbstractListener<String>(future) {
            @Override
            protected void operationComplete(String s) {
                System.out.println(s);
            }
            @Override
            protected void onThrowable(Throwable throwable) {
            }
        });
        future.set("hello");
        System.in.read();
    }
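The onThrowable in the example above is empty, so a failed call would pass silently. Since run() catches whatever future.get() throws, a failure on a future that follows the standard Future contract arrives as an ExecutionException wrapping the real cause. The sketch below illustrates that path with the JDK's CompletableFuture; ResultListener is a simplified, hypothetical copy of the AbstractListener pattern typed against plain Future, not the HSF class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ListenerErrorPathDemo {

    /** Simplified copy of the AbstractListener pattern, typed against plain Future (assumption). */
    abstract static class ResultListener<T> implements Runnable {
        private final Future<T> future;

        ResultListener(Future<T> future) { this.future = future; }

        @Override public void run() {
            try {
                operationComplete(future.get());
            } catch (Throwable t) {
                onThrowable(t);
            }
        }

        protected abstract void operationComplete(T result);

        protected abstract void onThrowable(Throwable t);
    }

    /** Fails a future, runs the listener, and reports what the listener observed. */
    static String describeFailure() {
        CompletableFuture<String> future = new CompletableFuture<>();
        StringBuilder seen = new StringBuilder();
        ResultListener<String> listener = new ResultListener<String>(future) {
            @Override protected void operationComplete(String s) {
                seen.append("ok:").append(s);
            }

            @Override protected void onThrowable(Throwable t) {
                // Unwrap the ExecutionException added by Future.get() to reach the real cause.
                Throwable cause = (t instanceof ExecutionException && t.getCause() != null) ? t.getCause() : t;
                seen.append(t.getClass().getSimpleName()).append(" -> ").append(cause.getMessage());
            }
        };
        future.completeExceptionally(new IllegalStateException("rpc failed"));
        listener.run();
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(describeFailure()); // prints ExecutionException -> rpc failed
    }
}
```

Note that the same catch block also receives anything thrown by operationComplete itself, so onThrowable should at least log what it gets.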

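Alternatively, the addCallback method of Guava's com.google.common.util.concurrent.Futures achieves the same effect (recent Guava releases require an Executor as a third argument):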

        com.google.common.util.concurrent.Futures.addCallback(future,
                new FutureCallback<String>() {
                    public void onSuccess(String s) {
                        System.out.println(s);
                    }

                    public void onFailure(Throwable throwable) {
                    }
                });

2. DefaultMapFuture

DefaultMapFuture wraps a source (delegate) ListenableFuture together with a type converter.

Example code:

public class DefaultMapFutureTest {

    @Test
    public void test1() throws Exception {
        DefaultListenableFuture<String> source=Futures.createSettableFuture();
        ListenableFuture<Integer> future=Futures.map(source, new Func1<String, Integer>() {
                    public Integer call(String s) {
                        System.out.println("call:"+s);
                        return Integer.valueOf(s);
                    }
                });

        future.addListener(new AbstractListener<Integer>(future) {
            @Override
            protected void operationComplete(Integer integer) {
                System.out.println("operationComplete:"+integer);
            }

            @Override
            protected void onThrowable(Throwable throwable) {
            }
        });
        source.set("10");
        System.in.read();
    }

}

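Execution flow:

  1. set is called on source.
  2. The callback listener registered on source is triggered.
  3. That listener calls the Func1 passed to the constructor to convert the value.
  4. The converted result is passed to the set method of future.
  5. The callback listeners registered on future are triggered.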
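The five steps above can be sketched with JDK types only. This is an illustration of the described flow, not the DefaultMapFuture source: the map helper, the log list, and the error handling when the conversion throws are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class MapFutureFlowDemo {

    /** Hypothetical helper mirroring the described flow; not the HSF DefaultMapFuture. */
    static <T, R> CompletableFuture<R> map(CompletableFuture<T> source, Function<T, R> func, List<String> log) {
        CompletableFuture<R> target = new CompletableFuture<>();
        // Step 2: this callback plays the role of the listener registered on the source.
        source.whenComplete((value, error) -> {
            log.add("source listener fired");
            if (error != null) {
                target.completeExceptionally(error);
                return;
            }
            try {
                R converted = func.apply(value); // Step 3: convert the value
                log.add("converted");
                target.complete(converted);      // Step 4: set the result on the mapped future
            } catch (Throwable t) {
                // Assumption: a failed conversion fails the mapped future.
                target.completeExceptionally(t);
            }
        });
        return target;
    }

    /** Runs the flow once and returns the recorded order of events. */
    static List<String> runFlow(String input) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> future = map(source, Integer::valueOf, log);
        // Step 5: listener on the mapped future.
        future.whenComplete((v, e) -> log.add(e == null
                ? "future listener fired: " + v
                : "future failed: " + e.getClass().getSimpleName()));
        log.add("source.set");
        source.complete(input); // Step 1
        return log;
    }

    public static void main(String[] args) {
        runFlow("10").forEach(System.out::println);
    }
}
```

Running main prints the events in the same order as the numbered list: source.set, source listener fired, converted, future listener fired: 10.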

2.1 WrappedListener

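The DefaultMapFuture constructor can also take a WrappedListener argument (the parameter type appears as ListenerWrapper in the Futures.map signature above).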

