Dubbo Source Code Reading (6): Analysis of the Dubbo Filter Loading Process


Author: 桥头放牛娃 | Published: 2019-01-29 20:17

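    As we know, Dubbo loads its SPI extension classes dynamically through ExtensionLoader, and Filters are woven into the call path through this same dynamic loading mechanism.

    (1) getExtension():

    Dubbo's SPI implementation classes of Protocol are loaded dynamically through this method, which delegates the actual loading to createExtension(). Each instance is cached by name in a Holder and is created at most once, using double-checked locking on that Holder.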

    public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
        Holder<Object> holder = cachedInstances.get(name);
        if (holder == null) {
            cachedInstances.putIfAbsent(name, new Holder<Object>());
            holder = cachedInstances.get(name);
        }
        Object instance = holder.get();
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = createExtension(name);
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }
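
    The caching pattern in getExtension() can be sketched in isolation. The following is a minimal sketch, not Dubbo's code: HolderCacheSketch and SketchHolder are hypothetical names, and the factory function stands in for createExtension(). It assumes the holder's field is volatile, which is what makes the second check inside the synchronized block safe.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical cache that mirrors the locking pattern of getExtension(); not a Dubbo class.
final class HolderCacheSketch<T> {
    private final ConcurrentMap<String, SketchHolder<T>> cachedInstances = new ConcurrentHashMap<>();
    private final Function<String, T> factory; // stands in for createExtension(name)

    HolderCacheSketch(Function<String, T> factory) {
        this.factory = factory;
    }

    T get(String name) {
        SketchHolder<T> holder = cachedInstances.get(name);
        if (holder == null) {
            // putIfAbsent keeps exactly one holder per name even under contention
            cachedInstances.putIfAbsent(name, new SketchHolder<>());
            holder = cachedInstances.get(name);
        }
        T instance = holder.get();
        if (instance == null) {
            // lock only the holder for this name, then check again
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    instance = factory.apply(name);
                    holder.set(instance);
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        HolderCacheSketch<String> cache =
                new HolderCacheSketch<>(n -> n + "#" + created.incrementAndGet());
        System.out.println(cache.get("dubbo"));                       // dubbo#1
        System.out.println(cache.get("dubbo") == cache.get("dubbo")); // true
        System.out.println(created.get());                            // 1
    }
}

// Hypothetical mutable box with a volatile field.
final class SketchHolder<T> {
    private volatile T value;

    T get() {
        return value;
    }

    void set(T value) {
        this.value = value;
    }
}
```

    Locking on the per-name holder rather than on the whole cache means that creating one extension does not block lookups of other names.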
    

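    (2) createExtension():

    The crucial point in this implementation is that if the wrapperClasses cache is not empty, the instance is wrapped by each Wrapper class in turn, so what is finally returned is the outermost wrapper after several layers of wrapping, not the bare extension instance.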

    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (CollectionUtils.isNotEmpty(wrapperClasses)) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }
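
    The wrapping loop can be illustrated with a small, self-contained sketch. Everything here is hypothetical (Greeter, PlainGreeter, the two wrappers and WrapLoopSketch are not Dubbo types); only the reflective call wrapperClass.getConstructor(type).newInstance(instance) mirrors the loop in createExtension(). Dependency injection via injectExtension() is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver that mirrors the wrapper loop of createExtension().
final class WrapLoopSketch {
    static Greeter wrapAll(Greeter instance, List<Class<? extends Greeter>> wrapperClasses) {
        try {
            for (Class<? extends Greeter> wrapperClass : wrapperClasses) {
                // each wrapper receives the previous result through its one-argument constructor
                instance = wrapperClass.getConstructor(Greeter.class).newInstance(instance);
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Wrapper could not be instantiated: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        List<Class<? extends Greeter>> wrappers = new ArrayList<>();
        wrappers.add(UpperCaseWrapper.class);
        wrappers.add(BracketWrapper.class);
        Greeter greeter = wrapAll(new PlainGreeter(), wrappers);
        System.out.println(greeter.greet("dubbo")); // [HELLO DUBBO]
    }
}

// Hypothetical extension interface.
interface Greeter {
    String greet(String who);
}

final class PlainGreeter implements Greeter {
    public String greet(String who) {
        return "hello " + who;
    }
}

final class UpperCaseWrapper implements Greeter {
    private final Greeter delegate;

    public UpperCaseWrapper(Greeter delegate) {
        this.delegate = delegate;
    }

    public String greet(String who) {
        return delegate.greet(who).toUpperCase();
    }
}

final class BracketWrapper implements Greeter {
    private final Greeter delegate;

    public BracketWrapper(Greeter delegate) {
        this.delegate = delegate;
    }

    public String greet(String who) {
        return "[" + delegate.greet(who) + "]";
    }
}
```

    The wrapper applied last ends up outermost, so the nesting order is determined by the iteration order of the wrapper collection.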
    

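    (3) loadClass():

    This is where SPI classes are registered. After the @Adaptive check, one branch calls isWrapperClass() to decide whether the loaded class is a Wrapper class: if so, it is added to the Wrapper class cache; otherwise it is registered as an ordinary extension under its name(s). isWrapperClass() tests whether the class has a public constructor whose single parameter is the extension interface type (type). If such a constructor exists the class is treated as a Wrapper, otherwise it is not. A constructor with any other parameter does not count.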

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error when load extension class(interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + "is not subtype of interface.");
        }
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            if (cachedAdaptiveClass == null) {
                cachedAdaptiveClass = clazz;
            } else if (!cachedAdaptiveClass.equals(clazz)) {
                throw new IllegalStateException("More than 1 adaptive class found: "
                        + cachedAdaptiveClass.getClass().getName()
                        + ", " + clazz.getClass().getName());
            }
        } else if (isWrapperClass(clazz)) {
            Set<Class<?>> wrappers = cachedWrapperClasses;
            if (wrappers == null) {
                cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                wrappers = cachedWrapperClasses;
            }
            wrappers.add(clazz);
        } else {
            clazz.getConstructor();
            if (StringUtils.isEmpty(name)) {
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }
            String[] names = NAME_SEPARATOR.split(name);
            if (names != null && names.length > 0) {
                Activate activate = clazz.getAnnotation(Activate.class);
                if (activate != null) {
                    cachedActivates.put(names[0], activate);
                } else {
                    // support com.alibaba.dubbo.common.extension.Activate
                    com.alibaba.dubbo.common.extension.Activate oldActivate = clazz.getAnnotation(com.alibaba.dubbo.common.extension.Activate.class);
                    if (oldActivate != null) {
                        cachedActivates.put(names[0], oldActivate);
                    }
                }
                for (String n : names) {
                    if (!cachedNames.containsKey(clazz)) {
                        cachedNames.put(clazz, n);
                    }
                    Class<?> c = extensionClasses.get(n);
                    if (c == null) {
                        extensionClasses.put(n, clazz);
                    } else if (c != clazz) {
                        throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                    }
                }
            }
        }
    }
    
    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
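
    To make the detection rule concrete, here is a minimal sketch using hypothetical types (Codec and its three implementations are invented for illustration). The isWrapperClass() helper has the same body as the method above, except that the extension type is passed in as a parameter instead of being read from a field.

```java
// Hypothetical demo of the constructor-based Wrapper detection.
final class WrapperDetectionSketch {
    static boolean isWrapperClass(Class<?> type, Class<?> clazz) {
        try {
            // succeeds only for a public constructor taking exactly (type)
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isWrapperClass(Codec.class, LoggingCodecWrapper.class)); // true
        System.out.println(isWrapperClass(Codec.class, PlainCodec.class));          // false
        System.out.println(isWrapperClass(Codec.class, PrefixCodec.class));         // false
    }
}

// Hypothetical extension interface.
interface Codec {
    String encode(String s);
}

// Ordinary extension: only a no-argument constructor.
final class PlainCodec implements Codec {
    public PlainCodec() {
    }

    public String encode(String s) {
        return s;
    }
}

// Wrapper: public constructor that takes the extension interface.
final class LoggingCodecWrapper implements Codec {
    private final Codec delegate;

    public LoggingCodecWrapper(Codec delegate) {
        this.delegate = delegate;
    }

    public String encode(String s) {
        return delegate.encode(s);
    }
}

// Has a parameterized constructor, but the parameter is not the extension interface.
final class PrefixCodec implements Codec {
    private final String prefix;

    public PrefixCodec(String prefix) {
        this.prefix = prefix;
    }

    public String encode(String s) {
        return prefix + s;
    }
}
```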
    

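    (4) Wrapper classes in Dubbo

    Dubbo registers the following Wrapper classes for Protocol:
    In dubbo-rpc -> dubbo-rpc-api:
    filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
    listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
    In dubbo-plugin -> dubbo-qos:
    qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper

    filter: wrapper that builds the filter invocation chain;
    listener: wrapper for protocol listeners;
    qos: wrapper for the online operations (QoS) service;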

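    (5) The Wrapper chain loaded for a Protocol implementation

    From the analysis above, loading a Protocol implementation actually returns a nested stack of wrappers. Since cachedWrapperClasses is a set, the exact nesting follows its iteration order. It should look like this: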

    ProtocolFilterWrapper -> ProtocolListenerWrapper -> QosProtocolWrapper -> XxxProtocol
    

    Calling export() on the Protocol implementation therefore actually runs in this order:

    ProtocolFilterWrapper.export() -> ProtocolListenerWrapper.export() -> QosProtocolWrapper.export() -> XxxProtocol.export();
    

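    (6) ProtocolFilterWrapper.export():

    This method checks the protocol of the URL being exported. If it is registry, meaning the service is being exported to a registry, the call is delegated directly to the wrapped protocol without building a filter chain. Otherwise the invoker is first wrapped by buildInvokerChain(), which loads all activated Filters and links them into a chain of Invokers. The loop walks the filter list from the last element to the first, so the original invoker sits at the tail of the chain and the Invoker that is finally returned wraps the first Filter. An invoke() call on the returned Invoker therefore passes through the Filters in list order before it reaches the original invoker, and this is how Filters are woven into the call path.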

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }
    
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
    
                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
    
                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
    
                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
    
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        Result result = filter.invoke(next, invocation);
                        if (result instanceof AsyncRpcResult) {
                            AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                            asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                            return asyncResult;
                        } else {
                            return filter.onResponse(result, invoker, invocation);
                        }
                    }
    
                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }
    
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
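
    The chain construction is easier to follow with the Dubbo-specific parts removed. The sketch below uses hypothetical, stripped-down stand-ins (MiniInvoker, MiniFilter, FilterChainSketch) and plain strings instead of Invocation and Result. It keeps only the reverse loop of buildInvokerChain() and omits the onResponse() and async handling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified version of the reverse-loop chain construction.
final class FilterChainSketch {
    static MiniInvoker buildChain(MiniInvoker invoker, List<MiniFilter> filters) {
        MiniInvoker last = invoker;
        // iterate backwards so that filters.get(0) ends up outermost
        for (int i = filters.size() - 1; i >= 0; i--) {
            final MiniFilter filter = filters.get(i);
            final MiniInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Helper that builds a filter recording when it is entered and left.
    static MiniFilter tracing(String name, List<String> trace) {
        return (next, request) -> {
            trace.add(name + ":before");
            String result = next.invoke(request);
            trace.add(name + ":after");
            return result;
        };
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        MiniInvoker target = request -> {
            trace.add("target");
            return "echo " + request;
        };
        MiniInvoker chain = buildChain(target, Arrays.asList(tracing("A", trace), tracing("B", trace)));
        System.out.println(chain.invoke("ping")); // echo ping
        System.out.println(trace); // [A:before, B:before, target, B:after, A:after]
    }
}

// Hypothetical stand-in for Dubbo's Invoker.
interface MiniInvoker {
    String invoke(String request);
}

// Hypothetical stand-in for Dubbo's Filter.
interface MiniFilter {
    String invoke(MiniInvoker next, String request);
}
```

    Building from the end of the list is what lets each new Invoker capture the already-built remainder of the chain as its next, so the caller sees the filters in their declared order.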
    
