美文网首页
Dubbo源码解析-核心SPI扩展

Dubbo源码解析-核心SPI扩展

作者: 一个头发茂密的程序员 | 来源:发表于2021-02-02 17:34 被阅读0次

    Dubbo版本:2.7.5
    源码下载地址:https://github.com/apache/dubbo/releases/tag/dubbo-2.7.5
    编译工具:IDEA

    下载好源码后,导入idea

    • SPI机制(spi机制的思想提供一种更加灵活的,可插拔式的机制)
    • 自适应扩展点
    • IOC和 AOP
    • Dubbo与Spring 如何集成

    Dubbo核心之SPI

    1.Dubbo源码很多地方中存在这三种代码:分别是自适应扩展点,指定名称扩展点,激活扩展点
    ExtensionLoader.getExtensionLoader(xxx.class).getAdaptiveExtension() 自适应扩展点
    ExtensionLoader.getExtensionLoader(xxx.class).getExtension(name); 指定名称扩展点
    ExtensionLoader.getExtensionLoader(xxx.class).getActivateExtension(url,key) 激活扩展点

    这种扩展点实际上就是Dubbo中的SPI机制。在Springboot中的SpringFactoriesLoader就是一种SPI机制。

    java中的SPI机制的实现
    SPI机制原本是JDK内置的一种服务提供发现机制,主要用来做服务的扩展实现。例如JDK中提供了java.sql.Driver接口,这个驱动类在JDK中并没有实现,而是由不同的数据库厂商来实现,比如mysql、oracle驱动包都会去实现这个接口,然后JDK利用SPI机制从classpath中找到对应的驱动来获取指定数据库连接,这种插拔式的扩展加载方式,也同样遵循一定的协议规定。比如所有的扩展点都要放在resource/META-INF/services目录下,SPI机制会默认扫描这个路径下的属性文件完成加载

    Dubbo自定义协议扩展点

    • Dubbo和SpringFactoriesLoader 都没有使用java的SPI机制,而是利用了SPI的思想。
    • Dubbo的SPI机制都被封装在 ExtensionLoader 类中,通过ExtensionLoader 来加载指定的实现类

    Dubbo的SPI机制扩展有两个规则:

    • 和JDK内置SPI一样,需要在resouce目录下创建任一目录结构:META-INF/dubbo、META-INF/dubbo/internal、META-INF/services,在对应的目录下创建接口全路径名称文件,Dubbo会去这三个目录下加载相应扩展点
    • 文件内容和JDK SPI内容不同,是一种Key和Value形式的数据。Key:字符串 value:对应扩展点的实现。这样的方式可以按需加载指定的实现类

    实现步骤:

    • 在一个依赖了Dubbo框架的工程中,创建一个扩展点及实现,其中扩展点需要声明@SPI注解

    创建Driver 接口

    import org.apache.dubbo.common.extension.SPI;
    
    @SPI
    public interface Driver {
        String connect();
    }
    

    MysqlDriver 实现接口

    public class MysqlDriver implements Driver {
    
        @Override
        public String connect() {
            return "连接MYSQL数据库";
        }
    }
    

    在resouces文件夹下新建,以接口全路径名作为文件名


    image.png

    文件中内容:KEY: 字符串 value:具体扩展点实现


    image.png

    测试:

    public class MysqlDriverTest {
    
        @Test
        public void connect() {
            ExtensionLoader<Driver> extensionLoader = ExtensionLoader.getExtensionLoader(Driver.class);
            Driver mysqlDriver = extensionLoader.getExtension("mysqlDriver");
            System.out.println(mysqlDriver.connect());
        }
    }
    

    输出结果:


    image.png

    Dubbo SPI扩展点源码分析

    整体逻辑:先根据ExtensionLoader.getExtensionLoader获取到ExtensionLoader实例,然后再通过getExtension()方法获得指定名称的扩展点。

    ExtensionLoader.getExtensionLoader

    用来返回一个ExtensionLoader实例

        public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
            if (type == null) {
                throw new IllegalArgumentException("Extension type == null");
            } else if (!type.isInterface()) {
                throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
            } else if (!withExtensionAnnotation(type)) {
                throw new IllegalArgumentException("Extension type (" + type + ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
            } else {
                //先从缓存中获取和扩展类对应的ExtensionLoader
                ExtensionLoader<T> loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
                //缓存未命中
                if (loader == null) {
                    //创建一个新实例放入缓存,
                    EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type));
                    loader = (ExtensionLoader)EXTENSION_LOADERS.get(type);
                }
    
                return loader;
            }
        }
    

    ExtensionLoader的构造方法,初始化一个objectFactory

        private ExtensionLoader(Class<?> type) {
            this.type = type;
            this.objectFactory = type == ExtensionFactory.class ? null : (ExtensionFactory)getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension();
        }
    
    

    getExtension()

    这个方法用于根据指定名称获得对象的扩展点并返回。在前面的例子中,name就是mysqlDriver,返回的实例就是MysqlDriver

    • 如果name="true" 则返回一个默认的扩展类实现
    • 创建一个 Holder对象,缓存该扩展点实例
    • 如果该扩展点不存在,则createExtension()创建扩展点
       public T getExtension(String name) {
            if (StringUtils.isEmpty(name)) {
                throw new IllegalArgumentException("Extension name == null");
            // 如果name="true" 则返回一个默认的扩展类实现
            } else if ("true".equals(name)) {
                return this.getDefaultExtension();
            } else {
                //创建一个 Holder对象,缓存该扩展点实例
                Holder<Object> holder = this.getOrCreateHolder(name);
                Object instance = holder.get();
                if (instance == null) {
                    synchronized(holder) {
                        instance = holder.get();
                        if (instance == null) {
                             //如果该扩展点不存在,则createExtension()创建扩展点
                            instance = this.createExtension(name);
                            holder.set(instance);
                        }
                    }
                }
    
                return instance;
            }
        }
    

    createExtension()

        private T createExtension(String name) {
            //getExtensionClasses()获取到一个扩展类
            Class<?> clazz = (Class)this.getExtensionClasses().get(name);
            if (clazz == null) {
                throw this.findException(name);
            } else {
                try {
                    //查询缓存中是否存在该类的实例
                    T instance = EXTENSION_INSTANCES.get(clazz);
                    if (instance == null) {
                        //不存在 放入实例缓存
                        EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                        instance = EXTENSION_INSTANCES.get(clazz);
                    }
                    
                    //依赖注入
                    this.injectExtension(instance);
                    //通过wrapper进行包装
                    Set<Class<?>> wrapperClasses = this.cachedWrapperClasses;
                    Class wrapperClass;
                    if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                        for(Iterator var5 = wrapperClasses.iterator(); var5.hasNext(); instance = this.injectExtension(wrapperClass.getConstructor(this.type).newInstance(instance))) {
                            wrapperClass = (Class)var5.next();
                        }
                    }
    
                    return instance;
                } catch (Throwable var7) {
                    throw new IllegalStateException("Extension instance (name: " + name + ", class: " + this.type + ") couldn't be instantiated: " + var7.getMessage(), var7);
                }
            }
        }
    

    getExtensionClasses()

        private Map<String, Class<?>> getExtensionClasses() {
            //从缓存中获取已经被加载的扩展类
            Map<String, Class<?>> classes = (Map)this.cachedClasses.get();
            //缓存未命中
            if (classes == null) {
                synchronized(this.cachedClasses) {
                    classes = (Map)this.cachedClasses.get();
                    if (classes == null) {
                        //通过loadExtensionClasses 加载扩展类
                        classes = this.loadExtensionClasses();
                        //放入扩展类缓存
                        this.cachedClasses.set(classes);
                    }
                }
            }
    
            return classes;
        }
    

    loadExtensionClasses()

        private Map<String, Class<?>> loadExtensionClasses() {
            //获取当前接口默认的扩展类
            this.cacheDefaultExtensionName();
            Map<String, Class<?>> extensionClasses = new HashMap();
            //loadDirectory是用来根据传入的路径和type的全路径名称找到对应的文件,解析内容并放到extensionClasses集合中
            this.loadDirectory(extensionClasses, "META-INF/dubbo/internal/", this.type.getName());
            this.loadDirectory(extensionClasses, "META-INF/dubbo/internal/", this.type.getName().replace("org.apache", "com.alibaba"));
            this.loadDirectory(extensionClasses, "META-INF/dubbo/", this.type.getName());
            this.loadDirectory(extensionClasses, "META-INF/dubbo/", this.type.getName().replace("org.apache", "com.alibaba"));
            this.loadDirectory(extensionClasses, "META-INF/services/", this.type.getName());
            this.loadDirectory(extensionClasses, "META-INF/services/", this.type.getName().replace("org.apache", "com.alibaba"));
            return extensionClasses;
        }
    

    cacheDefaultExtensionName()

        private void cacheDefaultExtensionName() {
            //获取到type类声明的注解@SPI
            SPI defaultAnnotation = (SPI)this.type.getAnnotation(SPI.class);
            if (defaultAnnotation != null) {
                //获取注解中定义的value值
                String value = defaultAnnotation.value();
                if ((value = value.trim()).length() > 0) {
                    String[] names = NAME_SEPARATOR.split(value);
                    if (names.length > 1) {
                        throw new IllegalStateException("More than 1 default extension name on extension " + this.type.getName() + ": " + Arrays.toString(names));
                    }
    
                    if (names.length == 1) {
                        this.cachedDefaultName = names[0];
                    }
                }
            }
    
        }
    

    SPI根据名称获取扩展点源码逻辑:

    • 1.获取ExtensionLoader实例
    • 2.根据实例的type全路径在缓存中查找是否存在该实例
    • 3.不存在则就去指定的文件目录下根据type全路径查询该扩展点,放入缓存

    自适应扩展点

    自适应扩展点也可以被理解为适配器扩展点
    能够根据上下文动态匹配的一个扩展类

    ExtensionLoader.getExtensionLoader(xxx.class).getAdaptiveExtension()

    自适应扩展点通过@Adaptive注解来声明
    两种使用方式:

    • 注解定义在类上面,表示当前类的自适应扩展类
      例: AdaptiveCompiler 类就是自适应扩展类,通过ExtensionLoader.getExtensionLoader(Compiler .class).getAdaptiveExtension() 就可以获取到AdaptiveCompiler 类的实例
    @Adaptive
    public class AdaptiveCompiler implements Compiler {
    
        private static volatile String DEFAULT_COMPILER;
    
        public static void setDefaultCompiler(String compiler) {
            DEFAULT_COMPILER = compiler;
        }
    
        @Override
        public Class<?> compile(String code, ClassLoader classLoader) {
            Compiler compiler;
            ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
            String name = DEFAULT_COMPILER; // copy reference
            if (name != null && name.length() > 0) {
                compiler = loader.getExtension(name);
            } else {
                compiler = loader.getDefaultExtension();
            }
            return compiler.compile(code, classLoader);
        }
    }
    
    
    • 注解定义在方法层面,会通过动态代理的方式生成一个动态字节码,进行自适应匹配
    @SPI("dubbo")
    public interface Protocol {
    
        int getDefaultPort();
    
        @Adaptive
        <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    
        @Adaptive
        <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    
        void destroy();
    
        default List<ProtocolServer> getServers() {
            return Collections.emptyList();
        }
    
    }
    

    扩展类中的两个方法声明了 @Adaptive注解,意味着这是一个自适应方法。

        private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
    

    在Protocol接口的源码中,自适应扩展点的声明在方法层面上,所以和类级别的声明不同。这里的protocol是一个动态代理类,基于javassist动态生成的字节码来实现方法级别的自适应调用。简单来说,就是调用方法时会根据上下文自动匹配到某个具体实现类的方法中。

    ExtensionLoader.getExtensionLoader(Compiler .class).getAdaptiveExtension()源码实现

    getAdaptiveExtension()方法就做了两件事:

    • 1.从缓存中获取自适应扩展点实例
    • 2.如果缓存未命中,则通过createAdaptiveExtension()方法创建实例,并放入缓存(适配器模式)
        public T getAdaptiveExtension() {
            //从缓存中获取自适应扩展点实例
            Object instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                if (createAdaptiveInstanceError != null) {
                    throw new IllegalStateException("Failed to create adaptive instance: " +
                            createAdaptiveInstanceError.toString(),
                            createAdaptiveInstanceError);
                }
    
                //创建自适应扩展点实例,并放置到缓存中
                synchronized (cachedAdaptiveInstance) {
                    instance = cachedAdaptiveInstance.get();
                    if (instance == null) {
                        try {
                            instance = createAdaptiveExtension();
                            cachedAdaptiveInstance.set(instance);
                        } catch (Throwable t) {
                            createAdaptiveInstanceError = t;
                            throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                        }
                    }
                }
            }
    
            return (T) instance;
        }
    
        private T createAdaptiveExtension() {
            try {
              // getAdaptiveExtensionClass().newInstance() 获取一个自适应扩展类实例
             //injectExtension 完成依赖注入
                return injectExtension((T) getAdaptiveExtensionClass().newInstance());
            } catch (Exception e) {
                throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
            }
        }
    
        private Class<?> getAdaptiveExtensionClass() {
            //获取所有传入类型所有的扩展点,缓存到一个集合中
            getExtensionClasses();
            if (cachedAdaptiveClass != null) {
              //cachedAdaptiveClass 应该是在loadExtensionClasses() 方法解析指定扩展点加载进来,在加载完成后,如果某个类上定义了@Adaptive注解,则会赋值给cachedAdaptiveClass 
                return cachedAdaptiveClass;
            }
    //如果cachedAdaptiveClass 为空,则调用createAdaptiveExtensionClass 进行创建
            return cachedAdaptiveClass = createAdaptiveExtensionClass();
        }
    
        private Class<?> createAdaptiveExtensionClass() {
            //code  动态拼接code
            String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
            ClassLoader classLoader = findClassLoader();
            org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
            //通过compiler进行动态编译
            return compiler.compile(code, classLoader);
        }
    

    利用动态代理生成一个自适应扩展类,可以根据dubbo服务配置的协议名称,通过getExtension()获取到相应的扩展类

    Dubbo中的IOC和AOP

    IOC(控制反转)

    IOC机制是通过DI(依赖注入)来实现的

    上面说的injectExtension()方法就是依赖注入的实现。

        private T injectExtension(T instance) {
    
            if (objectFactory == null) {
                return instance;
            }
    
            try {
                //遍历被加载的扩展类的set方法
                for (Method method : instance.getClass().getMethods()) {
                    if (!isSetter(method)) {
                        continue;
                    }
                    /**
                     * Check {@link DisableInject} to see if we need auto injection for this property
                     */
                    if (method.getAnnotation(DisableInject.class) != null) {
                        continue;
                    }
                  //获取set方法中的参数类型
                    Class<?> pt = method.getParameterTypes()[0];
                    //如果不是对象类型就跳过
                    if (ReflectUtils.isPrimitives(pt)) {
                        continue;
                    }
    
                    try {
                        //获取方法对应的属性名称
                        String property = getSetterProperty(method);
                        //根据class 和 name,使用扩展点加载并通过set方法进行赋值
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("Failed to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
    
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
            return instance;
        }
    

    injectExtension方法的作用:如果当前加载的扩展类中存在一个成员对象,并且为他提供了set方法,那么就会通过自适应扩展点加载并赋值。

    AOP

        private T createExtension(String name) {
            Class<?> clazz = getExtensionClasses().get(name);
            if (clazz == null) {
                throw findException(name);
            }
            try {
                T instance = (T) EXTENSION_INSTANCES.get(clazz);
                if (instance == null) {
                    EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                    instance = (T) EXTENSION_INSTANCES.get(clazz);
                }
                injectExtension(instance);
                Set<Class<?>> wrapperClasses = cachedWrapperClasses;
                if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                    for (Class<?> wrapperClass : wrapperClasses) {
                        //用到了依赖注入和AOP思想。Aop思想的提现是基于wrapper装饰器类实现对原有的扩展类instance 的封装
                        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                    }
                }
                initExtension(instance);
                return instance;
            } catch (Throwable t) {
                throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                        type + ") couldn't be instantiated: " + t.getMessage(), t);
            }
        }
    

    下一篇将解析Dubbo和Spring集成的原理,冲吧,骚年!!!

    相关文章

      网友评论

          本文标题:Dubbo源码解析-核心SPI扩展

          本文链接:https://www.haomeiwen.com/subject/jboetltx.html