服务端异步是指在调用端调用的时候,服务端采用异步的方式来进行操作
1.CompleteFuture
服务端代码
private final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public CompletableFuture<String> sayHello(String name) {
RpcContext context = RpcContext.getContext();
//通过supplyAsunc开启异步执行
return CompletableFuture.supplyAsync(()-> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("async return ");
return "Hello " + name + " " + context.getAttachment("company");
}, bizThreadPool);
}
消费端代码
此时消费端不需要开启异步调用,
ReferenceConfig<AsyncGreetingService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("first-dubbo-consumer"));
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(AsyncGreetingService.class);
referenceConfig.setTimeout(5000);
/* referenceConfig.setLoadbalance("myLoadBalance");
referenceConfig.setCluster("myBoradcast");*/
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
AsyncGreetingService greetingService = referenceConfig.get();
//设置隐式参数
RpcContext.getContext().setAttachment("company","alibaba");
CompletableFuture<String> future = greetingService.sayHello("world");
future.whenComplete((v,t)->{
if(t != null) ((Throwable)t).printStackTrace();
else System.out.println(v);
});
System.out.println("over");
//over将先打印出来,程序继续向下进行,当服务端运行完成后,回调函数触发
2.AsyncContext
该类是dubbo包下的一个类,有兴趣可以学下servlet包下的同名类
服务端
private final ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(8, 16, 1, TimeUnit.MINUTES,
new SynchronousQueue(), new NamedThreadFactory("biz-thread-pool"), new ThreadPoolExecutor.CallerRunsPolicy());
@Override
public String sayHello(String name) {
final AsyncContext asyncContext = RpcContext.startAsync();
bizThreadPool.execute(()->{
asyncContext.signalContextSwitch();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
asyncContext.write("Hello " + name +" "+ RpcContext.getContext().getAttachment("company"));
});
return null;
}
消费端
ReferenceConfig<GreetingServiceRpcContext> referenceConfig = new ReferenceConfig<>();
referenceConfig.setApplication(new ApplicationConfig("first-dubbo-async-consumer"));
RegistryConfig registryConfig = new RegistryConfig("zookeeper://127.0.0.1:2181");
referenceConfig.setRegistry(registryConfig);
referenceConfig.setInterface(GreetingServiceRpcContext.class);
referenceConfig.setVersion("1.0.0");
referenceConfig.setGroup("dubbo");
//异步调用需要设置超时是时间,(默认超时时间1s)不然服务处理时间过长,消费者将断去链接,导致future.get()报错
referenceConfig.setTimeout(10000);
referenceConfig.setAsync(true);
GreetingServiceRpcContext greetingService = referenceConfig.get();
System.out.println(greetingService.sayHello(" world"));
Future<String> future = RpcContext.getContext().getFuture();
System.out.println(future.get());
System.out.println("over");
这种方法实现的服务端异步调用中,
消费端如果不设置消费端异步调用,采用直接调用的方式,则会一直阻塞,直到服务端真的处理完成
消费端如果设置了消费端异步调用,则可以采用future模式拿到返回值。
不管是消费端还是服务端的异步,目的都是为了让自己的程序继续下行,提升处理效率
网友评论