美文网首页程序员
dubbo源码阅读(一) -- SPI 机制

dubbo源码阅读(一) -- SPI 机制

作者: 吃冰淇淋的团团 | 来源:发表于2019-02-20 22:37 被阅读0次

    dubbo在整体架构设计上都是通过SPI 去实现,因此将SPI 作为第一部分阅读内容。

    SPI

    Service Provider Interface,一种动态替换发现机制。对于一个接口,想动态的给它添加实现,只需要增加一个实现类。主要用于启用、扩展、或者替换框架的实现策略。


    SPI机制

    dubbo对SPI 的改进

    • JDK 的 SPI 会一次性实例化扩展点的所有实现类,若存在扩展实现类初始化很耗时,加载后却没有用,就会浪费资源。
    • dubbo使用了大量缓存,先从缓存中获取,若缓存中没有,才会创建。
    • 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点。

    源码分析

    以下面这段代码为例,理解扩展点机制:

    ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()

    getExtensionLoader(Protocol.class)

        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
            if (type == null) {
                throw new IllegalArgumentException("Extension type == null");
            }
            if (!type.isInterface()) {
                throw new IllegalArgumentException("Extension type (" + type + ") is not interface!");
            }
            if (!withExtensionAnnotation(type)) {
                throw new IllegalArgumentException("Extension type (" + type +
                        ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
            }
    
            ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
            if (loader == null) {
                EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
                loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
            }
            return loader;
        }
    

    这是一个static方法,返回ExtensionLoader<T>。这个方法主要是将传入的type作为key,从缓存EXTENSION_LOADERS(这是个ConcurrentMap)中获取ExtensionLoader,若为空,则新建ExtensionLoader实例并存入缓存EXTENSION_LOADERS中。

    接下来看新建ExtensionLoader实例:

        private ExtensionLoader(Class<?> type) {
            this.type = type;
            objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
        }
    

    ExtensionLoader的构造器是private,主要用于给type和objectFactory赋值。这里有段和目标代码类似的代码,ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()。注意这边对参数type进行了判断,若type为ExtensionFactory.class,则objectFactory为null。这个判断很必要,因为getExtensionLoader(ExtensionFactory.class)可能又会调用new ExtensionLoader(ExtensionFactory.class),若没有这个判断,则陷入无限循环。

    在继续看getAdaptiveExtension()之前先总结一下,执行getExtensionLoader返回的是一个ExtensionLoader实例。传入的type为Protocol.class时,objectFactory为ExtensionFactory的一个适配扩展类(这个接下去会解释);传入的type为ExtensionFactory.class时,objectFactory为null。

    getAdaptiveExtension()

    方法间的调用关系如下图所示:


    方法调用关系

    接着结合ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()和ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension())这两个例子分析代码。

    先看getAdaptiveExtension()

        public T getAdaptiveExtension() {
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                if (createAdaptiveInstanceError == null) {
                    synchronized (cachedAdaptiveInstance) {
                        instance = cachedAdaptiveInstance.get();
                        if (instance == null) {
                            try {
                                instance = createAdaptiveExtension();
                                cachedAdaptiveInstance.set(instance);
                            } catch (Throwable t) {
                                createAdaptiveInstanceError = t;
                                throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                            }
                        }
                    }
                } else {
                    throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
                }
            }
    
            return (T) instance;
        }
    

    首先从cachedAdaptiveInstance(这是个用于存储对象的辅助类,可以理解为缓存)中获取实例,若为空,则同步双重检查后创建实例,然后存入cachedAdaptiveInstance。

    接下来看createAdaptiveExtension()

        private T createAdaptiveExtension() {
            try {
                return injectExtension((T) getAdaptiveExtensionClass().newInstance());
            } catch (Exception e) {
                throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
            }
        }
    

    这里包含两个方法,injectExtension和getAdaptiveExtensionClass,先分析getAdaptiveExtensionClass方法。

        private Class<?> getAdaptiveExtensionClass() {
            getExtensionClasses();
            if (cachedAdaptiveClass != null) {
                return cachedAdaptiveClass;
            }
            return cachedAdaptiveClass = createAdaptiveExtensionClass();
        }
    

    这个方法主要的作用是获取扩展类,然后判断cachedAdaptiveClass是否被赋值,若为null则创建。cachedAdaptiveClass在执行getExtensionClasses方法过程中若满足一定条件会被赋值,接着就看一下getExtensionClasses方法。

        private Map<String, Class<?>> getExtensionClasses() {
            Map<String, Class<?>> classes = cachedClasses.get();
            if (classes == null) {
                synchronized (cachedClasses) {
                    classes = cachedClasses.get();
                    if (classes == null) {
                        classes = loadExtensionClasses();
                        cachedClasses.set(classes);
                    }
                }
            }
            return classes;
        }
    

    从cachedClasses(这也是个用于存储对象的辅助类,存储的是一个Map,Map的key为类名,value为类)中获取类,若为空则同步双重检查后加载扩展类,然后存入cachedClasses。

    看一下加载扩展类的过程:

        private Map<String, Class<?>> loadExtensionClasses() {
            final SPI defaultAnnotation = type.getAnnotation(SPI.class);
            if (defaultAnnotation != null) {
                String value = defaultAnnotation.value();
                if ((value = value.trim()).length() > 0) {
                    String[] names = NAME_SEPARATOR.split(value);
                    if (names.length > 1) {
                        throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
                                + ": " + Arrays.toString(names));
                    }
                    if (names.length == 1) {
                        cachedDefaultName = names[0];
                    }
                }
            }
    
            Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
            loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
            loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
            loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
            loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
            loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
            loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
            return extensionClasses;
        }
    

    loadExtensionClasses方法首先获取type的SPI注解,若SPI注解有value值,就解析value设置cachedDefaultName,这里的type就是我们new ExtensionLoader时传入的参数值。传入的type为ExtensionFactory.class时,ExtensionFactory接口SPI注解的值为空,因此不会设置cachedDefaultName的值;传入的type为Protocol.class时,Protocol接口SPI注解的值为dubbo,因此cachedDefaultName的值被置为dubbo,即Protocol接口的默认扩展实现的key。

    ExtensionFactory.class

    @SPI
    public interface ExtensionFactory {
    
        <T> T getExtension(Class<T> type, String name);
    
    }
    

    Protocol.class

    @SPI("dubbo")
    public interface Protocol {
     
        int getDefaultPort();
    
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        void destroy();
    
    }
    

    接下来loadExtensionClasses方法调用loadDirectory方法加载dubbo扩展点的配置文件内容,依次从"META-INF/dubbo/internal/"、"META-INF/dubbo/"和"META-INF/services/"三个目录下加载配置文件。

    loadDirectory方法的主要内容是通过类加载器获取资源地址,之后加载资源loadResource()。

        private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
           ......
                 String name = null;
                 int i = line.indexOf('=');
                 if (i > 0) {
                      name = line.substring(0, i).trim();
                      line = line.substring(i + 1).trim();
                  }
                  if (line.length() > 0) {
                       loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                  }
          ......
        }
    

    loadResource方法用于解析资源文件内容,资源文件的规范是key=value的形式,例:dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol。于是上面代码中的name就是扩展点的名字,line就是扩展点类的全限定类名,然后关注一下调用的loadClass方法。

        private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
            if (!type.isAssignableFrom(clazz)) {
                throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                        type + ", class line: " + clazz.getName() + "), class "
                        + clazz.getName() + " is not subtype of interface.");
            }
            if (clazz.isAnnotationPresent(Adaptive.class)) {
                cacheAdaptiveClass(clazz);
            } else if (isWrapperClass(clazz)) {
                cacheWrapperClass(clazz);
            } else {
                clazz.getConstructor();
                if (StringUtils.isEmpty(name)) {
                    name = findAnnotationName(clazz);
                    if (name.length() == 0) {
                        throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                    }
                }
                
                String[] names = NAME_SEPARATOR.split(name);
                if (ArrayUtils.isNotEmpty(names)) {
                    cacheActivateClass(clazz, names[0]);
                    for (String n : names) {
                        cacheName(clazz, n);
                        saveInExtensionClass(extensionClasses, clazz, name);
                    }
                }
            }
        }
    

    对于不同的clazz(这个clazz是loadResource方法中通过扩展点类的全限定类名加载得到的),分为3种情况:
    第一种. clazz类上带有@Adaptive注解,则把这个clazz赋值给cachedAdaptiveClass,这就是之前提到的cachedAdaptiveClass在执行getExtensionClasses方法过程中若满足一定条件会被赋值。执行ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()时,根据配置文件存在多个扩展类,其中一个扩展类为AdaptiveExtensionFactory。此时,clazz为AdaptiveExtensionFactory(在类上带有@Adaptive注解),于是cachedAdaptiveClass被赋值。

    AdaptiveExtensionFactory.class

    @Adaptive
    public class AdaptiveExtensionFactory implements ExtensionFactory {
    ...
    }
    

    第二种. 若clazz为包装类,则存入cachedWrapperClasses(这是个Set)。判断是否是包装类通过判断这个类的构造方法是否把某个给定的类作为其唯一参数。

    第三种. 其他的情况,将扩展点类和对应的扩展点名字存入cachedNames,然后将扩展点名字+扩展点类存入extensionClasses(这是个Map,由方法参数传入)。至此,getExtensionClasses方法完成,最终将加载的扩展类存入cachedClasses。(以ExtensionFactory接口为例,cachedClasses中存储的扩展类包括spi=SpiExtensionFactory.class,spring=SpringExtensionFactory)。

    回到getAdaptiveExtensionClass方法,刚刚我们分析了getExtensionClasses方法,接下来就是判断cachedAdaptiveClass是否为null。对于ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension(),在之前分析loadClass方法时可以看到cachedAdaptiveClass被赋值为AdaptiveExtensionFactory,因此直接返回cachedAdaptiveClass即可。而对于ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(),cachedAdaptiveClass为null,需要继续执行createAdaptiveExtensionClass方法。

        private Class<?> createAdaptiveExtensionClass() {
            String code = createAdaptiveExtensionClassCode();
            ClassLoader classLoader = findClassLoader();
            org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
            return compiler.compile(code, classLoader);
        }
    

    这个方法用于动态生成Adaptive类,先调用createAdaptiveExtensionClassCode方法生成Adaptive类的代码,然后调用Compiler编译生成Class。createAdaptiveExtensionClassCode方法会对接口方法上对注解进行判断,若方法不带有@Adaptive注解,生成的Adaptive类中该方法抛出UnsupportedOperationException;若方法带有@Adaptive注解,则会生成相应的方法。以Protocol接口为例,动态生成的Adaptive类为Protocol$Adaptive。

    package org.apache.dubbo.rpc;
    import org.apache.dubbo.common.extension.ExtensionLoader;
    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy() {throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public int getDefaultPort() {throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
    }
    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.export(arg0);
    }
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
    if (arg1 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg1;
    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
    org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
    return extension.refer(arg0, arg1);
    }
    }
    

    Protocol$Adaptive类生成了export和refer方法(Protocol接口中这两个方法带有@Adaptive注解)。Adaptive类的作用是获取接口扩展点的具体实现类,调用具体实现类的方法。首先给extName赋值

    String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );

    若调用方配置了<dubbo:protocol name="registry" />,于是url.getProtocol()的值为registry,此时extName = registry,进而通过getExtension(extName)获取具体实现类RegistryProtocol;若url.getProtocol()为null,则extName为默认值dubbo,具体实现类为DubboProtocol。

    注意上面这段代码中的默认值"dubbo“,回忆一下Protocol类上的@SPI("dubbo")注解,执行loadExtensionClasses方法的时候"dubbo"被存入cachedDefaultName中,createAdaptiveExtensionClassCode方法动态生成代码的时候将cachedDefaultName作为默认值写入代码。

    总结一下createAdaptiveExtensionClass方法,首先动态生成Adaptive类,然后通过Adaptive类获取接口扩展类的具体实现。

    接下来看一下刚刚提到的getExtension(extName)方法

        public T getExtension(String name) {
            if (StringUtils.isEmpty(name)) {
                throw new IllegalArgumentException("Extension name == null");
            }
            if ("true".equals(name)) {
                return getDefaultExtension();
            }
            Holder<Object> holder = cachedInstances.get(name);
            if (holder == null) {
                cachedInstances.putIfAbsent(name, new Holder<Object>());
                holder = cachedInstances.get(name);
            }
            Object instance = holder.get();
            if (instance == null) {
                synchronized (holder) {
                    instance = holder.get();
                    if (instance == null) {
                        instance = createExtension(name);
                        holder.set(instance);
                    }
                }
            }
            return (T) instance;
        }
    

    getDefaultExtension方法会将cachedDefaultName作为参数再次调用getExtension方法,在我们的例子中就是调用getExtension("dubbo")。然后判断cachedInstances是否为空,若为null则同步双重检查,之后调用createExtension方法。createExtension方法继而调用getExtensionClasses方法(这个在上面已经分析过了),createExtension主要根据name获得扩展类的全限定名,然后新建实例。

    最后说一下方法调用关系图中的injectExtension

        private T injectExtension(T instance) {
            try {
                if (objectFactory != null) {
                    for (Method method : instance.getClass().getMethods()) {
                        if (method.getName().startsWith("set")
                                && method.getParameterTypes().length == 1
                                && Modifier.isPublic(method.getModifiers())) {
                           ...........
                            try {
                                String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                                Object object = objectFactory.getExtension(pt, property);
                                if (object != null) {
                                    method.invoke(instance, object);
                                }
                          ..........
        }
    

    可以看作dubbo的IOC反转控制,从objectFactory中获取对象为instance的字段赋值。

    tips:
    @Adaptive注解在类上和注解在方法上的区别:
    注解在类上表示这个类dubbo框架已经实现(装饰模式设计),主要作用于固定已知类,典型的有AdaptiveCompiler和AdaptiveExtensionFactory,这两个类都是在类上带有@Adaptive;
    【AdaptiveCompiler固定已知 -- 框架仅支持JavassistCompiler和JdkCompiler;AdaptiveExtensionFactory固定已知 -- 框架仅支持SpiExtensionFactory和SpringExtensionFactory】
    注解在方法上表示自动生成和编译一个动态代理适配器类,如上文的Protocol$Adpative。主要作用是调用方通过适配器类获取接口扩展类的具体实现,由于扩展类的具体实现是未知的,因此需要动态生成。

    相关文章

      网友评论

        本文标题:dubbo源码阅读(一) -- SPI 机制

        本文链接:https://www.haomeiwen.com/subject/jugqyqtx.html