美文网首页
Dubbo SPI(四)

Dubbo SPI(四)

作者: outwar | 来源:发表于2018-09-02 15:24 被阅读0次

    Activate

    之前已经讲到了关于dubbo spi的默认扩展,适配扩展,包装扩展。现在稍微总结一下:

    1. 普通扩展和默认扩展是实现类
    2. 包装扩展一般仅仅只是做一层装饰
    3. 在类上有Adaptive注解的适配扩展也是实现类,没有就会通过源码生成,源码通过url拿到真正想要实现的spi对象,没有就还是用默认扩展

    还差激活扩展(名字别在意)没有涉及,首先讲一下源码,然后通过filter来举例,增加印象。

    ExtensionLoader#getActivateExtension

    这个方法有很多不同参数的重载版本,来看一下最终调用的代码。

    public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
        //不需要默认的激活扩展:-default
        if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            //加载
            getExtensionClasses();
            for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Object activate = entry.getValue();
    
                String[] activateGroup, activateValue;
                //兼容,cachedActivates的值类型为Object
                if (activate instanceof Activate) {
                    activateGroup = ((Activate) activate).group();
                    activateValue = ((Activate) activate).value();
                } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                    activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                    activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
                } else {
                    continue;
                }
                //是否分组匹配:provider和consumer
                if (isMatchGroup(group, activateGroup)) {
                    T ext = getExtension(name);
                    //1.该filter是否被手动指定激活
                    if (!names.contains(name)
                            //2.被手动通过-删除
                            && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                            //3.激活条件是否满足
                            && isActive(activateValue, url)) {
                        exts.add(ext);
                    }
                }
            }
            //通过注解排序
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        //手动指定的激活扩展
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            //不是通过-删除的激活条件
            if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    //不存在删除该名字的条件
                    && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                //如果有default,将之前已经手动激活的扩展放到最前面
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (!usrs.isEmpty()) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    //缓存已经激活的扩展
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        //default后面或者没有default都放到最后面
        if (!usrs.isEmpty()) {
            exts.addAll(usrs);
        }
        return exts;
    }
    

    代码很简单,复杂的就是激活条件,这里举一些例子,相信只要使用过就能理解。首先假设有a,b,c,d,e五个待激活扩展:

    1. a,b,c,-default,-a --> b,c
    2. b,default,e --> b,a,c,d,e(acd顺序由ActivateComparator保证)
    3. b,default,e,a,-c --> b,d,e,a

    filter

    这个接口有spi注解,只有一个方法且只有ProtocolFilterWrapper这个类在调用。

    ProtocolFilterWrapper

    public class ProtocolFilterWrapper implements Protocol {
    
        private final Protocol protocol;
        //构造方法表明这是一个包装扩展
        public ProtocolFilterWrapper(Protocol protocol) {
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }
    
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            //所有被激活的扩展
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                //保证filter按照顺序执行,即排序第一的最先执行
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    //实现Invoker接口,对invoke方法进行处理
                    last = new Invoker<T>() {
    
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        @Override
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
    
        @Override
        public int getDefaultPort() {
            //直接调用真实的方法
            return protocol.getDefaultPort();
        }
    
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        }
    
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
    
        @Override
        public void destroy() {
            //直接调用真实的方法
            protocol.destroy();
        }
    
    }
    

    通过它的构造方法,很明显这是一个包装扩展。那么再来看一下它实现的方法,exportrefer通过buildInvokerChain做了一些特殊的操作,那么继续看这个特殊处理的方法。首先通过url和key找到激活条件,配合group找到所有满足条件的激活扩展filter,通过之前的讲解这里已经是排好序的,然后倒叙遍历,倒叙排列保证了第一个filter会先被执行。这就是dubbo的filter机制的实现方法,例如echo调用,泛化调用都是通过该机制实现的,也可以通过实现filter接口和spi注册帮助实现一些调用时的扩展。

    相关文章

      网友评论

          本文标题:Dubbo SPI(四)

          本文链接:https://www.haomeiwen.com/subject/grsmwftx.html