美文网首页
由dubbo的TpsLimitFilter限流,说说dubbo的

由dubbo的TpsLimitFilter限流,说说dubbo的

作者: 安迪猪 | 来源:发表于2018-12-20 21:26 被阅读0次

    Dubbo提供了过程拦截(即Filter)功能。dubbo的大多数功能都基于此功能实现。在dubbo的服务端,提供了一个限流Filter(TpsLimitFilter),用于在服务端控制单位时间内(默认是60s)的调用数量tps。超过此数量,则服务端将会报错。

    一、TpsLimitFilter的使用

    # 1.1、TpsLimitFilter源码

    @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
    public class TpsLimitFilter implements Filter {
    
        private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    
            if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
                throw new RpcException(
                        new StringBuilder(64)
                                .append("Failed to invoke service ")
                                .append(invoker.getInterface().getName())
                                .append(".")
                                .append(invocation.getMethodName())
                                .append(" because exceed max service tps.")
                                .toString());
            }
    
            return invoker.invoke(invocation);
        }
    
    }
    
    public class DefaultTPSLimiter implements TPSLimiter {
    
        private final ConcurrentMap<String, StatItem> stats
            = new ConcurrentHashMap<String, StatItem>();
        
        public boolean isAllowable(URL url, Invocation invocation) {
            int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
            long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
                                             Constants.DEFAULT_TPS_LIMIT_INTERVAL);
            String serviceKey = url.getServiceKey();
            if (rate > 0) {
                StatItem statItem = stats.get(serviceKey);
                if (statItem == null) {
                    stats.putIfAbsent(serviceKey,
                                      new StatItem(serviceKey, rate, interval));
                    statItem = stats.get(serviceKey);
                }
                return statItem.isAllowable(url, invocation);
            } else {
                StatItem statItem = stats.get(serviceKey);
                if (statItem != null) {
                    stats.remove(serviceKey);
                }
            }
    
            return true;
        }
    
    }
    
    
    
    class StatItem {
    
        private String name;
    
        private long lastResetTime;
    
        private long interval;
    
        private AtomicInteger token;
    
        private int rate;
    
        StatItem(String name, int rate, long interval) {
            this.name = name;
            this.rate = rate;
            this.interval = interval;
            this.lastResetTime = System.currentTimeMillis();
            this.token = new AtomicInteger(rate);
        }
    
        public boolean isAllowable(URL url, Invocation invocation) {
            long now = System.currentTimeMillis();
            if (now > lastResetTime + interval) {
                token.set(rate);
                lastResetTime = now;
            }
    
            int value = token.get();
            boolean flag = false;
            while (value > 0 && !flag) {
                flag = token.compareAndSet(value, value - 1);
                value = token.get();
            }
    
            return flag;
        }
    
        long getLastResetTime() {
            return lastResetTime;
        }
        
        int getToken() {
            return token.get();
        }
        
        public String toString() {
            return new StringBuilder(32).append("StatItem ")
                .append("[name=").append(name).append(", ")
                .append("rate = ").append(rate).append(", ")
                .append("interval = ").append(interval).append("]")
                .toString();
        }
    
    }
    

    此限流过滤器的思想就是在规定的时间内(dubbo默认是60s),看请求数是否小于tps的数量。如果这一次的请求时间距离上一次统计的开始时间在60s内,那就计数,如果大于tps,就报错,如果这一次请求时间间隔已经大于60s,那么把此次的时间作为统计的开始时间。算法比较简单。

    1.2、如何使用此filter

    dubbo没有把这个TpsLimitFilter放入默认启动的filter中。下面是dubbo默认启动的filter类型

    echo=com.alibaba.dubbo.rpc.filter.EchoFilter
    generic=com.alibaba.dubbo.rpc.filter.GenericFilter
    genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter
    token=com.alibaba.dubbo.rpc.filter.TokenFilter
    accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter
    activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
    classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter
    context=com.alibaba.dubbo.rpc.filter.ContextFilter
    consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter
    exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter
    executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
    deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter
    compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter
    timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
    monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
    validation=com.alibaba.dubbo.validation.filter.ValidationFilter
    cache=com.alibaba.dubbo.cache.filter.CacheFilter
    trace=com.alibaba.dubbo.rpc.protocol.dubbo.filter.TraceFilter
    future=com.alibaba.dubbo.rpc.protocol.dubbo.filter.FutureFilter
    
    

    那么如何使用这个filter呢?

    1.3新建filter文件

    所以,我们要应用这个限流过滤器,需要在我们的resources目录下自己新建以filter接口为文件名的文件,如下:


    在这里插入图片描述

    里面的内容就是tps的类路径:
    tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter

    1.4 使用配置规则将tps写入注册中心的url。

    根据dubbo的user-book的说明,可以通过向注册中心写入配置规则,完成tps的限流操作。

        public static void setLimit(){
            //获得注册工程的spi扩展实例
            RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
            //根据url的zookeeper,确定是zookeeper注册中心,通过ip和端口号,连上zookeeper注册中心
            Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://192.168.25.128:2181"));
              //向注册中心写入配置规则
            registry.register(URL.valueOf("override://0.0.0.0/cn.andy.dubbo.DataService?tps=5&category=configurators"
                 ));
    
        
        }
    

    最后一句的/0.0.0.0表示对所有的ip都有效。(即我们的服务可能会放入很多的机器,那么这些机器都会执行tps规则),我们定义了tps=5次。
    这里的setLimit()方法可以在任意地方执行。为了方便,我在Main方法中,启动了服务端程序后执行这个方法。

        private static final Log log = LogFactory.getLog(DubboProviderMain.class);
    
        public static void main(String[] args) {
            try {
                ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-service.xml");
                context.start();
            //  setRouter();
                setLimit();
                
            } catch (Exception e) {
                log.error("== DubboProvider context start error:",e);
            }
            synchronized (DubboProviderMain.class) {
                while (true) {
                    try {
                        DubboProviderMain.class.wait();
                    } catch (InterruptedException e) {
                        log.error("== synchronized error:",e);
                    }
                }
            }
        }
        
    

    1.5 测试

    我们把这个jar打包,分别放入两个不同的虚拟机中,然后在客户端执行请求。结果表明,两个虚拟机在60s内各执行了5次后,就开始报错。这也从另一个方面验证了,TpsLimitFilter限流是针对单机的。
    (在测试时候,如果服务jar包,一个在虚拟机,另一个和web程序都在本地,那么申请都会发往本地,而不是随机在两个jar之间调用)

    二、由此带来的思考

    a、TpsLimitFilter为什么只能通过写入配置规则的方式使用,而不能直接在xml中直接写入?
    b、TpsLimitFilter有类上有@Activate注解,为什么不能像其他的内置filter一样,默认开启?

    2.1 @Activate注解

    Activate注解可以通过group和value配置激活条件,使得被Activate注解的扩展点实现在满足上述两个条件时候被激活使用,通常用于filter的激活。

    @Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
    public class TpsLimitFilter implements Filter{}
    

    又比如

    @Activate(group = Constants.PROVIDER, value = Constants.TOKEN_KEY)
    public class TokenFilter implements Filter {}
    

    又比如

    @Activate(group = Constants.PROVIDER, order = -110000)
    public class EchoFilter implements Filter {}
    

    对于注解中,没有value的情况,只需进行group匹配(在filter中,分为provider和consumer)。在由value的情况下,必须group匹配,而且value有值的情况下,才会启动。

    2.2 回答第一个问题,TpsLimitFilter为什么不能直接在xml中写入。

    在dubbo中,有些filter是可以直接在xml中直接写入的。比如令牌验证功能:
    在dubbo的xml配置中,加入token配置,则相当于启动了TokenFilter功能。

    <dubbo:service retries="0" interface="cn.andy.dubbo.DataService" ref="dataServiceImpl" timeout="60000" mock="true"  token="1234567" />
    

    但是,如果我们如上面实现1.2操作后,在xml中写入如下tps:

    <dubbo:service retries="0" interface="cn.andy.dubbo.DataService" ref="dataServiceImpl" timeout="60000" mock="false"  token="1234567"  filter="andyFilter"  tps="5"
                        />
    

    则会报错

    2018-11-29 18:32:16,582 ERROR [DubboProviderMain.java:39] : == DubboProvider context start error:
    org.springframework.beans.factory.parsing.BeanDefinitionParsingException: Configuration problem: Failed to import bean definitions from URL location [classpath*:spring/applicationContext-dubbo.xml]
    Offending resource: class path resource [spring/applicationContext-service.xml]; nested exception is org.springframework.beans.factory.xml.XmlBeanDefinitionStoreException: Line 28 in XML document from URL [file:/E:/javaee/DubboTest/Dubbo-test-service-jar/target/classes/spring/applicationContext-dubbo.xml] is invalid; nested exception is org.xml.sax.SAXParseException; lineNumber: 28; columnNumber: 8; cvc-complex-type.3.2.2: 元素 'dubbo:service' 中不允许出现属性 'tps'。
        at org.springframework.beans.factory.parsing.FailFastProblemReporter.error(FailFastProblemReporter.java:70)
        at org.springframework.beans.factory.parsing.ReaderContext.error(ReaderContext.java:85)
    

    大意就是dubbo:service中没有tps这个元素。
    我们知道,spring会去解析dubbo标签,来完成dubbo相关类的实例化。所以,直接查看dubbo.xsd,看看dubbo的自定义标签service中,是否有这个tps属性:

        <xsd:complexType name="serviceType">
            <xsd:complexContent>
                <xsd:extension base="abstractServiceType">
                    <xsd:choice minOccurs="0" maxOccurs="unbounded">
                        <xsd:element ref="method" minOccurs="0" maxOccurs="unbounded" />
                        <xsd:element ref="parameter" minOccurs="0" maxOccurs="unbounded" />
                        <xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded" />
                    </xsd:choice>
                    <xsd:attribute name="interface" type="xsd:token" use="required">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ Defines the interface to advertise for this service in the service registry. ]]></xsd:documentation>
                            <xsd:appinfo>
                                <tool:annotation>
                                    <tool:expected-type type="java.lang.Class"/>
                                </tool:annotation>
                            </xsd:appinfo>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="ref" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service implementation instance bean id. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="class" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service implementation class name. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="path" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service path. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="provider" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ Deprecated. Replace to protocol. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:anyAttribute namespace="##other" processContents="lax" />
                </xsd:extension>
            </xsd:complexContent>
        </xsd:complexType>
    
    
        <xsd:complexType name="abstractServiceType">
            <xsd:complexContent>
                <xsd:extension base="abstractInterfaceType">
                    <xsd:attribute name="register" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service can be register to registry. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="version" type="xsd:string" use="optional" default="0.0.0">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service version. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="group" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service group. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="deprecated" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ whether the service is deprecated. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="delay" type="xsd:string" use="optional" default="0">
                        <xsd:annotation>
                            <xsd:documentation>
                                <![CDATA[ The service export delay millisecond. ]]>
                            </xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="export" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation>
                                <![CDATA[ The service is export. ]]>
                            </xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="weight" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation>
                                <![CDATA[ The service weight. ]]>
                            </xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="document" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation>
                                <![CDATA[ The service document. ]]>
                            </xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="dynamic" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ the service registered to the registry is dynamic(true) or static(false). ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="token" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service use token. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="accesslog" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service use accesslog. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="executes" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service allow execute requests. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:attribute name="protocol" type="xsd:string" use="optional">
                        <xsd:annotation>
                            <xsd:documentation><![CDATA[ The service protocol. ]]></xsd:documentation>
                        </xsd:annotation>
                    </xsd:attribute>
                    <xsd:anyAttribute namespace="##other" processContents="lax" />
                </xsd:extension>
            </xsd:complexContent>
        </xsd:complexType>
    

    上面是service标签的属性,从里面可以看出,是没有tps这个属性的,但是有token这个属性。所以TokenFilter可以在dubbo的xml配置,而TpsLimitFilter不行。

    2.3 回答TpsLimitFilter有类上有@Activate注解,为什么不能像其他的内置filter一样,默认开启

    这一题,从上面可以得到答案。因为TpsLimitFilter的注解有group和value两个(其中,value=TPS_LIMIT_RATE_KEY = "tps"),而我们在xml配置文件中没法写入tps这是属性值,所以不能启动TpsLimitFilter。而我们通过配置规则向注册中心写入tps后,TpsLimitFilter就能启动了。

    registry.register(URL.valueOf("override://0.0.0.0/cn.andy.dubbo.DataService?tps=5&category=configurators"
    

    3、源码分析

    dubbo的服务发布过程的export过程中,会先经过ProtocolFilterWrapper,在这里,完成filter的初始化和配置。

    public class ProtocolFilterWrapper implements Protocol {
    
        private final Protocol protocol;
    
        public ProtocolFilterWrapper(Protocol protocol){
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }
    
        public int getDefaultPort() {
            return protocol.getDefaultPort();
        }
    
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        }
    
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
    
        public void destroy() {
            protocol.destroy();
        }
    
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (filters.size() > 0) {
                for (int i = filters.size() - 1; i >= 0; i --) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
    
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
        
    }
    

    在buildInvokerChain方法中,通过 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);得到激活的filter。
    其中,invoker.getUrl()就是发布父类的url路径:

    dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=false&pid=67080&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543490087854&token=1234567
    

    key=service.filter:后续会用这个key得到我们自定义的filter。
    group=provider:表明是服务提供端
    然后,会运行到下面的方法,这里的values就是我们定义的filter数组。

     public List<T> getActivateExtension(URL url, String[] values, String group) {
            List<T> exts = new ArrayList<T>();
            List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    //Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY就是-default。意思是说我们
    //在xml中配置filter时候没有-default,就是要加载默认启动的filter。这个大if就是加载dubbo提供的自动加载的filter集合
            if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
           //   getExtensionClasses()方法会加载所有的Filter接口的扩展实现,包括dubbo提供的和我们自定义的
               getExtensionClasses();
    //cachedActivates是一个集合,所有的Filter接口的扩展实现中,有@Activate注解的都会放入这个集合中
                for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                    String name = entry.getKey();
                    Activate activate = entry.getValue();
    //这里group=provider,而activate.group()是类的filter注解中定义的,分为provider和consumer。
    //所以在服务发布端,只有注解中定义了provider的filter才会通过
                    if (isMatchGroup(group, activate.group())) {
                        T ext = getExtension(name);
                        if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
    //这里主要关注 isActive(activate, url),在下面分析
                                && isActive(activate, url)) {
                            exts.add(ext);
                        }
                    }
                }
                Collections.sort(exts, ActivateComparator.COMPARATOR);
            }
            List<T> usrs = new ArrayList<T>();
            for (int i = 0; i < names.size(); i ++) {
                String name = names.get(i);
                if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                        && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                    if (Constants.DEFAULT_KEY.equals(name)) {
                        if (usrs.size() > 0) {
                            exts.addAll(0, usrs);
                            usrs.clear();
                        }
                    } else {
                        T ext = getExtension(name);
                        usrs.add(ext);
                    }
                }
            }
            if (usrs.size() > 0) {
                exts.addAll(usrs);
            }
            return exts;
        }
    

    上面的cachedActivates是所有由@activate注解的filter接口的扩展实现,通过断点,得到其值:

    {exception=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), cache=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[cache], before=[], group=[consumer, provider], order=0), genericimpl=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[generic], before=[], group=[consumer], order=20000), deprecated=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[deprecated], before=[], group=[consumer], order=0), classloader=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-30000), echo=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-110000), monitor=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider, consumer], order=0), generic=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-20000), timeout=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), accesslog=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[accesslog], before=[], group=[provider], order=0), token=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[token], before=[], group=[provider], order=0), trace=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=0), executelimit=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[executes], before=[], group=[provider], order=0), future=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[consumer], order=0), tps=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[tps], before=[], group=[provider], order=0), context=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[provider], order=-10000), activelimit=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[actives], before=[], group=[consumer], order=0), validation=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[validation], before=[], group=[consumer, provider], order=10000), consumercontext=@com.alibaba.dubbo.common.extension.Activate(after=[], value=[], before=[], group=[consumer], order=-10000)}
    

    可以看到,里面是有tps这个filter的。而且其要求有value值,value的名字是tps。即要求我们的url中要有这个tps的键值对。

    通过了 if (isMatchGroup(group, activate.group()))后,还需要进行

       if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
    //这里主要关注 isActive(activate, url),在下面分析
                                && isActive(activate, url))
    

    才能把符合条件的filter加入到exts中,这个exts就是后面要启动的filter集合。
    看看 isActive方法

        private boolean isActive(Activate activate, URL url) {
    //获取注解中value()值
            String[] keys = activate.value();
    //如果value值是空的,说明不需要进行value过滤,直接放行
            if (keys == null || keys.length == 0) {
                return true;
            }
    //否则,就要对服务发布的url进行属性键值对的对比,看url中是否有这个value的值,而
    //我们的url中没有tps这个属性值,因此返回false,故TpsLimitFilter这个过滤器不会启动
            for (String key : keys) {
                for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
                    String k = entry.getKey();
                    String v = entry.getValue();
                    if ((k.equals(key) || k.endsWith("." + key))
                            && ConfigUtils.isNotEmpty(v)) {
                        return true;
                    }
                }
            }
            return false;
        }
    

    至此,分析完毕,正常情况下,为什么TpsLimitFilter这个filter不会启动。
    而当我们通过向注册中心动态写入配置规则后,根据dubbo的机制,会导致dubbo重新发布这个服务。刷新后发布的服务的url是

    dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=false&pid=67656&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000&timestamp=1543490522208&token=1234567&tps=5
    

    里面有tps这个属性,通过上面的分析,

       if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
    //这里主要关注 isActive(activate, url),在下面分析
                                && isActive(activate, url))
    

    会返回true,导致我们的TpsLimitFilter会加入自动启动的filter集合中。
    至此,分析完毕!

    相关文章

      网友评论

          本文标题:由dubbo的TpsLimitFilter限流,说说dubbo的

          本文链接:https://www.haomeiwen.com/subject/sdgukqtx.html