Limiting concurrency on the consumer side
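Setting actives to 10 on the GreetingService reference applies to every method of the interface: each method may have at most 10 concurrent requests in flight.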
The limit can also be set for one specific method only.
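How a method-level value takes precedence over the interface-level one can be sketched in a few lines. This is only a simplified stand-in for the lookup that the filter performs through getMethodParameter, not Dubbo's URL class. It assumes that a method-level value is stored under a "<method>.<key>" entry next to the interface-level "<key>" entry; the class name MethodParameterDemo and the method names sayHello and sayBye are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the parameter lookup; NOT Dubbo's URL class.
class MethodParameterDemo {
    private final Map<String, String> parameters = new HashMap<>();

    MethodParameterDemo set(String key, String value) {
        parameters.put(key, value);
        return this;
    }

    // Prefer "<method>.<key>", fall back to "<key>", then to the default.
    int getMethodParameter(String method, String key, int defaultValue) {
        String value = parameters.get(method + "." + key);
        if (value == null || value.isEmpty()) {
            value = parameters.get(key);
        }
        if (value == null || value.isEmpty()) {
            return defaultValue;
        }
        return Integer.parseInt(value);
    }

    public static void main(String[] args) {
        MethodParameterDemo url = new MethodParameterDemo()
                .set("actives", "10")           // interface-level limit
                .set("sayHello.actives", "5");  // hypothetical method-level override
        System.out.println(url.getMethodParameter("sayHello", "actives", 0)); // 5
        System.out.println(url.getMethodParameter("sayBye", "actives", 0));   // 10
    }
}
```

With both values present, sayHello is limited to 5 concurrent requests while every other method keeps the interface-wide limit of 10.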
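On the consumer side, the concurrency control itself is performed by ActiveLimitFilter, one of the filters in the Filter chain built by the ProtocolFilterWrapper class. Its invoke() method looks like this: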
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    // Configured actives limit for this method (defaults to 0)
    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
    // No free slot: wait on the counter until one is released or the timeout expires
    if (!count.beginCount(url, methodName, max)) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
        long start = System.currentTimeMillis();
        long remain = timeout;
        synchronized (count) {
            while (!count.beginCount(url, methodName, max)) {
                try {
                    count.wait(remain);
                } catch (InterruptedException e) {
                    // ignore
                }
                long elapsed = System.currentTimeMillis() - start;
                remain = timeout - elapsed;
                if (remain <= 0) {
                    throw new RpcException("Waiting concurrent invoke timeout in client-side for service: "
                            + invoker.getInterface().getName() + ", method: "
                            + invocation.getMethodName() + ", elapsed: " + elapsed
                            + ", timeout: " + timeout + ". concurrent invokes: " + count.getActive()
                            + ". max concurrent invoke limit: " + max);
                }
            }
        }
    }
    boolean isSuccess = true;
    long begin = System.currentTimeMillis();
    try {
        return invoker.invoke(invocation);
    } catch (RuntimeException t) {
        isSuccess = false;
        throw t;
    } finally {
        // Release the slot, then wake up any callers waiting for one
        count.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
        if (max > 0) {
            synchronized (count) {
                count.notifyAll();
            }
        }
    }
}
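A request that exceeds the limit waits for a free slot. If none becomes available before the timeout elapses, an RpcException is thrown and the request is dropped.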
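The wait-then-fail behaviour can be tried out without Dubbo using a small standalone sketch. It is not the ActiveLimitFilter implementation: the class WaitingLimiter and its fields max and timeoutMs are hypothetical, it throws IllegalStateException instead of RpcException, and it differs from the filter in two deliberate ways, namely that it checks the remaining time before waiting and that it stops waiting when the thread is interrupted.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for the consumer-side limit; NOT Dubbo's ActiveLimitFilter.
class WaitingLimiter {
    private final int max;         // hypothetical: maximum in-flight calls, <= 0 means unlimited
    private final long timeoutMs;  // hypothetical: longest time a caller waits for a slot
    private final AtomicInteger active = new AtomicInteger();

    WaitingLimiter(int max, long timeoutMs) {
        this.max = max;
        this.timeoutMs = timeoutMs;
    }

    // Non-blocking attempt to take a slot, using a compare-and-set loop.
    private boolean tryBegin() {
        int limit = max <= 0 ? Integer.MAX_VALUE : max;
        for (;;) {
            int current = active.get();
            if (current >= limit) {
                return false;
            }
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Take a slot, waiting up to timeoutMs; throws if none frees up in time.
    void acquire() {
        if (tryBegin()) {
            return;
        }
        long deadline = System.currentTimeMillis() + timeoutMs;
        synchronized (this) {
            while (!tryBegin()) {
                long remain = deadline - System.currentTimeMillis();
                if (remain <= 0) {
                    throw new IllegalStateException("timed out waiting for a free slot, limit " + max);
                }
                try {
                    wait(remain);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a free slot", e);
                }
            }
        }
    }

    // Give the slot back and wake up every waiting caller.
    void release() {
        active.decrementAndGet();
        synchronized (this) {
            notifyAll();
        }
    }

    int active() {
        return active.get();
    }

    public static void main(String[] args) {
        WaitingLimiter limiter = new WaitingLimiter(1, 100);
        limiter.acquire();                  // takes the only slot
        try {
            limiter.acquire();              // nobody releases within 100 ms
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        limiter.release();
        limiter.acquire();                  // succeeds again after the release
        System.out.println("active = " + limiter.active());
    }
}
```

Waiting callers re-check the counter in a loop after every wake-up, so a notifyAll() that wakes more callers than there are free slots simply sends the extra ones back to waiting.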
Limiting concurrency on the provider side
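On the provider side the limit is the executes parameter, and the concurrency control is performed by ExecuteLimitFilter, another filter in the Filter chain built by the ProtocolFilterWrapper class: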
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    URL url = invoker.getUrl();
    String methodName = invocation.getMethodName();
    // Configured executes limit for this method (defaults to 0)
    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);
    // No free slot: reject the request immediately instead of waiting
    if (!RpcStatus.beginCount(url, methodName, max)) {
        throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
                url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                "\" /> limited.");
    }
    long begin = System.currentTimeMillis();
    boolean isSuccess = true;
    try {
        return invoker.invoke(invocation);
    } catch (Throwable t) {
        isSuccess = false;
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
        }
    } finally {
        // Always release the slot, whether the call succeeded or failed
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess);
    }
}
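As on the consumer side, RpcStatus keeps count of the requests currently in flight for each method, and the filter decides against that count. Unlike the consumer side, a request over the limit is not queued: it is rejected immediately with an RpcException.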
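The fail-fast variant is even shorter to sketch. Again this is a standalone illustration rather than Dubbo code: FailFastLimiter is a hypothetical class, a plain AtomicInteger stands in for the per-method counter kept by RpcStatus, and IllegalStateException stands in for RpcException.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Simplified stand-in for the provider-side limit; NOT Dubbo's ExecuteLimitFilter.
class FailFastLimiter {
    private final int max;  // hypothetical: maximum concurrent executions, <= 0 means unlimited
    private final AtomicInteger active = new AtomicInteger();

    FailFastLimiter(int max) {
        this.max = max;
    }

    // Non-blocking attempt to take a slot, using a compare-and-set loop.
    private boolean tryBegin() {
        int limit = max <= 0 ? Integer.MAX_VALUE : max;
        for (;;) {
            int current = active.get();
            if (current >= limit) {
                return false;
            }
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Run the call if a slot is free; otherwise reject it at once.
    <T> T invoke(Supplier<T> call) {
        if (!tryBegin()) {
            throw new IllegalStateException("concurrent executions exceed the limit of " + max);
        }
        try {
            return call.get();
        } finally {
            active.decrementAndGet();  // always release, on success or failure
        }
    }

    int active() {
        return active.get();
    }

    public static void main(String[] args) {
        FailFastLimiter limiter = new FailFastLimiter(1);
        // While the outer call holds the only slot, the nested call is rejected.
        String first = limiter.invoke(() -> {
            try {
                return limiter.invoke(() -> "inner");
            } catch (IllegalStateException e) {
                return "inner rejected";
            }
        });
        String second = limiter.invoke(() -> "ok");
        System.out.println(first);   // inner rejected
        System.out.println(second);  // ok
    }
}
```

Rejecting at once keeps provider threads from piling up behind a slow method; the consumer sees the failure immediately and can retry or fail over as configured.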