ActiveLimitFilter也是用来做并发控制的,区别在于ExecuteLimitFilter作用于服务端,而ActiveLimitFilter作用于客户端。
看下官网的例子
<dubbo:service interface="com.foo.BarService" actives="10" />
即从客户端方面限制了服务最多有10个并发
接下来看下ActiveLimitFilter的invoke方法
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
// 获取配置的数量
int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
// 获取当前接口调用统计
RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
if (max > 0) {
//获取接口超时时间
long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
long start = System.currentTimeMillis();
long remain = timeout;
int active = count.getActive();// 获取并发数
if (active >= max) {// 如果大于最大数量
synchronized (count) {
while ((active = count.getActive()) >= max) {
try {
// 挂起当前线程并释放锁,因为并发数已超过限制
count.wait(remain);
} catch (InterruptedException e) {
}
// 通过notify唤醒了,计算挂起的时间
long elapsed = System.currentTimeMillis() - start;
remain = timeout - elapsed;
if (remain <= 0) {// 如果已经超过超时时间
throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
+ invoker.getInterface().getName() + ", method: "
+ invocation.getMethodName() + ", elapsed: " + elapsed
+ ", timeout: " + timeout + ". concurrent invokes: " + active
+ ". max concurrent invoke limit: " + max);
}
}
}
}
}
try {
long begin = System.currentTimeMillis();
// 增加该方法的数量
RpcStatus.beginCount(url, methodName);
try {
// 调用
Result result = invoker.invoke(invocation);
// 减少数量
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
return result;
} catch (RuntimeException t) {
RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
throw t;
}
} finally {
if(max>0){// 调用完成后调用notify唤醒在等待的线程
synchronized (count) {
count.notify();
}
}
}
}
整个逻辑也是比较简单的,和ExecuteLimitFilter不太一样的是,ExecuteLimitFilter在超过限制的数量后会直接返回,而ActiveLimitFilter会等待,直到超时。
网友评论