美文网首页分布式
SOFARPC 源码分析2 - SPI 扩展机制

SOFARPC 源码分析2 - SPI 扩展机制

作者: 原水寒 | 来源:发表于2018-11-18 11:46 被阅读113次

    大部分 RPC 框架都会通过使用 SPI 扩展机制来实现高可扩展性,例如 Dubbo,SOFARPC 等。但是 JDK-SPI 由于其显著的缺点,RPC 框架通常会定制自己的 SPI 框架。
    JDK-SPI 这里介绍了 JDK-SPI 的原理和缺点;
    Dubbo-SPI 这里介绍了 Dubbo-SPI 的设计和实现。

    一、SOFARPC-SPI 的使用

    1.1、可扩展接口

    package com.alipay.sofa.rpc.client;
    
    @Extensible(singleton = false) // 可扩展接口的标识,并提供一些可选参数
    public abstract class LoadBalancer {
        ...
    }
    

    @Extensible 该注解通常添加在 接口 或者 抽象类 上,用于标识该 接口 或者 抽象类 是一个可扩展接口或者可扩展的抽象类。

    1.2、可扩展接口的实现

    package com.alipay.sofa.rpc.zjg;
    
    @Extension("hello") // 可扩展接口实现的标识,并且指定该实现的 alias
    public class MyLoadBalancer extends LoadBalancer {
        ...   
    }
    

    @Extension 该注解添加在可扩展接口或抽象类的实现上,并且通常会添加一个 alias 用于后续的动态加载。

    1.3、编写接口实现的配置文件

    src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.client.LoadBalancer

    hello=com.alipay.sofa.rpc.zjg.MyLoadBalancer
    

    注意:

    • 配置文件的文件名是 可扩展接口 的全类名
    • 配置文件的内容是 alias=具体实现的全类名
    • 配置文件的存放位置默认由 rpc-config-default.json 中的 "extension.load.path" 来指定,可以通过自定义 sofa-rpc/rpc-config.json 文件或者 META-INF/sofa-rpc/rpc-config.json 中指定 "extension.load.path" 来覆盖,默认配置是
     "extension.load.path": [
      "META-INF/services/sofa-rpc/",
       "META-INF/services/"
     ]
    

    1.4、运行时动态选择接口实现

    ExtensionLoader<LoadBalancer> loader = ExtensionLoaderFactory.getExtensionLoader(LoadBalancer.class);
    LoadBalancer myLoadBalancer = loader.getExtension("hello");
    

    首先获取 可扩展接口ExtensionLoader
    然后通过 ExtensionLoader 加载指定 alias具体实现

    二、SOFARPC-SPI 的设计

    image.png
    • @Extensible:该注解通常添加在 接口 或者 抽象类 上,用于标识该 接口 或者 抽象类 是一个可扩展接口或者可扩展的抽象类
    • @Extension:该注解添加在可扩展接口或抽象类的 实现上,并且通常会添加一个 alias 用于后续的动态加载
    • ExtensionClass:一个 实现类会最终被其 ExtensionLoader 加载称为一个 ExtensionClass,存储在其
      ExtensionLoader 中,并且包含了实例化 ExtensionClass 存储的 实现类 的方法
    • ExtensionLoader:每一个 可扩展接口可扩展抽象类 都有一个 ExtensionLoader,用于从相应接口的 SPI 配置文件中读取配置内容并且将每一行解析成一个 ExtensionClass(每一个 ExtensionClass 对应一个实现,SPI 配置文件中的每一行配置一个实现类),之后存储 <alias, ExtensionClass> 配置对到 Map<String, ExtensionClass<T>> 容器中
    • ExtensionLoaderListener:当将一个 ExtensionClass 成功的添加到 Map 容器中的时候,触发 ExtensionLoaderFactory 创建的 ExtensionLoaderListener 监听器
    • ExtensionLoaderFactory:用来获取或者创建 ExtensionLoader,将创建好的 ExtensionLoader 放置在 Map<Class, ExtensionLoader> 容器中

    各组件的关系:图片出处

    image.png

    2.1、@Extensible

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    public @interface Extensible {
    
        // 指定自定义扩展文件名称,默认就是可扩展接口是可扩展抽象类的全类名
        String file() default "";
    
        // 扩展类是否使用单例,默认使用
        boolean singleton() default true;
    
        // 扩展类是否需要编码,默认不需要
        boolean coded() default false;
    }
    

    2.2、@Extension

    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE})
    public @interface Extension {
        // 扩展点名字 alias
        String value();
    
        // 扩展点编码,默认不需要,当接口需要编码的时候需要
        byte code() default -1;
    
        // 优先级排序,默认不需要,大的优先级高
        int order() default 0;
    
        // 是否覆盖其它低order的同名扩展
        boolean override() default false;
    
        // 排斥其它扩展,可以排斥掉其它低order的扩展
        String[] rejection() default {};
    }
    

    2.3、ExtensionLoaderFactory

    public class ExtensionLoaderFactory {
        private ExtensionLoaderFactory() {
        }
    
        // key: 可扩展接口或者可扩展抽象类
        private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
    
        // Get extension loader by extensible class with listener
        // 1. 从 LOADER_MAP 容器中获取 clazz 对应的 ExtensionLoader
        // 2. 如果没有,则使用双重检测同步创建一个 ExtensionLoader
        // 3. 将 {clazz, ExtensionLoader} 对存储到 LOADER_MAP 中
        // 4. 返回 ExtensionLoader
        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
            ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
            if (loader == null) {
                synchronized (ExtensionLoaderFactory.class) {
                    loader = LOADER_MAP.get(clazz);
                    if (loader == null) {
                        loader = new ExtensionLoader<T>(clazz, listener);
                        LOADER_MAP.put(clazz, loader);
                    }
                }
            }
            return loader;
        }
    
        // Get extension loader by extensible class without listener
        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz) {
            return getExtensionLoader(clazz, null);
        }
    }
    

    2.4、ExtensionLoader

    // 一个可扩展接口类,对应一个加载器
    public class ExtensionLoader<T> {
        // SOFARPC 自己编写的 Log 体系
        // getLogger 的执行会触发 RpcConfigs 的 static 块,在该 static 块中会做读取默认配置的操作
        // 读取路径有以下三个
        // 1. rpc-config-default.json(默认的配置文件)
        // 2. sofa-rpc/rpc-config.json(自定义)
        // 3. META-INF/sofa-rpc/rpc-config.json(自定义)
        private final static Logger LOGGER = LoggerFactory.getLogger(ExtensionLoader.class);
        // 当前加载的可扩展接口类
        protected final Class<T> interfaceClass;
        // 当前加载的可扩展接口类名
        protected final String interfaceName;
        // 扩展点注解
        protected final Extensible extensible;
        // 全部的加载的实现类 {"alias":ExtensionClass}
        protected final ConcurrentMap<String, ExtensionClass<T>> all;
        // 如果是单例,那么factory不为空
        protected final ConcurrentMap<String, T> factory;
        // 加载监听器
        protected final ExtensionLoaderListener<T> listener;
    
        public ExtensionLoader(Class<T> interfaceClass, ExtensionLoaderListener<T> listener) {
            this(interfaceClass, true, listener);
        }
    
        protected ExtensionLoader(Class<T> interfaceClass) {
            this(interfaceClass, true, null);
        }
    
        /**
         * interfaceClass 接口类
         * autoLoad       是否自动开始加载,默认开启
         * listener       扩展加载监听器
         */
        protected ExtensionLoader(Class<T> interfaceClass, boolean autoLoad, ExtensionLoaderListener<T> listener) {
            ...
            // 接口为空,既不是接口,也不是抽象类
            if (interfaceClass == null || !(interfaceClass.isInterface() || Modifier.isAbstract(interfaceClass.getModifiers()))) {
                throw new IllegalArgumentException("Extensible class must be interface or abstract class!");
            }
            this.interfaceClass = interfaceClass;
            this.interfaceName = ClassTypeUtils.getTypeStr(interfaceClass);
            this.listener = listener;
            Extensible extensible = interfaceClass.getAnnotation(Extensible.class);
            // 如果可扩展接口没有配置 @Extensible 注解,直接抛错
            if (extensible == null) {
                throw new IllegalArgumentException("Error when load extensible interface " + interfaceName + ", must add annotation @Extensible.");
            } else {
                this.extensible = extensible;
            }
            this.factory = extensible.singleton() ? new ConcurrentHashMap<String, T>() : null;
            this.all = new ConcurrentHashMap<String, ExtensionClass<T>>();
            // 如果开启了自动加载(默认开启),则进行 SPI 配置文件的读取
            // 1. 从全局默认配置中获取 SPI 扩展文件的路径
            // 2. 循环遍历每一个 SPI 扩展文件,读取配置内容并解析。
            if (autoLoad) {
                List<String> paths = RpcConfigs.getListValue(RpcOptions.EXTENSION_LOAD_PATH);
                for (String path : paths) {
                    loadFromFile(path);
                }
            }
        }
    
        // path必须以/结尾
        protected synchronized void loadFromFile(String path) {
            // 默认如果不指定文件名字,就是接口名
            String file = StringUtils.isBlank(extensible.file()) ? interfaceName : extensible.file().trim();
            String fullFileName = path + file;
            ClassLoader classLoader = ClassLoaderUtils.getClassLoader(getClass());
            loadFromClassLoader(classLoader, fullFileName);
        }
    
        protected void loadFromClassLoader(ClassLoader classLoader, String fullFileName) throws Throwable {
            // 获取并加载 SPI 配置文件
            Enumeration<URL> urls = classLoader != null ? classLoader.getResources(fullFileName) : ClassLoader.getSystemResources(fullFileName);
            // 可能存在多个文件
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    // 读取一个文件
                    URL url = urls.nextElement();
                    BufferedReader reader = null;
                    reader = new BufferedReader(new InputStreamReader(url.openStream(), "UTF-8"));
                    String line;
                    // 每次读取一行,每一行就是一个 SPI 实现类配置
                    while ((line = reader.readLine()) != null) {
                        readLine(url, line);
                    }
                }
            }
        }
    
        protected void readLine(URL url, String line) {
            String[] aliasAndClassName = parseAliasAndClassName(line);
            if (aliasAndClassName == null || aliasAndClassName.length != 2) {
                return;
            }
            String alias = aliasAndClassName[0];
            String className = aliasAndClassName[1];
            // 读取配置的实现类
            Class tmp = ClassUtils.forName(className, false);
            // 如果实现类不是可扩展接口的实现类,抛错
            if (!interfaceClass.isAssignableFrom(tmp)) {
                throw new IllegalArgumentException(...);
            }
            Class<? extends T> implClass = (Class<? extends T>) tmp;
    
            // 检查是否有可扩展标识
            Extension extension = implClass.getAnnotation(Extension.class);
            if (extension == null) {
                throw new IllegalArgumentException(...);
            } else {
                String aliasInCode = extension.value();
                // 实现类的 @Extension 注解必须配置 value(), 即 alias,否则抛错
                if (StringUtils.isBlank(aliasInCode)) {
                    throw new IllegalArgumentException(...);
                }
                if (alias == null) {
                    // 如果 spi 配置文件里没配置 alias,则直接使用实现类的 @Extension 注解里的 alias
                    // 所以:spi 配置文件是允许不配置为 alias=XxxImpl 的,可以直接配置为 XxxImpl,此时的 alias 读取的就是 XxxImpl 的 @Extension 注解里的 alias
                    alias = aliasInCode;
                } else {
                    // spi 配置文件里配置的和代码里的不一致,直接抛错
                    if (!aliasInCode.equals(alias)) {
                        throw new IllegalArgumentException(...);
                    }
                }
                // 接口需要编号,实现类没设置
                if (extensible.coded() && extension.code() < 0) {
                    throw new IllegalArgumentException(...);
                }
            }
            // alias 不可以配置为 default 和 *
            if (StringUtils.DEFAULT.equals(alias) || StringUtils.ALL.equals(alias)) {
                throw new IllegalArgumentException(...);
            }
            // 检查是否有存在同 alias 的 ExtensionClass
            // 进行覆盖逻辑
            ExtensionClass old = all.get(alias);
            ExtensionClass<T> extensionClass = null;
            if (old != null) {
                // 如果当前扩展可以覆盖其它同名扩展
                if (extension.override()) {
                    // 如果优先级还没有旧的高,则忽略;如果优先级高于旧的
                    if (extension.order() >= old.getOrder()) {
                        // 如果当前扩展可以覆盖其它同名扩展
                        extensionClass = buildClass(extension, implClass, alias);
                    }
                }
                // 如果当前扩展不可以覆盖其它同名扩展
                else {
                    if (old.isOverride() || old.getOrder() < extension.order()) {
                        // 如果不能被覆盖,抛出已存在异常
                        throw new IllegalStateException("Duplicate class with same alias: " + alias);
                    }
                }
            } else {
                // 如果不存在同 alias 的 ExtensionClass,则直接新建一个 ExtensionClass
                extensionClass = buildClass(extension, implClass, alias);
            }
    
            // 做互斥扩展点排除逻辑
            if (extensionClass != null) {
                // 检查是否有互斥的扩展点
                for (Map.Entry<String, ExtensionClass<T>> entry : all.entrySet()) {
                    ExtensionClass existed = entry.getValue();
                    if (extensionClass.getOrder() >= existed.getOrder()) {
                        // 新的优先级 >= 老的优先级,检查新的扩展是否排除老的扩展
                        String[] rejection = extensionClass.getRejection();
                        if (CommonUtils.isNotEmpty(rejection)) {
                            for (String rej : rejection) {
                                existed = all.get(rej);
                                // 只排除低 order 的扩展点
                                if (existed == null || extensionClass.getOrder() < existed.getOrder()) {
                                    continue;
                                }
                                all.remove(rej);
                            }
                        }
                    } else {
                        // 如果 新的优先级 < 老的优先级,判断是否需要将新的排除掉
                        String[] rejection = existed.getRejection();
                        if (CommonUtils.isNotEmpty(rejection)) {
                            for (String rej : rejection) {
                                if (rej.equals(extensionClass.getAlias())) {
                                    // 被其它扩展排掉
                                    return;
                                }
                            }
                        }
                    }
                }
    
                // 执行监听器逻辑,将 {alias : ExtensionClass} 添加到容器中
                loadSuccess(alias, extensionClass);
            }
        }
    
        private ExtensionClass<T> buildClass(Extension extension, Class<? extends T> implClass, String alias) {
            ExtensionClass<T> extensionClass = new ExtensionClass<T>(implClass, alias);
            extensionClass.setCode(extension.code());
            extensionClass.setSingleton(extensible.singleton());
            extensionClass.setOrder(extension.order());
            extensionClass.setOverride(extension.override());
            extensionClass.setRejection(extension.rejection());
            return extensionClass;
        }
    
        private void loadSuccess(String alias, ExtensionClass<T> extensionClass) {
            if (listener != null) {
                listener.onLoad(extensionClass); // 加载完毕,通知监听器
                all.put(alias, extensionClass);
            } else {
                all.put(alias, extensionClass);
            }
        }
    
        public ConcurrentMap<String, ExtensionClass<T>> getAllExtensions() {
            return all;
        }
    
        // 根据服务别名查找扩展类
        public ExtensionClass<T> getExtensionClass(String alias) {
            return all == null ? null : all.get(alias);
        }
    
        // 获取扩展点实例
        public T getExtension(String alias) {
            ExtensionClass<T> extensionClass = getExtensionClass(alias);
            if (extensionClass == null) {
                throw new SofaRpcRuntimeException(...);
            } else {
                // 如果是单例,获取或者双重检测创建 T
                if (extensible.singleton() && factory != null) {
                    T t = factory.get(alias);
                    if (t == null) {
                        synchronized (this) {
                            t = factory.get(alias);
                            if (t == null) {
                                t = extensionClass.getExtInstance();
                                factory.put(alias, t);
                            }
                        }
                    }
                    return t;
                } else {
                    // 如果不是单例,直接创建 T
                    return extensionClass.getExtInstance();
                }
            }
        }
    
        /**
         * 得到实例
         *
         * @param alias    别名
         * @param argTypes 扩展初始化需要的参数类型
         * @param args     扩展初始化需要的参数
         * @return 扩展实例(已判断是否单例)
         */
        public T getExtension(String alias, Class[] argTypes, Object[] args) {
            ExtensionClass<T> extensionClass = getExtensionClass(alias);
            if (extensionClass == null) {
                throw new SofaRpcRuntimeException("Not found extension of " + interfaceName + " named: \"" + alias + "\"!");
            } else {
                if (extensible.singleton() && factory != null) {
                    T t = factory.get(alias);
                    if (t == null) {
                        synchronized (this) {
                            t = factory.get(alias);
                            if (t == null) {
                                t = extensionClass.getExtInstance(argTypes, args);
                                factory.put(alias, t);
                            }
                        }
                    }
                    return t;
                } else {
                    return extensionClass.getExtInstance(argTypes, args);
                }
            }
        }
    }
    

    2.5、ExtensionLoaderListener

    public interface ExtensionLoaderListener<T> {
        // 当扩展点成功加载时,触发的事件
        void onLoad(ExtensionClass<T> extensionClass);
    }
    

    2.6、ExtensionClass

    // 扩展接口实现类
    public class ExtensionClass<T> implements Sortable {
        // 扩展接口实现类
        protected final Class<? extends T> clazz;
        // 扩展实现类别名
        protected final String alias;
        // 扩展编码,必须唯一
        protected byte code;
        // 是否单例
        protected boolean singleton;
        // 扩展点排序值,大的优先级高
        protected int order;
        // 是否覆盖其它低order的同名扩展
        protected boolean override;
        // 排斥其它扩展,可以排斥掉其它低order的扩展
        protected String[] rejection;
        // 服务端实例对象(只在是单例的时候保留)
        private volatile transient T instance;
    
        /**
         * 构造函数
         * @param clazz 扩展实现类名
         * @param alias 扩展别名
         */
        public ExtensionClass(Class<? extends T> clazz, String alias) {
            this.clazz = clazz;
            this.alias = alias;
        }
    
        // 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象
        public T getExtInstance() {
            return getExtInstance(null, null);
        }
    
        /**
         * 得到服务端实例对象,如果是单例则返回单例对象,如果不是则返回新创建的实例对象(全部都是反射创建)
         * @param argTypes 构造函数参数类型
         * @param args     构造函数参数值
         */
        public T getExtInstance(Class[] argTypes, Object[] args) {
            if (clazz != null) {
                if (singleton) { // 如果是单例
                    if (instance == null) {
                        synchronized (this) {
                            if (instance == null) {
                                instance = ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                            }
                        }
                    }
                    return instance; // 保留单例
                } else {
                    return ClassUtils.newInstanceWithArgs(clazz, argTypes, args);
                }
            }
            throw new SofaRpcRuntimeException("Class of ExtensionClass is null");
        }
    }
    

    三、SOFARPC-SPI 流程总结

    ExtensionLoader<LoadBalancer> loader = ExtensionLoaderFactory.getExtensionLoader(LoadBalancer.class);
    
    1. Map<Class, ExtensionLoader> LOADER_MAP 容器中获取 clazz(可扩展接口或可扩展抽象类)对应的 ExtensionLoader
    2. 如果没有,则使用双重检测同步创建一个 ExtensionLoader

    2.1. 从全局默认配置文件(默认是 rpc-config-default.json)中获取 SPI 扩展文件的路径
    2.2. 循环遍历每一个 SPI 扩展文件,对每一个 SPI 扩展文件做如下操作

    2.2.1. 读取每一行(每一行是配置一个实现类,每个实现类会被解析成一个 ExtensionClass
    2.2.2. 获取 alias:如果 SPI 配置文件中没有配置 alias,则使用实现类的 @Extension 注解中的的 alias,否则,SPI 配置文件中配置的 alias 必须与实现类的 @Extension 注解中的 alias 同名
    2.2.3. 检查当前的 Map<String, ExtensionClass<T>> all 中是否有同 alias 的 ExtensionClass,如果没有,直接创建当前实现类的 ExtensionClass,否则进行覆盖逻辑(如果当前的实现类可以覆盖且优先级高于旧的,则直接覆盖)
    2.2.4. 做互斥扩展点的排除逻辑:循环遍历 Map<String, ExtensionClass<T>> all 中的每一个 ExtensionClass,如果当前实现类的 ExtensionClass 的优先级大于等于遍历到的 ExtensionClass,并且当前实现类的 ExtensionClass 的互斥扩展集合包含遍历到的 ExtensionClass,则从 all 映射中删除遍历到的 ExtensionClass;如果当前实现类的 ExtensionClass 的优先级小于遍历到的 ExtensionClass,并且遍历到的 ExtensionClass 的互斥扩展集合包含当前实现类的 ExtensionClass,则直接 return,不再将当前的实现类的 ExtensionClass 放到 all 中(5.5.0-SNAPSHOT 版本中该实现有个问题:https://github.com/alipay/sofa-rpc/issues/367
    2.2.5. 如果配置了 ExtensionLoaderListener,则执行该监听器的监听逻辑,之后将 {alias : 当前实现类的 ExtensionClass} 放置到 all 映射中

    1. {clazz, ExtensionLoader} 对存储到 LOADER_MAP
    2. 返回 ExtensionLoader
    LoadBalancer myLoadBalancer = loader.getExtension("hello");
    
    1. 从当期可扩展接口的 ExtensionLoaderMap<String, ExtensionClass<T>> all 中获取指定 alias 的 ExtensionClass
    2. 如果配置为单例,则获取或使用双重检测反射创建 ExtensionClass 的实例,并将创建好的实例塞入 Map<String, T> factory,方便下次使用;否则,直接反射创建 ExtensionClass 的实例

    加载原理图:图片地址

    image.png

    相关文章

      网友评论

        本文标题:SOFARPC 源码分析2 - SPI 扩展机制

        本文链接:https://www.haomeiwen.com/subject/kbyvfqtx.html