dubbo系列代码地址
https://github.com/AslanNH/DubboLearn.git
dubbo的消费端异步调用即指在消费者端,通过异步的方式调用服务端接口,服务端不用任何异步处理的操作。
如果采用这种方式实现异步调用,需要在订阅服务的时候,配置async = true 这个配置
1.Future
使用RpcContext中提供的异步API,直接获取Future对象来得到返回值
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("first-dubbo-async-consumer"));
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
//异步调用需要设置超时是时间,(默认超时时间1s)不然服务处理时间过长,消费者将断去链接,导致future.get()报错
referenceConfig.setTimeout(5000);
//此处开启消费端异步调用
referenceConfig.setAsync(true);
//此时服务端处理时间较长(可通过sleep实现)
//因为开启了异步调用,此处消费端不会等待,直接返回结果为null
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello(" world"));
Future<String> future = RpcContext.getContext().getFuture();
//调用get会阻塞
System.out.println(future.get(5, TimeUnit.SECONDS));
虽然可以实现异步调用,但是在future.get()的时候还是回一直等待,类似于一个join的操作。
2.FutureAdapter
使用FutureAdapter来设置异步的回调函数,则当前消费端现成不会等待,当服务端处理完成之后,
则会通过回调函数来返回值
其中done则为成功的返回值 caught为异常
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("first-dubbo-async-callback-consumer"));
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
//异步调用需要设置超时是时间,(默认超时时间1s)不然服务处理时间过长,消费者将断去链接,导致future.get()报错
referenceConfig.setTimeout(5000);
referenceConfig.setAsync(true);
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello(" world"));
((FutureAdapter)RpcContext.getContext().getFuture()).getFuture().setCallback(new ResponseCallback(){
@Override
public void done(Object o) {
System.out.println("result: " + o);
}
@Override
public void caught(Throwable throwable) {
System.out.println("error: " + throwable.getLocalizedMessage() );
}
});
System.out.println("over");
Thread.currentThread().join();//为了等待返回结果,不然当前现成回关闭
3.CompletableFuture
CompletableFuture是JUC包下的一个异步对象。dubbo借用此对象实现更优雅的消费端异步调用
ReferenceConfig<GreetingService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("first-dubbo-async-callback-consumer"));
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(GreetingService.class);
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
//异步调用需要设置超时是时间,(默认超时时间1s)不然服务处理时间过长,消费者将断去链接,导致future.get()报错
referenceConfig.setTimeout(5000);
referenceConfig.setAsync(true);
GreetingService greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello(" world"));
CompletableFuture completableFuture = RpcContext.getContext().getCompletableFuture();
completableFuture.whenComplete((v, t) ->{
if(t != null) ((Throwable)t).printStackTrace();
else System.out.println(v);
});
System.out.println("over");
Thread.currentThread().join();//为了等待返回结果
CompletableFuture 也是采用FutureAdapter的设置回调的方法,但是CF更加的优雅实现。
网友评论