加载原理
dubbo的过滤器整体都是采用SPI的方式进行加载的
- 首先通过SPI加载dubbo加载策略
private static LoadingStrategy[] loadLoadingStrategies() {
return stream(load(LoadingStrategy.class).spliterator(), false)
.sorted()
.toArray(LoadingStrategy[]::new);
}
默认有三种策略,从以下三个目录中读取,优先级依次递减
- META-INF/dubbo/internal/
- META-INF/dubbo/
- META-INF/services/
- 加载扩展类
这里会遍历上面的加载策略,通过拼接策略目录+类的类型名(全路径名),找到对应的文件,遍历文件内容,根据文件里的key-value加载文件里对应的类
private Map<String, Class<?>> loadExtensionClasses() {
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
令牌验证
通过令牌验证在注册中心控制权限,以决定要不要下发令牌给消费者,可以防止消费者绕过注册中心访问提供者
全局方式
<!--随机token令牌,使用UUID生成-->
<dubbo:provider interface="com.foo.BarService" token="true" />
<!--固定token令牌,相当于密码-->
<dubbo:provider interface="com.foo.BarService" token="123456" />
服务级别
<dubbo:service interface="com.foo.BarService" token="true" />
<dubbo:service interface="com.foo.BarService" token="123456" />
协议级别
dubbo:protocol name="dubbo" token="true" />
<dubbo:protocol name="dubbo" token="123456" />
服务端实现
通过隐式传参来获取token进行判断
String token = invoker.getUrl().getParameter(TOKEN_KEY);
if (ConfigUtils.isNotEmpty(token)) {
Class<?> serviceType = invoker.getInterface();
Map<String, Object> attachments = inv.getObjectAttachments();
String remoteToken = (attachments == null ? null : (String) attachments.get(TOKEN_KEY));
if (!token.equals(remoteToken)) {
throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
}
}
消费者
对于消费者,如果通过注册中心来访问,并不需要自己指定token
但是如果是直连方式,默认是没有token的,当然是可以通过手动指定隐式传参来做规避这个问题
生产者限流
默认实现类:ExecuteLimitFilter,配置项:executes,默认0表示不进行限流
实现原理
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
URL url = invoker.getUrl();
String methodName = invocation.getMethodName();
int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
if (!RpcStatus.beginCount(url, methodName, max)) {
throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
"Failed to invoke method " + invocation.getMethodName() + " in provider " +
url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
"\" /> limited.");
}
invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
try {
return invoker.invoke(invocation);
} catch (Throwable t) {
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else {
throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
}
}
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
}
@Override
public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
if (t instanceof RpcException) {
RpcException rpcException = (RpcException) t;
if (rpcException.isLimitExceed()) {
return;
}
}
RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
}
可以看到,这里主要是借助RpcStatus的静态方法来实现,限流最大数通过url直接获取
RpcStatus的内部逻辑
- 这里对于每个url都有一个RpcStatus对象,而每个url状态对象里,又通过methodName关联了一个RpcStatus(通过ConcurrentHashMap)。限流针对方法进行处理
- active字段为原子变量,记录当前的数量
private final AtomicInteger active = new AtomicInteger();
public static boolean beginCount(URL url, String methodName, int max) {
max = (max <= 0) ? Integer.MAX_VALUE : max;
RpcStatus appStatus = getStatus(url);
RpcStatus methodStatus = getStatus(url, methodName);
if (methodStatus.active.get() == Integer.MAX_VALUE) {
return false;
}
for (int i; ; ) {
i = methodStatus.active.get();
if (i + 1 > max) {
return false;
}
if (methodStatus.active.compareAndSet(i, i + 1)) {
break;
}
}
appStatus.active.incrementAndGet();
return true;
}
- 结束时,调用endCount方法进行处理,更新统计字段
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
endCount(getStatus(url), elapsed, succeeded);
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
if (succeeded) {
if (status.succeededMaxElapsed.get() < elapsed) {
status.succeededMaxElapsed.set(elapsed);
}
} else {
status.failed.incrementAndGet();
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
}
}
}
自定义实现日志追踪
- 编写dubbo过滤器
@Activate(group = CommonConstants.CONSUMER)
public class DubboTraceFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
final String traceId = MDC.get(Constant.TRACE_ID);
RpcContext.getContext().setAttachment(Constant.TRACE_ID, traceId);
return invoker.invoke(invocation);
}
}
- resources目录下,建立META-INF/dubbo目录
- 新增文件:com.alibaba.dubbo.rpc.Filter
- 文件里配置:
trace=cn.gw.server.filter.DubboTraceFilter
网友评论