美文网首页
Dubbo源码分析----扩展机制

Dubbo源码分析----扩展机制

作者: _六道木 | 来源:发表于2018-02-19 14:57 被阅读24次

    Dubbo提供了一种类似JavaSPI的一种机制,ExtensionLoader是扩展机制的核心,类似于JavaSPI的ServiceLoader

    和JavaSPI类似,Dubbo规定在META-INF/services/、META-INF/dubbo/、internal/下定好配置文件,Dubbo会按照一定的规则去加载这些类

    例如rpc模块下的配置文件


    image.png

    com.alibaba.dubbo.rpc.ProxyFactory文件如下:

    stub=com.alibaba.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
    jdk=com.alibaba.dubbo.rpc.proxy.jdk.JdkProxyFactory
    javassist=com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory
    

    这里配置的类都是ProxyFactory接口的实现类,key为配置的名字

    整体加载流程

    我们拿Protocol来分析一下ExtensionLoader的加载过程
    看下Protocol接口的声明:

    @SPI("dubbo")
    public interface Protocol {
    
        /**
         * 获取缺省端口,当用户没有配置端口时使用。
         *
         * @return 缺省端口
         */
        int getDefaultPort();
    
        /**
         * 暴露远程服务:<br>
         * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
         * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
         * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
         *
         * @param <T> 服务的类型
         * @param invoker 服务的执行体
         * @return exporter 暴露服务的引用,用于取消暴露
         * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
         */
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        /**
         * 引用远程服务:<br>
         * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
         * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
         * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
         *
         * @param <T> 服务的类型
         * @param type 服务的类型
         * @param url 远程服务的URL地址
         * @return invoker 服务的本地代理
         * @throws RpcException 当连接服务提供方失败时抛出
         */
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        /**
         * 释放协议:<br>
         * 1. 取消该协议所有已经暴露和引用的服务。<br>
         * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
         * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
         */
        void destroy();
    
    }
    

    ServiceConfig里声明了一个Protocol对象

    private static final Protocol protocol =
    ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    

    getExtensionLoader是获取ExtensionLoader类的对象,并放到EXTENSION_LOADERS(map)中
    getAdaptiveExtension如下:

        public T getAdaptiveExtension() {
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                if(createAdaptiveInstanceError == null) {
                    synchronized (cachedAdaptiveInstance) {
                        instance = cachedAdaptiveInstance.get();
                        if (instance == null) {
                            try {
                                instance = createAdaptiveExtension();
                                cachedAdaptiveInstance.set(instance);
                            } catch (Throwable t) {
                                createAdaptiveInstanceError = t;
                                throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                            }
                        }
                    }
                }
                else {
                    throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
                }
            }
    
            return (T) instance;
        }
    

    主要是实例化一个对象,而这个对象是通过createAdaptiveExtension生成,生成完毕后放到了缓存当中。

    看下createAdaptiveExtension方法

        private T createAdaptiveExtension() {
            //....
                return injectExtension((T) getAdaptiveExtensionClass().newInstance());
            //....   
        }
    

    先调用getAdaptiveExtensionClass方法返回一个Class对象,然后实例化,然后再调用injectExtension处理该生成的对象。
    再看下getAdaptiveExtensionClass方法

        private Class<?> getAdaptiveExtensionClass() {
            getExtensionClasses();
            if (cachedAdaptiveClass != null) {
                return cachedAdaptiveClass;
            }
            return cachedAdaptiveClass = createAdaptiveExtensionClass();
        }
    

    getExtensionClasses方法调用了loadExtensionClasses方法

        private Map<String, Class<?>> loadExtensionClasses() {
            final SPI defaultAnnotation = type.getAnnotation(SPI.class);
            if(defaultAnnotation != null) {// 获取SPI注解上的名字且放到cachedDefaultName中
                String value = defaultAnnotation.value();
                if(value != null && (value = value.trim()).length() > 0) {
                    String[] names = NAME_SEPARATOR.split(value);
                    if(names.length > 1) {
                        throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                                + ": " + Arrays.toString(names));
                    }
                    if(names.length == 1) cachedDefaultName = names[0];
                }
            }
            // 加载3个目录下type类型的Class
            Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
            loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
            loadFile(extensionClasses, DUBBO_DIRECTORY);
            loadFile(extensionClasses, SERVICES_DIRECTORY);
            return extensionClasses;
        }
    

    前半部分是获取接口上的SPI注解的值,cachedDefaultName用来存储这个值,在Protocol上,该值是dubbo,那么默认会使用key为dubbo的实现类

    接下来loadFile就是重点所在了,会加载META-INF/services/、META-INF/dubbo/、internal/下配置的所有类并放在
    注意:该方法在一个type下,只会执行一次,一开始会解析每个文件

        String name = null;
        int i = line.indexOf('=');
        if (i > 0) {
            name = line.substring(0, i).trim();
            line = line.substring(i + 1).trim();
        }
    

    name为key,line为实现类全类名,line用来加载类,接下来分析一个解析line的过程

            Class<?> clazz = Class.forName(line, true, classLoader);
            //....
            if (clazz.isAnnotationPresent(Adaptive.class)) {// 如果类上有Adaptive注解,则放到cachedAdaptiveClass中
                if(cachedAdaptiveClass == null) {
                    cachedAdaptiveClass = clazz;
                } else if (! cachedAdaptiveClass.equals(clazz)) {  //error }
            } else {
                try {// 通过文件中指定的Class进行构造方法实例化
                    clazz.getConstructor(type);// 调用参数为当前类的构造方法
                    Set<Class<?>> wrappers = cachedWrapperClasses;
                    if (wrappers == null) {
                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                        wrappers = cachedWrapperClasses;
                    }
                    wrappers.add(clazz);
                } catch (NoSuchMethodException e) {// 如果没有,则会报错,走默认构造方法
                    clazz.getConstructor();
                    //....
                    String[] names = NAME_SEPARATOR.split(name);
                    if (names != null && names.length > 0) {
                        // 获取类上的Activate注解,且放到map中,key为name,
                        Activate activate = clazz.getAnnotation(Activate.class);
                        if (activate != null) {
                            cachedActivates.put(names[0], activate);
                        }
                        for (String n : names) {
                            if (! cachedNames.containsKey(clazz)) {
                                cachedNames.put(clazz, n);
                            }
                            Class<?> c = extensionClasses.get(n);
                            if (c == null) {
                                extensionClasses.put(n, clazz);
                            } else if (c != clazz) {  //error }
                        }
                    }
                }
            }
    

    实例化的时候,使用了try....catch来捕获异常,这是因为clazz.getConstructor(type);会报错,因为该类没有一个包含自己类型的一个构造方法
    其实这里是用了装饰模式,看下Protocol接口的实现类


    image.png

    其中只有两个类是有这个构造方法的,那么将其放到cachedWrapperClasses(Set)中,后面会用这几个类来装饰Protocol,增加额外的功能
    非装饰类的情况下:
    判断name是否为空(有些只有类名,没有key,如rpc-http模块下的配置文件为com.alibaba.dubbo.rpc.protocol.http.HttpProtocol),那么这里处理是取http
    然后如果有Activate注解的放到cachedActivates中,没该注解的放到extensionClasses中

    执行完getExtensionClasses方法之后会执行createAdaptiveExtensionClass方法,这个方法是动态生成Java代码然后编译,最后返回一个Class对象。先看下createAdaptiveExtensionClass方法

        private Class<?> createAdaptiveExtensionClass() {
            String code = createAdaptiveExtensionClassCode();// 动态生成代码
            ClassLoader classLoader = findClassLoader();
            // 编译
            com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
            return compiler.compile(code, classLoader);
        }
    

    Protocol生成的代码如下:

    package com.alibaba.dubbo.rpc;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
        public int getDefaultPort() {
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
            if (arg0 == null) 
                    throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null) 
                    throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
    }
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
            if (arg1 == null) 
                    throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    }
    

    其他的生成的代码模板都基本相似

    回到createAdaptiveExtension方法,执行完getExtensionClasses类且生成实例后,会调用injectExtension注入属性
    在其中会遍历set方法,取得需要set的对象的名字,如setProtocol就是protocol,然后再EXTENSION_LOADERS中取对应的ExtensionLoader,最后的过程就和获取Protocol对象是一样的了
    取到对象后就用反射将属性设置进去

    在export(暴露)和refer(引用)的时候会先通过getExtension获取Protocol,我们看下这个方法的实现

        public T getExtension(String name) {
            if (name == null || name.length() == 0)
                throw new IllegalArgumentException("Extension name == null");
            if ("true".equals(name)) {
                return getDefaultExtension();
            }
            Holder<Object> holder = cachedInstances.get(name);
            if (holder == null) {
                cachedInstances.putIfAbsent(name, new Holder<Object>());
                holder = cachedInstances.get(name);
            }
            Object instance = holder.get();
            if (instance == null) {
                synchronized (holder) {
                    instance = holder.get();
                    if (instance == null) {
                        instance = createExtension(name);
                        holder.set(instance);
                    }
                }
            }
            return (T) instance;
        }
    

    生成实例的方法是createExtension方法

        private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (wrapperClasses != null && wrapperClasses.size() > 0) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                return instance;
            } catch (Throwable t) {//ERROR }
        }
    

    getExtensionClasses方法是从cachedClasses中获取,如果没有,那么将会调用loadExtensionClasses,由于这个在一开始就已经执行过loadExtensionClasses方法,那么缓存中就会有该对象
    可以看到遍历cachedWrapperClasses,将最初的实例一层层的包装起来,例如Protocol外面就有两个类的包装ProtocolFilterWrapper和ProtocolListenerWrapper。最后返回生成好的Protocol示例,那么整个流程就已经完毕了

    Adaptive和Activate注解

    在一些SPI声明的接口有,有一些不同,举两个栗子,Protocol和Filter,不同点如下:

    1. Protocol在方法上有@Adaptive注解,Filter没有
    2. Filter的实现类中,有@Activate注解,Protocol没有

    其实就引出个问题,@Adaptive和@Activate有什么用?

    Adaptive

    以Protocol为例,分析一下Adaptive,在ExtensionLoader类中,搜索一下Adaptive解析的地方,找到createAdaptiveExtensionClassCode,前面分析过,这是动态生成代码的地方,那么看下其中如何处理Adaptive
    首先一进来,判断了方法中是否有该注解

            boolean hasAdaptiveAnnotation = false;
            for(Method m : methods) {
                if(m.isAnnotationPresent(Adaptive.class)) {
                    hasAdaptiveAnnotation = true;
                    break;
                }
            }
            // 完全没有Adaptive方法,则不需要生成Adaptive类
            if(! hasAdaptiveAnnotation)
                throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
    

    可以看到如果要动态生成该类,那么需要有该注解。接下来会获取注解的值

        String[] value = adaptiveAnnotation.value();
        // 没有设置Key,则使用“扩展点接口名的点分隔 作为Key
        if(value.length == 0) {
            char[] charArray = type.getSimpleName().toCharArray();
            StringBuilder sb = new StringBuilder(128);
            for (int i = 0; i < charArray.length; i++) {
                if(Character.isUpperCase(charArray[i])) {
                    if(i != 0) {
                        sb.append(".");
                    }
                    sb.append(Character.toLowerCase(charArray[i]));
                }
                else {
                    sb.append(charArray[i]);
                }
            }
            value = new String[] {sb.toString()};
        }
    

    value是Adaptive上的值,如果为空则取SimpleName。接下来看下如何处理该value

        for (int i = value.length - 1; i >= 0; --i) {
            if(i == value.length - 1) {
                if(null != defaultExtName) {
                    if(!"protocol".equals(value[i]))
                        if (hasInvocation)
                            getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                        else
                            getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
                    else
                        getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
                }
                else {
                    if(!"protocol".equals(value[i]))
                        if (hasInvocation)
                            getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                        else
                            getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
                    else
                        getNameCode = "url.getProtocol()";
                }
            }
            else {
                if(!"protocol".equals(value[i]))
                    if (hasInvocation)
                        getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
                    else
                        getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
                else
                    getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
            }
        }
        code.append("\nString extName = ").append(getNameCode).append(";");
        // check extName == null?
        String s = String.format("\nif(extName == null) " +
                "throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
                type.getName(), Arrays.toString(value));
    

    不断的拼接字符串,以Protocol为例,得到的如下

     String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
            if(extName == null) 
                    throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader
                    .getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
    

    其实就是从Url中获取注解上的值,做为extName,调用getExtension生成对象

    Activate

    Filter和Protocol获取的方式不太一样,两者如下:

    Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group)
    

    getAdaptiveExtension会触发到createAdaptiveExtensionClassCode方法,即会对Adaptive进行处理,而getActivateExtension则是对Activate进行处理

        public List<T> getActivateExtension(URL url, String[] values, String group) {
            List<T> exts = new ArrayList<T>();
            List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
            if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
                getExtensionClasses();// 加载META-INF下3个文件的Class、
                // 遍历有Activate注解的信息,该信息在loadFile解析的时候放进去
                for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                    String name = entry.getKey();
                    Activate activate = entry.getValue();
                    if (isMatchGroup(group, activate.group())) {// 判断当前group和注解的gourp是否匹配
                        T ext = getExtension(name);//获取对应对象
                        if (! names.contains(name)
                                && ! names.contains(Constants.REMOVE_VALUE_PREFIX + name) 
                                && isActive(activate, url)) {//判断url是否有注解的key
                            exts.add(ext);// 如果匹配成功则加入该返回集合
                        }
                    }
                }
                Collections.sort(exts, ActivateComparator.COMPARATOR);
            }
            //....
            return exts;
        }
    

    可以看到,这种情况下会直接通过url的参数去获取符合条件的Filter列表,例如如果当前Url的group为consumer,key中有一个actives的可以,那么当取到所有Filter的实现类的时候,会去匹配,当处理到ActiveLimitFilter的时候,发现符合,则加入到Filter中(因为ActiveLimitFilter配置的group为consumer,value为actives)

    总结:

    1. Adaptive是根据固定key通过value寻找对应实现的过程
    2. Activate是根据不同的key寻找不同实现的过程,value在对应实现中处理

    相关文章

      网友评论

          本文标题:Dubbo源码分析----扩展机制

          本文链接:https://www.haomeiwen.com/subject/pswitftx.html