一、Dubbo中的线程池
提供了三种线程池的实现:
- fixed:固定大小的线程池,启动时建立,并且不会关闭,这也是缺省的设置。
- cached:缓存线程池,空闲一分钟自动删除,需要时重建。
- limited:可伸缩线程池,但是线程池中的线程数只会增长不会收缩,这样做的目的是为了避免当进行收缩时流量突然增加造成性能问题。

二、解决线程池耗尽
@DubboService
@Component
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {
@Override
public OrderInfo queryOrderById(String id) {
// 这里模拟执行一段耗时的业务逻辑
sleepInner(5000);
OrderInfo resultInfo = new OrderInfo(
"GeekDubbo",
"服务方异步方式之RpcContext.startAsync#" + id,
new BigDecimal(129));
return resultInfo;
}
}
如果 queryOrderById 这个方法的调用量上来了,很容易导致 Dubbo 线程池耗尽。因为 Dubbo 线程池总数默认是固定的,200 个,假设系统在单位时间内可以处理 500 个请求,一旦 queryOrderById 的请求流量上来了,极端情况下,可能会出现 200 个线程都在处理这个耗时的任务,那么剩下的 300 个请求,即使是不耗时的功能,也很难有机会拿到线程资源。所以,紧接着就导致 Dubbo 线程池耗尽了。
解决:
为了让这种耗时的请求尽量不占用公共的线程池资源,这段代码优化成异步形式。核心问题是异步化的时候需要拿到返回结果。
选择线程池的实现,如果能有一个存储媒介来存储异步化的结果,然后把存储媒介中的数据取出来返回回去。
在 queryOrderById 方法中开始异步化分支处理,紧接着在异步化分支中得到异步结果,然后把异步结果存储到某个地方,最后再取出这个异步结果并返回。
处理请求时共同的必经之路:
寻找一种可以拦截所有方法的流程机制,在拦截处拿到异步结果并返回:

因为 queryOrderById 走了异步分支,可能导致最终什么也没拦截到,所以要让拦截处想办法感知到 queryOrderById 内部实现是否走了异步处理,从代码层面上,引入一个变量,让拦截处一旦感知到业务接口(比如这里的 queryOrderById)开启了异步化模式处理,就可以理所当然地直接从存储异步结果的地方,把结果取回并返回。
采用上下文对象来存储,异步化的结果存储在上下文对象中。首先拦截识别异步,当拦截处发现有异步化模式的变量,从上下文对象中取出异步化结果并返回。
使用java.util.concurrent.Future
:
java.util.concurrent.CompletableFuture
#get(long timeout, TimeUnit unit)
方法支持传入超时时间。
@DubboService
@Component
public class AsyncOrderFacadeImpl implements AsyncOrderFacade {
@Override
public OrderInfo queryOrderById(String id) {
// 创建线程池对象
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 开启异步化操作模式,标识异步化模式开始
AsyncContext asyncContext = RpcContext.startAsync();
// 利用线程池来处理 queryOrderById 的核心业务逻辑
cachedThreadPool.execute(new Runnable() {
@Override
public void run() {
// 将 queryOrderById 所在线程的上下文信息同步到该子线程中
asyncContext.signalContextSwitch();
// 这里模拟执行一段耗时的业务逻辑
sleepInner(5000);
OrderInfo resultInfo = new OrderInfo(
"GeekDubbo",
"服务方异步方式之RpcContext.startAsync#" + id,
new BigDecimal(129));
System.out.println(resultInfo);
// 利用 asyncContext 将 resultInfo 返回回去
asyncContext.write(resultInfo);
}
});
return null;
}
}
在 queryOrderById 中创建了一个线程池,然后把 Runnable 内部类放到线程池中去执行,并且存在一个上下文信息的传递动作,最后在 Runnable 实现类中,将异步结果写入到上下文对象。
- 定义线程池对象,通过
RpcContext.startAsync
方法开启异步模式; - 在异步线程中通过
asyncContext.signalContextSwitch
同步父线程的上下文信息; - 在异步线程中将异步结果通过
asyncContext.write
写入到异步线程的上下文信息中。
三、Dubbo 异步实现原理
1.定义线程池对象
在 Dubbo 中 RpcContext.startAsync
方法意味着异步模式的开启:

通过 CAS 原子性的方式创建了一个 java.util.concurrent.CompletableFuture 对象,这个对象就存储在当前的上下文 org.apache.dubbo.rpc.RpcContextAttachment 对象中。
2.在异步线程中同步父线程的上下文信息

用
asyncContext.signalContextSwitch
同步不同线程间的信息,也就是信息的拷贝,只不过这个拷贝需要利用到异步模式开启之后的返回对象 asyncContext。因为 asyncContext 富含上下文信息,只需要把这个所谓的 asyncContext 对象传入到子线程中,然后将 asyncContext 中的上下文信息充分拷贝到子线程中,这样,子线程处理所需要的任何信息就不会因为开启了异步化处理而缺失。
3.在异步线程中,将异步结果写入到异步线程的上下文信息中
// org.apache.dubbo.rpc.AsyncContextImpl#write
public void write(Object value) {
if (isAsyncStarted() && stop()) {
if (value instanceof Throwable) {
Throwable bizExe = (Throwable) value;
future.completeExceptionally(bizExe);
} else {
future.complete(value);
}
} else {
throw new IllegalStateException("The async response has probably been wrote back by another thread, or the asyncContext has been closed.");
}
}
Dubbo 用 asyncContext.write
写入异步结果,通过 write 方法的查看,最终的异步化结果是存入了 java.util.concurrent.CompletableFuture
对象中,这样拦截处只需要调用 java.util.concurrent.CompletableFuture#get(long timeout, TimeUnit unit)
方法就可以很轻松地拿到异步化结果了。
四、异步应用场景
定义了线程池,可能会认为定义线程池的目的就是为了异步化操作,其实不是,因为异步化的操作会使 queryOrderById 方法立马返回,也就是说,异步化耗时的操作并没有在 queryOrderById 方法所在线程中继续占用资源,而是在新开辟的线程池中占用资源。
可以采用异步的场景:
- 一些 IO 耗时的操作,比较影响客户体验和使用性能
- 某段业务逻辑开启异步执行后不太影响主线程的原有业务逻辑。
因为 queryOrderById 开启异步操作后就立马返回了,queryOrderById 所在的线程和异步线程没有太多瓜葛,异步线程的完成与否,不太影响 queryOrderById 的返回操作。 - 时序上没有严格要求的业务逻辑
在 queryOrderById 这段简单的逻辑中,只开启了一个异步化的操作,站在时序的角度上看,queryOrderById 方法返回了,但是异步化的逻辑还在慢慢执行着,完全对时序的先后顺序没有严格要求。
极客时间《Dubbo 源码剖析与实战》学习笔记Day18 - http://gk.link/a/11VBp
网友评论