美文网首页
dubbo中Dirctory,Router,LoadBalanc

dubbo中Dirctory,Router,LoadBalanc

作者: dracula337435 | 来源:发表于2019-09-30 07:55 被阅读0次

    先推荐别人两篇文章
    Dubbo源码分析:Directory
    Dubbo源码分析:Router

    问题引入

    想自定义dubbo的行为,很可能绕不开这几类组件
    理解这几类组件的作用,才能根据定制需求,找准最合适的组件

    这几个组件的作用

    DirctoryRouterLoadBalanceFilter顺序,这几类组件是递进的,逐步缩小范围
    先给出一个大致的效果示意图:

        组件              Directory              Router             LoadBalance      Filter
        产物     Invocation ----->  List<Invoker> -----> List<Invoker> -----> Invoker -----> Result
    组件中的方法             list                 route                select         invoke
    

    看上图中,从左到右这几个接口的声明
    org.apache.dubbo.rpc.cluster.Directory的声明

    public interface Directory<T> extends Node {
    
        //省略非关注方法
    
        List<Invoker<T>> list(Invocation invocation) throws RpcException;
    
    }
    

    org.apache.dubbo.rpc.cluster.Router的声明

    public interface Router extends Comparable<Router> {
    
       //省略非关注方法
    
        <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    
    }
    

    org.apache.dubbo.rpc.cluster.LoadBalance的声明

    public interface LoadBalance {
    
        //省略非关注方法
    
        <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    
    }
    

    org.apache.dubbo.rpc.Filter的声明

    public interface Filter {
    
        //省略非关注方法
    
        Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException;
    
    }
    

    这几个组件是如何配合起来的

    先到AbstractClusterInvoker,转入AbstractDirectory,执行DirectoryRouter,回到AbstractClusterInvoker,执行LoadBalance
    分别见com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker中的invoke方法:

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }
    
    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;
    
    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }
    

    com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectorylist方法:

    @Override
    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        List<Invoker<T>> invokers = doList(invocation);
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
                }
            }
        }
        return invokers;
    }
    

    Filter的介入更复杂一些,要先了解dubboSPI中的Wrapper
    先见ExtensionLoader中的createExtension,在实例化Extension(变量instance)及调用injectExtension(调用setter)后,有如下代码片段:

    Set<Class<?>> wrapperClasses = cachedWrapperClasses;
    if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
        for (Class<?> wrapperClass : wrapperClasses) {
            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
        }
    }
    

    可见将Extension用各Wrapper逐个包裹起来了,作为返回的Extension
    Set<Class<?>> cachedWrapperClasses的操作可见loadClassisWrapperClass方法:

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error when load extension class(interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + "is not subtype of interface.");
        }
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            if (cachedAdaptiveClass == null) {
                cachedAdaptiveClass = clazz;
            } else if (!cachedAdaptiveClass.equals(clazz)) {
                throw new IllegalStateException("More than 1 adaptive class found: "
                        + cachedAdaptiveClass.getClass().getName()
                        + ", " + clazz.getClass().getName());
            }
        } else if (isWrapperClass(clazz)) {
            Set<Class<?>> wrappers = cachedWrapperClasses;
            if (wrappers == null) {
                cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                wrappers = cachedWrapperClasses;
            }
            wrappers.add(clazz);
        } else {
            clazz.getConstructor();
            if (name == null || name.length() == 0) {
                name = findAnnotationName(clazz);
                if (name == null || name.length() == 0) {
                    if (clazz.getSimpleName().length() > type.getSimpleName().length()
                            && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                        name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                    } else {
                        throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                    }
                }
            }
            String[] names = NAME_SEPARATOR.split(name);
            if (names != null && names.length > 0) {
                Activate activate = clazz.getAnnotation(Activate.class);
                if (activate != null) {
                    cachedActivates.put(names[0], activate);
                }
                for (String n : names) {
                    if (!cachedNames.containsKey(clazz)) {
                        cachedNames.put(clazz, n);
                    }
                    Class<?> c = extensionClasses.get(n);
                    if (c == null) {
                        extensionClasses.put(n, clazz);
                    } else if (c != clazz) {
                        throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                    }
                }
            }
        }
    }
    
    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
    

    然后见com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper,代码如下:

    public class ProtocolFilterWrapper implements Protocol {
    
        private final Protocol protocol;
    
        public ProtocolFilterWrapper(Protocol protocol) {
            if (protocol == null) {
                throw new IllegalArgumentException("protocol == null");
            }
            this.protocol = protocol;
        }
    
        private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
            Invoker<T> last = invoker;
            List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
            if (!filters.isEmpty()) {
                for (int i = filters.size() - 1; i >= 0; i--) {
                    final Filter filter = filters.get(i);
                    final Invoker<T> next = last;
                    last = new Invoker<T>() {
    
                        @Override
                        public Class<T> getInterface() {
                            return invoker.getInterface();
                        }
    
                        @Override
                        public URL getUrl() {
                            return invoker.getUrl();
                        }
    
                        @Override
                        public boolean isAvailable() {
                            return invoker.isAvailable();
                        }
    
                        @Override
                        public Result invoke(Invocation invocation) throws RpcException {
                            return filter.invoke(next, invocation);
                        }
    
                        @Override
                        public void destroy() {
                            invoker.destroy();
                        }
    
                        @Override
                        public String toString() {
                            return invoker.toString();
                        }
                    };
                }
            }
            return last;
        }
    
        @Override
        public int getDefaultPort() {
            return protocol.getDefaultPort();
        }
    
        @Override
        public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
                return protocol.export(invoker);
            }
            return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
        }
    
        @Override
        public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
            if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                return protocol.refer(type, url);
            }
            return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
        }
    
        @Override
        public void destroy() {
            protocol.destroy();
        }
    
    }
    

    其中可见关键的buildInvokerChain方法,将一条List<Filter>链变成了一个嵌套的调用栈

    相关文章

      网友评论

          本文标题:dubbo中Dirctory,Router,LoadBalanc

          本文链接:https://www.haomeiwen.com/subject/ogqhmqtx.html