美文网首页
dubbo的内核剖析

dubbo的内核剖析

作者: 剑道_7ffc | 来源:发表于2020-06-01 16:52 被阅读0次

    SPI机制

    Java SPI

    SPI(service provider interface),在运行时,动态加载接口实现如jdbc和日志框架用到,场景是我定义标准,具体的实现由各个厂商来实现。

    实现SPI的标准

    1 解析spi使用的默认目录是resource的META-INF/service
    2 文件名为接口的全路径,文件内容是实现类的全路径。


    image.png
    image.png

    3 通过ServiceLoader.load方法来发现服务

    public static void main( String[] args )
    {
        ServiceLoader<DatabaseDriver> serviceLoader=ServiceLoader.load(DatabaseDriver.class);
        for(DatabaseDriver databaseDriver:serviceLoader){
            System.out.println(databaseDriver.buildConnect("test"));
        }
    }
    
    SPI 的实际应用

    数据库的连接接口是java.sql.Driver,实现由各厂商实现的。


    image.png
    SPI 的缺点

    1 SPI会一次性加载META-INF/service的所有实现类,很耗资源和时间。
    2 如果扩展点加载失败,会导致调用方报错,而且这个错误很难定位到是这个原因

    Dubbo 优化后的 SPI 机制

    自定义SPI扩展

    1 解析spi使用的默认目录是META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services
    2 文件名是接口名,文件内容以key-value来呈现,key表示自定义id,value是接口的实现类。


    image.png
    image.png

    3 通过ExtensionLoader.getExtensionLoader发现服务

    public static void main(String[] args) {
        Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myprotocol");
        System.out.println(protocol.getDefaultPort());
    }
    
    原理实现

    扩展点就是通过配置文件来指定某个接口的实现类。
    1 org.apache.dubbo.common.extension.ExtensionLoader#loadExtensionClasses
    加载指定目录下的文件

    private Map<String, Class<?>> loadExtensionClasses() {
        Map<String, Class<?>> extensionClasses = new HashMap<>();
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
        loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
        return extensionClasses;
    }
    

    2 org.apache.dubbo.common.extension.ExtensionLoader#getExtensionClasses
    放入缓存,Holder<Map<String, Class<?>>> cachedClasses,key是文件内容key的值,value是文件内容的value的值。

    private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }
    

    3 org.apache.dubbo.common.extension.ExtensionLoader#createExtension
    创建扩展点实例并注入,并放入ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES中,key是实现类,value是实现类的对象。

    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                    type + ") couldn't be instantiated: " + t.getMessage(), t);
        }
    }
    

    4 org.apache.dubbo.common.extension.ExtensionLoader#injectExtension
    set属性注入

    private T injectExtension(T instance) {
        try {
            if (objectFactory != null) {
                for (Method method : instance.getClass().getMethods()) {
                    if (isSetter(method)) {
                        /**
                         * Check {@link DisableInject} to see if we need auto injection for this property
                         */
                        if (method.getAnnotation(DisableInject.class) != null) {
                            continue;
                        }
                        Class<?> pt = method.getParameterTypes()[0];
                        if (ReflectUtils.isPrimitives(pt)) {
                            continue;
                        }
                        try {
                            String property = getSetterProperty(method);
                            Object object = objectFactory.getExtension(pt, property);
                            if (object != null) {
                                method.invoke(instance, object);
                            }
                        } catch (Exception e) {
                            logger.error("Failed to inject via method " + method.getName()
                                    + " of interface " + type.getName() + ": " + e.getMessage(), e);
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        return instance;
    }
    

    Adaptive 自适应扩展点

    通过Adaptive来动态选择某个接口的实现类。某个接口的默认的扩展点的设置如@SPI("dubbo"),dubbo表示默认值。

    @Adaptive

    若修饰在类上,表示该类是自适应扩展点的类如Compiler
    若修改在方法上,表示需要动态生成字节码,来进行转发如Protocol。

    案例

    public static void main(String[] args) {
        Protocol protocol= ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
        System.out.println(protocol);--org.apache.dubbo.rpc.Protocol$Adaptive@65d6b83b
    }
    

    源码分析

    org.apache.dubbo.common.extension.ExtensionLoader#Protocol

    getAdaptiveExtensionClass
    若是类级别,则返回;若是方法级别则创建字节码

    private Class<?> getAdaptiveExtensionClass() {
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
    }
    
    org.apache.dubbo.common.extension.ExtensionLoader#

    createAdaptiveExtensionClass
    生成%s$Adaptive的类并编译为class文件

    private Class<?> createAdaptiveExtensionClass() {
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }
    
    Protocol$Adaptive

    这个是方法级别,动态生成的代理类

    import org.apache.dubbo.common.extension.ExtensionLoader;
    
    public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
        public void destroy() {
            throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
            org.apache.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
    
        public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            org.apache.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
            org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    }
    
    image.png
    org.apache.dubbo.common.extension.ExtensionLoader#injectExtension

    对扩展点的属性设置值。

    org.apache.dubbo.common.extension.ExtensionLoader#loadClass

    对cachedAdaptiveClass属性值进行设置

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            cacheAdaptiveClass(clazz);
        } else if (isWrapperClass(clazz)) {
            cacheWrapperClass(clazz);
        } else {
            clazz.getConstructor();
            if (StringUtils.isEmpty(name)) {
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }
    
            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
                    cacheName(clazz, n);
                    saveInExtensionClass(extensionClasses, clazz, name);
                }
            }
        }
    }
    

    Activate 自动激活扩展点

    @Activate表示根据条件自动激活,如@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)表示URL参数中若有cache值,则加载一个扩展点;若url没有则不加载;@Activate(group = Constants.CONSUMER)表示没有条件即默认加载。

    案例

    public static void main(String[] args) {
        URL url=new URL("","",0);
            --有cache则输出11,没有则输出10个
        url=url.addParameter("cache","cache");
        List<Filter> list=ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(url,"cache");
        System.out.println(list.size());
    }
    

    源码分析

    org.apache.dubbo.common.extension.ExtensionLoader#cacheActivateClass
    private void cacheActivateClass(Class<?> clazz, String name) {
        Activate activate = clazz.getAnnotation(Activate.class);
        if (activate != null) {
            //key是spi文件对应的key --> value表示注解类
            cachedActivates.put(name, activate);
        } else {
            // support com.alibaba.dubbo.common.extension.Activate
            com.alibaba.dubbo.common.extension.Activate oldActivate = clazz.getAnnotation(com.alibaba.dubbo.common.extension.Activate.class);
            if (oldActivate != null) {
                cachedActivates.put(name, oldActivate);
            }
        }
    }
    
    org.apache.dubbo.common.extension.ExtensionLoader#getActivateExtension
    public List<T> getActivateExtension(URL url, String[] values, String group) {
        List<T> exts = new ArrayList<>();
        List<String> names = values == null ? new ArrayList<>(0) : Arrays.asList(values);
        //加载默认的扩展点
        if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
            getExtensionClasses();
            for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
                String name = entry.getKey();
                Object activate = entry.getValue();
    
                String[] activateGroup, activateValue;
    
                if (activate instanceof Activate) {
                    activateGroup = ((Activate) activate).group();
                    activateValue = ((Activate) activate).value();
                } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                    activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                    activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
                } else {
                    continue;
                }
                if (isMatchGroup(group, activateGroup)) {
                    T ext = getExtension(name);
                    if (!names.contains(name)
                            && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                            && isActive(activateValue, url)) {
                        exts.add(ext);
                    }
                }
            }
            exts.sort(ActivateComparator.COMPARATOR);
        }
        List<T> usrs = new ArrayList<>();
        //加载需要激活的扩展点
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                    && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
                if (Constants.DEFAULT_KEY.equals(name)) {
                    if (!usrs.isEmpty()) {
                        exts.addAll(0, usrs);
                        usrs.clear();
                    }
                } else {
                    T ext = getExtension(name);
                    usrs.add(ext);
                }
            }
        }
        if (!usrs.isEmpty()) {
            exts.addAll(usrs);
        }
        return exts;
    }
    

    相关文章

      网友评论

          本文标题:dubbo的内核剖析

          本文链接:https://www.haomeiwen.com/subject/ivkszhtx.html