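1. Future extensions
ListenableFuture and SettableFuture play a role similar to Netty's Promise interface: a future that can be completed manually and that notifies its listeners on completion.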
1.1 ListenableFuture
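ListenableFuture is a concept from the Guava library: it extends Future and adds callback listeners. The variant used here declares two addListener overloads (Guava's own interface only has the two-argument form):
public interface ListenableFuture<V> extends Future<V> {
    void addListener(Runnable listener);
    void addListener(Runnable listener, Executor executor);
}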
1.2 SettableFuture
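public interface SettableFuture<V> extends ListenableFuture<V> {
    boolean set(V value);
    boolean setException(Throwable throwable);
}
Calling set or setException attempts to complete the future manually, which in turn triggers the registered callback listeners.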
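To make the contract concrete, here is a minimal JDK-only sketch of a settable future with listeners. SimpleSettableFuture is a hypothetical class written for illustration; it is not the library's DefaultListenableFuture and only mirrors the behaviour described above, under the assumption that the first completion wins and that listeners run on the completing thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

// Hypothetical stand-in for a settable, listenable future (illustration only).
class SimpleSettableFuture<V> {
    private final List<Runnable> listeners = new ArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private V value;
    private Throwable error;

    // Stores the listener until completion, or runs it at once if already complete.
    public void addListener(Runnable listener) {
        synchronized (this) {
            if (done.getCount() > 0) {
                listeners.add(listener);
                return;
            }
        }
        listener.run();
    }

    public boolean set(V v) {
        return complete(v, null);
    }

    public boolean setException(Throwable t) {
        return complete(null, t);
    }

    // Only the first completion succeeds; later attempts return false.
    private boolean complete(V v, Throwable t) {
        List<Runnable> toRun;
        synchronized (this) {
            if (done.getCount() == 0) {
                return false;
            }
            value = v;
            error = t;
            done.countDown();
            toRun = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Runnable r : toRun) {
            r.run(); // listeners run on the thread that completed the future
        }
        return true;
    }

    // Blocks until completion, then returns the value or rethrows the failure.
    public V get() throws InterruptedException, ExecutionException {
        done.await();
        if (error != null) {
            throw new ExecutionException(error);
        }
        return value;
    }
}

class SimpleSettableFutureDemo {
    public static void main(String[] args) {
        SimpleSettableFuture<String> future = new SimpleSettableFuture<>();
        future.addListener(() -> {
            try {
                System.out.println(future.get()); // prints "hello"
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        System.out.println(future.set("hello")); // prints "true"
        System.out.println(future.set("again")); // prints "false": already completed
    }
}
```

The boolean return value is what makes set an "attempt": a second call does not overwrite the result and does not fire the listeners again.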
1.3 Test code
@Test
public void test1() throws Exception {
    final DefaultListenableFuture<String> future = Futures.createSettableFuture();
    future.addListener(new Runnable() {
        public void run() {
            try {
                System.out.println(future.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    });
    // Complete the future manually; this triggers the listener.
    future.set("hello");
    // Block so that the output can be observed.
    System.in.read();
}
Test output:
hello
DefaultListenableFuture is the default implementation of both ListenableFuture and SettableFuture. It extends com.google.common.util.concurrent.AbstractFuture, so the listener mechanism is ultimately provided by Guava's AbstractFuture.
1.4 The Futures helper class
The Futures helper class creates DefaultListenableFuture and DefaultMapFuture instances. When no Executor is supplied, MoreExecutors.directExecutor() is used.
public class Futures {
    public Futures() {
    }

    public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func) {
        return new DefaultMapFuture(MoreExecutors.directExecutor(), source, func);
    }

    public static <T, R> ListenableFuture<R> map(ListenableFuture<T> source, Func1<T, R> func, ListenerWrapper listenerWrapper) {
        return new DefaultMapFuture(MoreExecutors.directExecutor(), source, func, listenerWrapper);
    }

    public static <T> DefaultListenableFuture<T> createSettableFuture() {
        return new DefaultListenableFuture(MoreExecutors.directExecutor());
    }

    public static <T> DefaultListenableFuture<T> createSettableFuture(Executor executor) {
        return new DefaultListenableFuture(executor);
    }
}
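The two createSettableFuture overloads differ only in the Executor they pass along. Assuming that executor is the one on which listeners run (the source does not show the DefaultListenableFuture constructor, so this is an assumption), the practical difference is which thread executes the callback. The sketch below illustrates the idea with the JDK's CompletableFuture as a stand-in; ListenerExecutorSketch and the thread name "listener-pool" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ListenerExecutorSketch {
    // Returns the name of the thread on which the callback ran.
    static String callbackThread(boolean useDedicatedExecutor) {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "listener-pool"));
        try {
            CompletableFuture<String> future = new CompletableFuture<>();
            CompletableFuture<String> threadName;
            if (useDedicatedExecutor) {
                // Comparable to supplying a custom Executor.
                threadName = future.thenApplyAsync(v -> Thread.currentThread().getName(), pool);
            } else {
                // Comparable to a direct executor: runs on the completing thread.
                threadName = future.thenApply(v -> Thread.currentThread().getName());
            }
            future.complete("hello");
            return threadName.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String caller = Thread.currentThread().getName();
        System.out.println(callbackThread(false).equals(caller)); // prints "true"
        System.out.println(callbackThread(true));                 // prints "listener-pool"
    }
}
```

With a direct executor, a slow listener blocks whichever thread calls set, so a dedicated executor is worth considering when callbacks do non-trivial work.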
1.5 AbstractListener
AbstractListener wraps a callback listener so that the result value is handed to the callback directly:
public abstract class AbstractListener<T> implements Runnable {
    protected ListenableFuture<T> rpcFuture;

    public AbstractListener(ListenableFuture<T> rpcFuture) {
        this.rpcFuture = rpcFuture;
    }

    public void run() {
        try {
            // The future is already complete when the listener runs, so get() does not block.
            T rpcResult = this.rpcFuture.get();
            this.operationComplete(rpcResult);
        } catch (Throwable t) {
            this.onThrowable(t);
        }
    }

    protected abstract void operationComplete(T result);

    protected abstract void onThrowable(Throwable throwable);
}
Example:
@Test
public void test2() throws Exception {
    DefaultListenableFuture<String> future = Futures.createSettableFuture();
    future.addListener(new AbstractListener<String>(future) {
        @Override
        protected void operationComplete(String s) {
            System.out.println(s);
        }

        @Override
        protected void onThrowable(Throwable throwable) {
            // Do not swallow failures silently.
            throwable.printStackTrace();
        }
    });
    future.set("hello");
    System.in.read();
}
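The example above only exercises the success path. The following JDK-only sketch shows what the same listener pattern observes when the future fails. ResultListener is a hypothetical copy of the AbstractListener idea, and CompletableFuture stands in for the library's settable future (whenComplete plays the role of addListener, completeExceptionally the role of setException). Because the listener reads the outcome through Future.get(), the failure arrives wrapped in an ExecutionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical JDK-only version of the AbstractListener pattern.
abstract class ResultListener<T> implements Runnable {
    private final Future<T> future;

    ResultListener(Future<T> future) {
        this.future = future;
    }

    @Override
    public void run() {
        try {
            onResult(future.get());
        } catch (Throwable t) {
            onError(t);
        }
    }

    protected abstract void onResult(T value);

    protected abstract void onError(Throwable t);
}

class ResultListenerDemo {
    // Fails a future and returns what the listener recorded.
    static String failurePath() {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> future = new CompletableFuture<>();
        Runnable listener = new ResultListener<String>(future) {
            @Override
            protected void onResult(String value) {
                log.append("ok:").append(value);
            }

            @Override
            protected void onError(Throwable t) {
                // Future.get() wraps the failure in ExecutionException; unwrap it.
                Throwable cause = (t instanceof ExecutionException) ? t.getCause() : t;
                log.append("failed:").append(cause.getMessage());
            }
        };
        future.whenComplete((value, error) -> listener.run());           // stand-in for addListener
        future.completeExceptionally(new IllegalStateException("boom")); // stand-in for setException
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(failurePath()); // prints "failed:boom"
    }
}
```

Note that an exception thrown inside the success callback is also routed to the error callback, since both sit inside the same try block.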
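Alternatively, the addCallback method of Guava's com.google.common.util.concurrent.Futures achieves the same effect. Note that recent Guava releases require an Executor as a third argument, for example MoreExecutors.directExecutor().
com.google.common.util.concurrent.Futures.addCallback(future,
        new FutureCallback<String>() {
            public void onSuccess(String s) {
                System.out.println(s);
            }

            public void onFailure(Throwable throwable) {
                // Do not swallow failures silently.
                throwable.printStackTrace();
            }
        });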
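2. DefaultMapFuture
DefaultMapFuture adds a delegate (source) ListenableFuture and a type converter: it completes with the converted result of the source future.
Example code: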
public class DefaultMapFutureTest {
    @Test
    public void test1() throws Exception {
        DefaultListenableFuture<String> source = Futures.createSettableFuture();
        // Map the String result of source to an Integer.
        ListenableFuture<Integer> future = Futures.map(source, new Func1<String, Integer>() {
            public Integer call(String s) {
                System.out.println("call:" + s);
                return Integer.valueOf(s);
            }
        });
        future.addListener(new AbstractListener<Integer>(future) {
            @Override
            protected void operationComplete(Integer integer) {
                System.out.println("operationComplete:" + integer);
            }

            @Override
            protected void onThrowable(Throwable throwable) {
                // Do not swallow failures silently.
                throwable.printStackTrace();
            }
        });
        source.set("10");
        System.in.read();
    }
}
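Execution flow:
1. source's set method is called.
2. The callback listener registered on source fires.
3. That listener calls the Func1 function passed to the constructor to convert the value.
4. The converted result is passed to future's set method.
5. The callback listeners registered on future fire.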
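The steps above can be sketched with the JDK alone. MapFutureSketch.map below is a hypothetical helper, not the library's DefaultMapFuture; CompletableFuture stands in for the settable future, and the failure handling (propagating source and conversion errors to the mapped future) is an assumption, since the source does not show how DefaultMapFuture treats errors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class MapFutureSketch {
    // Hypothetical helper: returns a future that completes with func(source value).
    static <T, R> CompletableFuture<R> map(CompletableFuture<T> source, Function<T, R> func) {
        CompletableFuture<R> mapped = new CompletableFuture<>();
        // Step 2: this callback is the listener registered on the source.
        source.whenComplete((value, error) -> {
            if (error != null) {
                mapped.completeExceptionally(error); // assumed: propagate source failure
                return;
            }
            try {
                R converted = func.apply(value);     // step 3: convert the value
                mapped.complete(converted);          // step 4: set the mapped future
            } catch (Throwable t) {
                mapped.completeExceptionally(t);     // assumed: conversion failure
            }
        });
        return mapped;
    }

    // Runs the flow for one input and returns the recorded event log.
    static String demo(String input) {
        StringBuilder log = new StringBuilder();
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> future = map(source, s -> {
            log.append("call:").append(s).append('\n');
            return Integer.valueOf(s);
        });
        // Step 5: listener on the mapped future.
        future.whenComplete((value, error) -> {
            if (error == null) {
                log.append("operationComplete:").append(value).append('\n');
            } else {
                log.append("onThrowable:").append(error.getClass().getSimpleName()).append('\n');
            }
        });
        source.complete(input); // step 1: complete the source
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(demo("10"));  // call:10, then operationComplete:10
        System.out.print(demo("abc")); // call:abc, then onThrowable:NumberFormatException
    }
}
```

In plain JDK code the same chaining is available directly as CompletableFuture.thenApply; the explicit version is shown only to mirror the five steps.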
2.1 WrappedListener
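The DefaultMapFuture constructor can also take a WrappedListener argument (the corresponding parameter of the Futures.map overload in section 1.4 is typed ListenerWrapper).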
WrappedListener