美文网首页
dubbo过滤器

dubbo过滤器

作者: SparkOnly | 来源:发表于2021-03-02 23:38 被阅读0次

    加载原理

    dubbo的过滤器整体都是采用SPI的方式进行加载的

    1. 首先通过SPI加载dubbo加载策略
    private static LoadingStrategy[] loadLoadingStrategies() {
            return stream(load(LoadingStrategy.class).spliterator(), false)
                    .sorted()
                    .toArray(LoadingStrategy[]::new);
        }
    

    默认有三种策略,从以下三个目录中读取,优先级依次递减

    • META-INF/dubbo/internal/
    • META-INF/dubbo/
    • META-INF/services/
    1. 加载扩展类
      这里会遍历上面的加载策略,通过拼接策略目录+类的类型名(全路径名),找到对应的文件,遍历文件内容,根据文件里的key-value加载文件里对应的类
    private Map<String, Class<?>> loadExtensionClasses() {
        cacheDefaultExtensionName();
    
        Map<String, Class<?>> extensionClasses = new HashMap<>();
    
        for (LoadingStrategy strategy : strategies) {
            loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
            loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
        }
    
        return extensionClasses;
    }
    

    令牌验证

    通过令牌验证在注册中心控制权限,以决定要不要下发令牌给消费者,可以防止消费者绕过注册中心访问提供者

    全局方式

    <!--随机token令牌,使用UUID生成-->
    <dubbo:provider interface="com.foo.BarService" token="true" />
    <!--固定token令牌,相当于密码-->
    <dubbo:provider interface="com.foo.BarService" token="123456" />
    

    服务级别

    <dubbo:service interface="com.foo.BarService" token="true" />
    <dubbo:service interface="com.foo.BarService" token="123456" />
    

    协议级别

    dubbo:protocol name="dubbo" token="true" />
    <dubbo:protocol name="dubbo" token="123456" />
    

    服务端实现

    通过隐式传参来获取token进行判断

    String token = invoker.getUrl().getParameter(TOKEN_KEY);
    if (ConfigUtils.isNotEmpty(token)) {
      Class<?> serviceType = invoker.getInterface();
      Map<String, Object> attachments = inv.getObjectAttachments();
      String remoteToken = (attachments == null ? null : (String) attachments.get(TOKEN_KEY));
      if (!token.equals(remoteToken)) {
        throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
      }
    }
    

    消费者

    对于消费者,如果通过注册中心来访问,并不需要自己指定token
    但是如果是直连方式,默认是没有token的,当然是可以通过手动指定隐式传参来做规避这个问题

    生产者限流

    默认实现类:ExecuteLimitFilter,配置项:executes,默认0表示不进行限流

    实现原理

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException(RpcException.LIMIT_EXCEEDED_EXCEPTION,
                    "Failed to invoke method " + invocation.getMethodName() + " in provider " +
                            url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                            "\" /> limited.");
        }
    
        invocation.put(EXECUTE_LIMIT_FILTER_START_TIME, System.currentTimeMillis());
        try {
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        }
    }
    @Override
    public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
        RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
    }
    
    @Override
    public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
        if (t instanceof RpcException) {
            RpcException rpcException = (RpcException) t;
            if (rpcException.isLimitExceed()) {
                return;
            }
        }
        RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
    }
    

    可以看到,这里主要是借助RpcStatus的静态方法来实现,限流最大数通过url直接获取

    RpcStatus的内部逻辑

    1. 这里对于每个url都有一个RpcStatus对象,而每个url状态对象里,又通过methodName关联了一个RpcStatus(通过ConcurrentHashMap)。限流针对方法进行处理
    2. active字段为原子变量,记录当前的数量
    private final AtomicInteger active = new AtomicInteger();
    
    public static boolean beginCount(URL url, String methodName, int max) {
        max = (max <= 0) ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.get() == Integer.MAX_VALUE) {
            return false;
        }
        for (int i; ; ) {
            i = methodStatus.active.get();
            if (i + 1 > max) {
                return false;
            }
            if (methodStatus.active.compareAndSet(i, i + 1)) {
                break;
            }
        }
        appStatus.active.incrementAndGet();
        return true;
    }
    
    1. 结束时,调用endCount方法进行处理,更新统计字段
    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
        endCount(getStatus(url), elapsed, succeeded);
        endCount(getStatus(url, methodName), elapsed, succeeded);
    }
    
    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }
    

    自定义实现日志追踪

    1. 编写dubbo过滤器
    @Activate(group = CommonConstants.CONSUMER)
    public class DubboTraceFilter implements Filter {
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            final String traceId = MDC.get(Constant.TRACE_ID);
            RpcContext.getContext().setAttachment(Constant.TRACE_ID, traceId);
            return invoker.invoke(invocation);
        }
    }
    
    1. resources目录下,建立META-INF/dubbo目录
    2. 新增文件:com.alibaba.dubbo.rpc.Filter
    3. 文件里配置:
    trace=cn.gw.server.filter.DubboTraceFilter
    

    相关文章

      网友评论

          本文标题:dubbo过滤器

          本文链接:https://www.haomeiwen.com/subject/fodtqltx.html