美文网首页
第4章 Dubbo SPI 的设计与实现

第4章 Dubbo SPI 的设计与实现

作者: 原水寒 | 来源:发表于2019-07-19 09:19 被阅读0次

本节分析 Dubbo SPI 基础源码。

import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Protocol;

ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol dubboProtocol = loader.getExtension("dubbo");

Protocol 接口的定义

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();
}

注意这里的两个核心注解 @SPI@Adaptive

@SPI:指定一个接口为SPI接口(可扩展接口)

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
    /**
     * 缺省扩展点名
     * default extension name
     */
    String value() default "";
}

@Adaptive:该注解可以注解在两个地方:

  • 接口上:在 Dubbo 中,仅有 AdaptiveExtensionFactoryAdaptiveCompiler
  • 接口的方法上:会动态生成相应的动态类(实际上是一个工厂类,工厂设计模式),例如 Protocol$Adapter

Dubbo 的整个加载 SPI 扩展流程都是在 ExtensionLoader 中完成的。

一、获取 ExtensionLoader

ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);

ExtensionLoader 可以类比为 JDK SPI 中的 ServiceLoader

首先来看一下 ExtensionLoader 的类属性:

    /**
     * 类变量(全局变量),也可以做成一个 ExtensionLoaderFactory 来专门存储这些类变量
     */
    /** 存放SPI文件的三个目录 */
    /** 1. META-INF/services/ 是jdk的SPI文件的存放目录 */
    private static final String SERVICES_DIRECTORY = "META-INF/services/";
    /** 2. META-INF/dubbo/ 是第三方SPI文件的存放目录 */
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    /** 3. META-INF/dubbo/internal/ 是Dubbo内部SPI文件的存放目录 */
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

    private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");

    /** key: SPI接口Class value: 该接口的ExtensionLoader, eg. "interface com.alibaba.dubbo.rpc.Protocol" -> "com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]" */
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
    /** key: SPI接口Class value: SPI实现类的对象实例 */
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();

注意:上述的都是类属性,即所有该类的实例都共享。在 SOFARPC 中,是有一个 ExtensionLoaderFactory 类来存储 Map<Class, ExtensionLoader> 变量的。

再来看一下 ExtensionLoader 的实例属性:

    /**
     * 实例变量
     */
    /** SPI接口Class,eg. Protocol */
    private final Class<?> type;
    /** 用于 Dubbo IOC 注入实例的创建 */
    private final ExtensionFactory objectFactory;
    /** key: ExtensionClass的Class value: SPI实现类的key */
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
    /** 存储 type 的所有普通实现类的 spi key-valueClass: Map<String, Class<?>> getExtensionClasses() */
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
    /** 如果类上有 @Activate 注解,则缓存到 Map<String, Activate> cachedActivates 中,key=spi key,value=注解 */
    private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
    /** 缓存创建好的extensionClass实例 */
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
    /** 缓存创建好的适配类实例 */
    private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
    /** 存储类上带有 @Adaptive 注解的Class,如果没有,则自动生成一个适配类 */
    private volatile Class<?> cachedAdaptiveClass = null;
    /** 如果扩展接口(eg. Protocol)有 @SPI 注解,则获取其 @SPI.value 属性,如果存在,则缓存到 String cachedDefaultName 中 */
    private String cachedDefaultName;
    /** 存储在创建适配类实例这个过程中发生的错误,后续该 type 下再调用创建适配类的时候,就直接抛错 */
    private volatile Throwable createAdaptiveInstanceError;
    /** 存放具有一个type入参的构造器的实现类(wrapper类)的Class对象,用于实现 AOP */
    private Set<Class<?>> cachedWrapperClasses;
    /** key :实现类的全类名  value: exception, 如果加载当前行失败,则存储在这里,防止真正的异常被吞掉 */
    private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();

来看一下 getExtensionLoader(Class<T> type) 的源码:

    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        /**
         * 1. 校验入参type:非空 + 接口 + 含有@SPI注解
         */
        if (type == null) {
            throw new IllegalArgumentException("Extension type == null");
        }
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type +
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }

        /**
         * 2. 根据type接口从全局缓存EXTENSION_LOADERS中获取ExtensionLoader,如果有直接返回;如果没有,则先创建,之后放入缓存,最后返回
         */
        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }

创建 ExtensionLoader

    private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

当前创建的 ExtensionLoader 对象的 type 是 com.alibaba.dubbo.rpc.Protocol,所以此时会执行:objectFactory = ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()。(实际上,除了 ExtensionFactory 本身之外,其他所有的 SPI 接口的 ExtensionLoader 对象都有一个 ExtensionFactory 实例 objectFactory,该实例主要用于实现 Dubbo IOC 注入实例的创建)该行代码后续在进行分析。我们目前只需要知道 ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(Class<T> type) 最终得到的实例变量是:

Class<?> type = interface T
ExtensionFactory objectFactory = AdaptiveExtensionFactory(适配类)
                 - factories = [SpiExtensionFactory实例, SpringExtensionFactory实例]

二、加载指定 SPI 实现类

Protocol dubboProtocol = loader.getExtension("dubbo")

调用层级:

ExtensionLoader<T>.getExtension()
-- createExtension(String name)
  // 获取扩展类
  -- getExtensionClasses().get(name)
    -- loadExtensionClasses()
      // 加载一个目录
      -- loadDirectory(Map<String, Class<?>> extensionClasses, String dir)
        // 加载目录下的一个文件
        -- loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL)
          // 加载文件中的一个SPI实现类
          -- loadClass(Map<String, Class<?>> extensionClasses, Class<?> clazz, String name)
  // Dubbo IOC
  -- injectExtension(instance);
  // Dubbo AOP
  -- wrapper包装;

本小节只分析 getExtensionClasses()injectExtension(instance)wrapper包装 放在 IOC 和 AOP 部分进行分析。

    /**
     * 根据传入的spiKey(name,eg.dubbo),创建相应的SPI扩展实例(eg.DubboProtocol),
     * 并且将 <spiKey, 创建好的SPI扩展实例> 存储到 Map<String, Holder<Object>> cachedInstances 中,
     * eg. <"dubbo", DubboProtocol>
     */
    @SuppressWarnings("unchecked")
    public T getExtension(String name) {
        ...
        // 创建 @SPI.value 的 extension 的实例
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        // 首先从缓存 Map<String, Holder<Object>> cachedInstances 中获取,如果存在,则直接返回,如果不存在,则双重检测创建实例
        Holder<Object> holder = cachedInstances.get(name);
        if (holder == null) {
            cachedInstances.putIfAbsent(name, new Holder<Object>());
            holder = cachedInstances.get(name);
        }
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    // 创建SPI实现类实例
                    instance = createExtension(name);
                    // 进行缓存
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }


    /**
     * 调用方已经加了同步锁,无需考虑线程同步问题
     */
    private T createExtension(String name) {
        // 从普通SPI实现类map中获取传入的spiKey的实现类,例如 DubboProtocol.class
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            // 如果没找到,抛出加载SPI实现类时出现的异常
            throw findException(name);
        }
        try {
            // 从缓存中获取SPI实现类实例,如果有,直接返回,如果没有,创建
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                // 创建 DubboProtocol 实例(这里可以看出,要求SPI实现类必须具有无参构造器),放到缓存中 Map<Class<?>, Object> EXTENSION_INSTANCES
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            // Dubbo IOC
            injectExtension(instance);
            // Dubbo AOP
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw ...;
        }
    }

    /**
     * 加载三个 spi 目录下的 Class<?> type(eg. Protocol) 指定的 spi 接口的所有适配类、包装类和普通实现类,进而初始化各个缓存项
     * 1. Holder<Map<String, Class<?>>> cachedClasses,存储 type 的所有普通实现类的 <spiKey, valueClass>
     * 2. 如果扩展接口(eg. Protocol)有 @SPI 注解,则获取其 @SPI.value 属性,如果存在,则缓存到 String cachedDefaultName 中(例如, "dubbo")
     * 3. 之后依次加载三个 spi 目录下的所有 type spi 文件,读取文件中的每一行:
     *  3.1 如果类上有 @Adaptive 注解,则缓存到 Class<?> cachedAdaptiveClass 中
     *  3.2 如果类是 wrapper 类,则缓存到 Set<Class<?>> cachedWrapperClasses 中
     *  3.3 如果类是普通实现类,
     *      检测实现类必须有无参构造器,(在 T createExtension(String name) 处会调用无参构造器实例化)
     *      如果类上有 @Activate 注解,则缓存到 Map<String, Activate> cachedActivates 中,key=spiKey,value=注解;
     *      存储 spiKey 到 Map<Class<?>, String> cachedNames;
     *      将当前的 spi 实现类的 <spiKey, value> 存储到 Holder<Map<String, Class<?>>> cachedClasses
     *  3.4 如果加载当前行失败,则存储在 Map<String, IllegalStateException> exceptions 中,key :实现类的全类名 value: exception,防止真正的异常被吞掉
     */
    private Map<String, Class<?>> getExtensionClasses() {
        // 从缓存中获取 SPI 实现类集合,如果有,直接返回,如果没有,双重检测进行加载
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }

    // synchronized in getExtensionClasses
    private Map<String, Class<?>> loadExtensionClasses() {
        // 获取扩展接口(eg. Protocol)的 @SPI 注解(SPI接口必须有该注解),获取其 @SPI.value 属性,如果存在,则缓存到 String cachedDefaultName 中
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            ...
            cachedDefaultName = value;
        }

        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadDirectory(extensionClasses, DUBBO_DIRECTORY);
        loadDirectory(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }

    /**
     * 加载指定目录dir下的SPI实现
     */
    private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            // 同一个名称的文件可能存在于多个 jar 包中
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            ...
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    // 每一个 url 是一个文件
                    java.net.URL resourceURL = urls.nextElement();
                    loadResource(extensionClasses, classLoader, resourceURL);
                }
            }
        } catch (Throwable t) {
            logger.error(...);
        }
    }

    /**
     * 加载一个文件
     */
    private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
            ...
            // 每一行是一个 spi 扩展实现
            String line;
            // 读取每一行
            while ((line = reader.readLine()) != null) {
                // 去除注释 key=value #xxx
                ...
                try {
                    String name = null;
                    int i = line.indexOf('=');
                    if (i > 0) {
                        // 获取 spi key
                        name = line.substring(0, i).trim();
                        // 获取 spi 实现类
                        line = line.substring(i + 1).trim();
                    }
                    if (line.length() > 0) {
                        // 加载一个类
                        loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                    }
                } catch (Throwable t) {
                    IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                    // 加载失败,存储失败信息
                    exceptions.put(line, e);
                }
            }
            ...
        } catch (Throwable t) {
            logger.error(...);
        }
    }

    /**
     * 加载一个类
     */
    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        // adaptive 类、wrapper 类、普通 SPI 类都必须实现 SPI 接口
        if (!type.isAssignableFrom(clazz)) {
            throw ...
        }
        // 如果类上有 @Adaptive 注解,则缓存到 Class<?> cachedAdaptiveClass 中(如果有多个 @Adaptive 类,则直接抛出异常)
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            if (cachedAdaptiveClass == null) {
                cachedAdaptiveClass = clazz;
            } else if (!cachedAdaptiveClass.equals(clazz)) {
                throw new IllegalStateException("More than 1 adaptive class found: "
                        + cachedAdaptiveClass.getClass().getName()
                        + ", " + clazz.getClass().getName());
            }
        // 如果类是 wrapper 类,则缓存到 Set<Class<?>> cachedWrapperClasses 中
        } else if (isWrapperClass(clazz)) {
            Set<Class<?>> wrappers = cachedWrapperClasses;
            if (wrappers == null) {
                cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                wrappers = cachedWrapperClasses;
            }
            wrappers.add(clazz);
        // 正常类
        } else {
            // 检测必须有无参构造器
            clazz.getConstructor();
            ...
            String[] names = NAME_SEPARATOR.split(name);
            if (names != null && names.length > 0) {
                // 如果类上有 @Activate 注解,则缓存到 Map<String, Activate> cachedActivates 中,key=spiKey,value=注解
                Activate activate = clazz.getAnnotation(Activate.class);
                if (activate != null) {
                    cachedActivates.put(names[0], activate);
                }

                for (String n : names) {
                    // 存储 spiKey 到 Map<Class<?>, String> cachedNames
                    if (!cachedNames.containsKey(clazz)) {
                        cachedNames.put(clazz, n);
                    }
                    // 将 spi 的 key-value 存储到 Map<String, Class<?>> extensionClasses 中,
                    // 实际上该 extensionClasses 会在最外层 getExtensionClasses() 处存储到 Holder<Map<String, Class<?>>> cachedClasses 中
                    ...
                    extensionClasses.put(n, clazz);
                    ...
                }
            }
        }
    }

    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

到这里,getExtensionClasses() 就分析完成了,来看下此时的 ExtensionLoader 对象:

- static Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS
    -- "interface com.alibaba.dubbo.rpc.Protocol" -> "com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]"
    -- "interface com.alibaba.dubbo.common.extension.ExtensionFactory" -> "com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.common.extension.ExtensionFactory]"
- static Map<Class<?>, Object> EXTENSION_INSTANCES
    -- "class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol" -> DubboProtocol 实例
    -- "class com.alibaba.dubbo.common.extension.factory.SpiExtensionFactory" -> SpiExtensionFactory 实例
    -- "class com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory" -> SpringExtensionFactory 实例

- Class<?> type = interface com.alibaba.dubbo.rpc.Protocol;
- ExtensionFactory objectFactory = AdaptiveExtensionFactory(适配类);
       -- factories = [SpiExtensionFactory实例, SpringExtensionFactory实例]
- Map<Class<?>, String> cachedNames
    -- "class com.alibaba.dubbo.registry.integration.RegistryProtocol" -> "registry"
    -- "class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol" -> "injvm"
    -- "class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol" -> "dubbo"
    -- "class com.alibaba.dubbo.rpc.support.MockProtocol" -> "mock"
- Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
     -- "registry" -> "class com.alibaba.dubbo.registry.integration.RegistryProtocol"
     -- "injvm" -> "class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol"
     -- "dubbo" -> "class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol"
     -- "mock" -> "class com.alibaba.dubbo.rpc.support.MockProtocol"
- Map<String, Activate> cachedActivates
- Map<String, Holder<Object>> cachedInstances
    -- "dubbo" -> 
- Holder<Object> cachedAdaptiveInstance
- Class<?> cachedAdaptiveClass = null;
- String cachedDefaultName = 'dubbo';
- Throwable createAdaptiveInstanceError;
- Set<Class<?>> cachedWrapperClasses;
    -- class com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
    -- class com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
- Map<String, IllegalStateException> exceptions

三、加载或创建适配类

// 1. 手动适配类,类上带有注解 @Adaptive
ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()
// 2. 自动生成适配类,方法上带有注解 @Adaptive
Protocol adaptiveExtension = loader.getAdaptiveExtension()

3.1、手动创建适配类

Dubbo 中手动创建的适配类只有 ExtensionFactoryCompiler,以 ExtensionFactory 为例,

image.png
@SPI
public interface ExtensionFactory {
    /**
     * @param type object type. SPI 类型
     * @param name object name. setXxx 中的 xxx
     * @return object instance.
     */
    <T> T getExtension(Class<T> type, String name);
}

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {
    /**
     * factories = [SpiExtensionFactory, SpringExtensionFactory]
     * 也就是说 dubbo 的 ioc 可以注入 "spi适配类" 和 "spring bean"
     */
    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }
}

从上一小节中 SPI 实现类的加载过程来看,在加载 ExtensionFactory 的过程中,会把 AdaptiveExtensionFactory 缓存到 ExtensionFactoryExtensionLoader 中的 Class<?> cachedAdaptiveClass 属性上。

    public T getAdaptiveExtension() {
        // 如果创建好的适配器实例存在,则直接返回,否则,双重检测进行创建单例 cachedAdaptiveInstance
        Object instance = cachedAdaptiveInstance.get();
        if (instance == null) {
            if (createAdaptiveInstanceError == null) {
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            // 创建 适配类 实例
                            instance = createAdaptiveExtension();
                            // 将适配类实例缓存到 Holder<Object> cachedAdaptiveInstance 中
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            // 如果创建适配类失败,存储到 Throwable createAdaptiveInstanceError 缓存中
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException(...);
                        }
                    }
                }
            } else {
                throw new IllegalStateException(...);
            }
        }
        return (T) instance;
    }

    /**
     * createAdaptiveExtension()
     * --getAdaptiveExtensionClass()
     *   //从dubbo-spi配置文件中获取AdaptiveExtensionClass
     *   --getExtensionClasses()
     *     --loadExtensionClasses()
     *       --loadFile(Map<String, Class<?>> extensionClasses, String dir)
     *   //创建适配类
     *   --createAdaptiveExtensionClass()
     *
     * --injectExtension(T instance)  // dubbo ioc
     */
    private T createAdaptiveExtension() {
        try {
            return injectExtension((T) getAdaptiveExtensionClass().newInstance());
        } catch (Exception e) {
            throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
        }
    }

    private Class<?> getAdaptiveExtensionClass() {
        // 1. 加载三个 spi 目录下的 Class<?> type(eg. Protocol) 指定的 spi 接口的所有适配类、包装类和普通实现类,进而初始化各个缓存项
        getExtensionClasses();
        // 2. 如果存在类上具有 @@Adaptive 注解的Class,则直接返回,否则创建一个 AdaptiveExtensionClass
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }

此处因为 cachedAdaptiveClass = AdaptiveExtensionFactory,所以直接返回。如果没有适配类,那么就走 createAdaptiveExtensionClass() 方法(eg. Protocol)。

3.2、自动生成适配类

    private Class<?> createAdaptiveExtensionClass() {
        // 1. 拼接 AdaptiveExtensionClass 类代码
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        // 2. 获取编译器
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        // 3. 使用编译器编译 AdaptiveExtensionClass 类代码
        return compiler.compile(code, classLoader);
    }

对于 Protocol 接口而言,拼接好的代码如下:

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

/**
 * 工厂类和代理类的结合体
 */
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    @Override
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    @Override
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    @Override
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) {
            throw new IllegalArgumentException("url == null");
        }
        // 1. 从 URL 中获取 spiKey
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        }
        // 2. 根据 spiKey 获取相应的实现类
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        // 3. 调用实现类的方法
        return extension.refer(arg0, arg1);
    }

    @Override
    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        }
        if (arg0.getUrl() == null) {
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        }
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        }
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

对于方法上有 @Adaptive 注解的方法,生成相应的实现类(该实现是一个工厂 + 代理的结合体),对于方法上没有 @Adaptive 注解的方法,直接抛出异常。拼接好之后,获取编译器,调用编译器将 String code 编译为 Class。(关于编译器,后续进行分析,关于拼接源码分析,见 http://dubbo.apache.org/zh-cn/docs/source_code_guide/adaptive-extension.html

四、Dubbo IOC

    /**
     * ioc 有效的前提: objectFactory != null
     * 遍历当前的实现类(eg. DubboProtocol)中的每一个方法:
     * 1. 对于方法是 setXxx + public修饰符 + setXxx入参只有一个的方法,进行如下操作:
     * 1.1 如果该 setXxx 方法有 @DisableInject 注解,则当前方法不处理,行下一个方法的处理,否则
     * 1.2 获取 setXxx 入参类型 pt + 获取入参属性名 property,
     *     之后使用 AdaptiveExtensionFactory.getExtension(pt, property) 来获取被注入的实例
     * 1.3 调用 setXxx 将被注入的实例设置到当前的实现类中,实现 ioc
     *
     */
    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                for (Method method : instance.getClass().getMethods()) {
                    if (method.getName().startsWith("set") && method.getParameterTypes().length == 1 && Modifier.isPublic(method.getModifiers())) {
                        // 如果该 setXxx 方法有 @DisableInject 注解,则当前方法不处理,行下一个方法的处理
                        if (method.getAnnotation(DisableInject.class) != null) {
                            continue;
                        }
                        // 获取setXxx入参类型
                        Class<?> pt = method.getParameterTypes()[0];
                        try {
                            // 获取入参属性名 setName => name
                            String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                            // 使用 AdaptiveExtensionFactory.getExtension(pt, property) 来获取被注入的实例
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                // 调用 setXxx 将被注入的实例设置到当前的实现类中,实现 ioc
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error(...);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }

从上述的分析来看,除了 ExtensionFactory 本身之外,其他 SPI 接口的 ExtensionFactory objectFactory = AdaptiveExtensionFactory

@Adaptive
public class AdaptiveExtensionFactory implements ExtensionFactory {

    /**
     * factories = [SpiExtensionFactory, SpringExtensionFactory]
     * 也就是说 dubbo 的 ioc 可以注入 "spi适配类" 和 "spring bean"
     */
    private final List<ExtensionFactory> factories;

    public AdaptiveExtensionFactory() {
        ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
        List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
        for (String name : loader.getSupportedExtensions()) {
            list.add(loader.getExtension(name));
        }
        factories = Collections.unmodifiableList(list);
    }

    @Override
    public <T> T getExtension(Class<T> type, String name) {
        for (ExtensionFactory factory : factories) {
            T extension = factory.getExtension(type, name);
            if (extension != null) {
                return extension;
            }
        }
        return null;
    }
}

public class SpiExtensionFactory implements ExtensionFactory {
    /**
     * 注意:因为这里只是返回适配类,所以 name 属性是没用的
     */
    @Override
    public <T> T getExtension(Class<T> type, String name) {
        // 如果type是接口并且有@SPI注解 && type的spi实现不为空,则返回 type 的适配实现类(一个spi接口至少有一个普通实现类,适配类起着工厂和代理的作用)
        if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
            ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
            if (!loader.getSupportedExtensions().isEmpty()) {
                return loader.getAdaptiveExtension();
            }
        }
        return null;
    }
}

public class SpringExtensionFactory implements ExtensionFactory {
    private static final Set<ApplicationContext> contexts = new ConcurrentHashSet<ApplicationContext>();
    private static final ApplicationListener shutdownHookListener = new ShutdownHookListener();

    /**
     * 调用点:
     * 1. ServiceBean<T> implements ApplicationContextAware
     * 2. ReferenceBean<T> implements ApplicationContextAware
     * 3. 手动调用
     */
    public static void addApplicationContext(ApplicationContext context) {
        contexts.add(context);
        BeanFactoryUtils.addApplicationListener(context, shutdownHookListener);
    }

    ...

    /**
     * 1. 首先按 spring bean 名称查找:遍历 Set<ApplicationContext>,如果包含 name 的 bean,则直接获取该 bean,判断该 bean 属于 type,则直接返回;否则,
     * 2. 上述没有查找到,则直接遍历 Set<ApplicationContext>,调用 ctx.getBean(type) 根据 type 来创建 bean(如果同时有多个实现类或者一个也没有,那么根据 type 获取直接抛出)
     */
    @Override
    public <T> T getExtension(Class<T> type, String name) {
        // 1. 首先按 spring bean 名称查找
        for (ApplicationContext context : contexts) {
            if (context.containsBean(name)) {
                Object bean = context.getBean(name);
                if (type.isInstance(bean)) {
                    return (T) bean;
                }
            }
        }

        // 2. 按 spring bean type 查找
        for (ApplicationContext context : contexts) {
            try {
                return context.getBean(type);
            } catch (NoUniqueBeanDefinitionException multiBeanExe) {
                // 有多个实现,报错
                ...
            } catch (NoSuchBeanDefinitionException noBeanExe) {
                // 没有实现,报错
                ...
            }
        }
        return null;
    }

    /**
     * 关闭钩子
     **/
    private static class ShutdownHookListener implements ApplicationListener {
        @Override
        public void onApplicationEvent(ApplicationEvent event) {
            if (event instanceof ContextClosedEvent) {
                // we call it anyway since dubbo shutdown hook make sure its destroyAll() is re-entrant.
                // pls. note we should not remove dubbo shutdown hook when spring framework is present, this is because
                // its shutdown hook may not be installed.
                DubboShutdownHook shutdownHook = DubboShutdownHook.getDubboShutdownHook();
                shutdownHook.destroyAll();
            }
        }
    }
}

五、Dubbo AOP

    /**
     * 调用方已经加了同步锁,无需考虑线程同步问题
     */
    private T createExtension(String name) {
        // 从普通SPI实现类map中获取传入的spiKey的实现类,例如 DubboProtocol.class
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            // 如果没找到,抛出加载SPI实现类时出现的异常
            throw findException(name);
        }
        try {
            // 从缓存中获取SPI实现类实例,如果有,直接返回,如果没有,创建
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                // 创建 DubboProtocol 实例(这里可以看出,要求SPI实现类必须具有无参构造器),放到缓存中 Map<Class<?>, Object> EXTENSION_INSTANCES
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            // Dubbo IOC
            injectExtension(instance);
            // Dubbo AOP
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw ...;
        }
    }

来看 AOP 的这段源码:

            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }

在第二小节分析《加载 SPI 实现》的过程中,会把指定的 SPI 接口的 Wrapper 类(实现了 SPI 接口 + 入参仅有一个SPI接口参数)放到 Set<Class<?>> cachedWrapperClasses 中,之后遍历 cachedWrapperClasses 循环调用 Wrapper 的单参构造器创建实例,形成一条 Wrapper 链,链尾是进行了 IOC 操作的最底层的 SPI 实现类

六、@Activate 条件注解

@Activate 类似于 spring @Conditional 注解,可根据一定的条件(group - 比如 provider、value - spiKey,比如 dubbo)自动加载多个 SPI 实现类,并且实现了这些 SPI 实现类加载的先后顺序(绝对排序 - order 和相对排序 - before/after), 下面我将 第3章 Dubbo SPI 使用姿势 分析的 @Activate激活点加载流程 再贴一遍,非常重要

  • @Activate 注解:
  • String[] group() default {}
  • 如果 getActivateExtension 接口传入的 group 参数为 null 或者 length==0,表示不限制 group,则允许加载当前 SPI 实现;
  • 查看当前的 SPI 实现的 @Activate 注解中的参数 groups 是否包含传入的限制参数 group,如果包含,则允许加载当前的 SPI 实现。
  • String[] value() default {}
  • 如果当前的 SPI 实现的 @Activate 注解没有 value() 属性,则认为默认是允许当前的 SPI 实现加载的;
  • 如果 value() 中的任一值出现在当前传入的 URL#getParameters() 中的一个参数名,则认为默认是允许当前的 SPI 实现加载的;

@Activate

/**
 * 激活点。该注解用于根据已有的条件来自动激活满足条件的多个扩展实现类。
 * eg. @Activate 可以被用于加载多个 Filter 扩展实现类。
 * SPI 服务发现者可以通过调用 {@link ExtensionLoader#getActivateExtension(URL, String, String)} 来找出所有满足给定条件的的激活扩展实现类。
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
    /* 当 String[] group() 包含当前传入的 group,则该条件满足,进行其他条件的判断 */
    String[] group() default {};

    /* 当 String[] value() 中的任一值出现在当前传入的 URL#parameters() 中的一个参数名 */
    String[] value() default {};

    /* String[] before() 指定当前的 SPI 扩展实现类需要放在哪些扩展实现类之前 */
    String[] before() default {};

    /* String[] after() 指定当前的 SPI 扩展实现类需要放在哪些扩展实现类之后 */
    String[] after() default {};

    /* 指定绝对的顺序 order,order 越小,越靠前 */
    int order() default 0;
}

这里尤其注意 String[] value() 的含义,可结合 第3章 Dubbo SPI 使用姿势@Activate 的例子进行理解。

条件激活点加载原理

  • 激活点加载流程(仅列出最常用的主线,其他支线见后续的源码分析及源码注释):
  1. 首先获取除了传入的 spiKey 集合(values)指定的 spi 激活点实现类(称为 default 激活点),之后对 default 激活点进行排序

加载 default 激活点的规则:

  • 如果 getActivateExtension 接口传入的 group 参数为 null 或者 length==0,表示不限制 group,则允许加载当前 SPI 实现;
  • 如果 group 有效,则查看当前的 SPI 实现的 @Activate 注解中的参数 groups 是否包含传入的限制参数 group,如果包含,则允许加载当前的 SPI 实现;
  • 传入的spiKey 集合(values)不包含(-name,name 表示当前处理到的 SPI 激活点的 spiKey):也就是说配置 -name 可以排除掉某个实现类;
  • 如果当前的 SPI 实现的 @Activate 注解没有 value() 属性,则认为默认是加载的,直接返回 true;
  • 如果当前的 SPI 实现的 @Activate 注解有 value() 属性,遍历每一个元素,如果 url.getParameters() 中的参数名包含了其中任意一个元素(也就是说String[] value() 中的任一值出现在当前传入的 URL#parameters() 中的一个参数名)
  1. 之后获取传入的 spiKey 集合(values)指定的 SPI 激活点实现类(称为 usr 激活点)
  • 传入的spiKey 集合(values)不包含(-name,name 表示当前处理到的 SPI 激活点的 spiKey):也就是说配置 -name 可以排除掉某个实现类;
  1. 将 default 激活点集合和 usr 激活点集合放到一个集合中,default 在前,usr 在后
    public List<T> getActivateExtension(URL url, String[] values, String group) {
        // 只加载 names 之外的 spi 实现(或者称为 default 实现) + 最后将 names 指定的 spi 实现集合(usrs)也加入到了 exts 中
        List<T> exts = new ArrayList<T>();
        List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
        // 如果不包含"-default",
        if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            // 1. 加载三个 spi 目录下的 Class<?> type(eg. Protocol) 指定的 spi 接口的所有适配类、包装类和普通实现类,进而初始化各个缓存项
            getExtensionClasses();
            for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
                String name = entry.getKey(); // spiKey
                Activate activate = entry.getValue(); // 对应的注解

                /**
                 * 匹配 group,两条规则:|| 关系
                 * 1. 如果接口传入的 group 参数为 null 或者 length==0,表示不限制 group
                 * 2. 查看当前的 @Activate 注解中的参数 groups 是否包含传入的限制参数 group
                 */
                if (isMatchGroup(group, activate.group())) {
                    // 如果 group 匹配,则创建相应的 SPI 实现类
                    T ext = getExtension(name);
                    /**
                     * 匹配 value,三条规则:&& 关系
                     * 1. 如果传入的 spi key 集合(names)不包含当前遍历的 spi 实现(spi-key=name):也就是说这个 for 循环只加载 names 之外的 spi 实现,names 指定的在 usrs 中加载
                     * 2. 传入的 spi key 集合(names)不包含(-name):也就是说配置 -name 可以排除掉某个实现类
                     * 3. 判断是否激活当前的 spi:|| 关系
                     *    - 如果 @Activate 注解没有 value() 属性,则认为默认是加载的,直接返回 true
                     *    - 如果 @Activate 注解有 value() 属性,遍历每一个元素,
                     *      如果 (url.getParameters() 中的参数名包含了其中任意一个元素 || url.getParameters() 中的参数名是以 ".任一元素" 结尾)
                     *      && url 中的对应的参数名的参数值不为空
                     */
                    if (!names.contains(name)
                            && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                            && isActive(activate, url)) {
                        // 添加到附加列表中
                        exts.add(ext);
                    }
                }
            }
            // 对默认 spi 激活扩展进行排序
            Collections.sort(exts, ActivateComparator.COMPARATOR);
        }
        // 只加载 names 指定的 spi 实现
        List<T> usrs = new ArrayList<T>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            // 如果传入的是 name 以 - 开头 || 传入的 names 包含 -name,则不加载该激活点
            if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                // 如果传入的 name 的名字是 default,则将之前所加载的 usrs 扩展点放到 exts 的最前边;
                // 否则,创建扩展点,将扩展点加入到 usrs 中
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (!usrs.isEmpty()) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        // 由于使用了 List,所以 usrs 放在了 exts 之后
        if (!usrs.isEmpty()) {
            exts.addAll(usrs);
        }
        return exts;
    }

    /**
     * 查看当前的 @Activate 注解中的参数 groups 是否包含传入的限制参数 group
     * @param group  传入的限制参数
     * @param groups @Activate 注解中的参数
     */
    private boolean isMatchGroup(String group, String[] groups) {
        // 如果传入的 group 参数为 null 或者 length==0,表示不限制 group
        if (group == null || group.length() == 0) {
            return true;
        }
        if (groups != null && groups.length > 0) {
            for (String g : groups) {
                if (group.equals(g)) {
                    return true;
                }
            }
        }
        return false;
    }

    private boolean isActive(Activate activate, URL url) {
        // 如果 @Activate 注解没有 value() 属性,则认为默认是加载的,直接返回 true
        String[] keys = activate.value();
        if (keys.length == 0) {
            return true;
        }
        // 如果 @Activate 注解有 value() 属性,遍历每一个元素,
        // 如果 (url.getParameters() 中的参数名包含了其中任意一个元素 || url.getParameters() 中的参数名是以 ".任一元素" 结尾)
        // && url 中的对应的参数名的参数值不为空
        for (String key : keys) {
            for (Map.Entry<String, String> entry : url.getParameters().entrySet()) {
                String k = entry.getKey();
                String v = entry.getValue();
                if ((k.equals(key) || k.endsWith("." + key)) // url.getParameters() 中包含了其中任意一个元素
                        && ConfigUtils.isNotEmpty(v)) {
                    return true;
                }
            }
        }
        return false;
    }

相关文章

网友评论

      本文标题:第4章 Dubbo SPI 的设计与实现

      本文链接:https://www.haomeiwen.com/subject/xqtklctx.html