美文网首页dubbo
dubbo - 负载均衡调用过程

dubbo - 负载均衡调用过程

作者: 晴天哥_王志 | 来源:发表于2019-05-07 13:06 被阅读76次

    开篇

    这篇文章的主要目的是讲清楚ClusterInvoker存在多个invoker对象进行负载均衡的调用过程,也就描述从调用到负载均衡选择的调用链路,真正的负载均衡部分的逻辑在后面由单独讲解。

    selector 调用时序图

    dubbo selector调用时序图
    说明:
    • RegistryProtocol的doRefer()方法内部cluster.join()负责创建ClusterInvoker对象,所有的cluster的invoker的选择逻辑都在这个函数实现。

    • FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker抽象出了通用的所有类型ClusterInvoker需要使用的方法,核心的如select、doSelect、invoke等通用方法,将具体不同实现的doInvoke方法抽象给具体实现类实现。

    • AbstractClusterInvoker的invoke()方法作为调用的入口,内部功能主要是根据loadBalance策略选择invoker进行执行。

    • AbstractClusterInvoker的select() -> doSelect()方法内部根据不同loadBalance策略实现invoker的调用。

    • 通过选择具体的invoker对象后调用invoke()方法实现远程调用。

    dubbo cluster 调用过程源码

    FailoverCluster对象创建入口

    说明:

    • 核心在于cluster.join()方法会调用到如FailoverCluster类,建议通过debug打断点方式跟踪调用栈。
    public class RegistryProtocol implements Protocol {
    
        private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
            // 构建RegistryDirectory,可以把它理解为注册资源,其中包含了消费者/服务/路由等相关信息,其同时也是回调监听器
            RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
            directory.setRegistry(registry);
            directory.setProtocol(protocol);
            // all attributes of REFER_KEY
            Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
    
            // 构建subscribeUrl信息,主要拼接consumer:xxx的url地址
            URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
    
            if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                    && url.getParameter(Constants.REGISTER_KEY, true)) {
                // 向注册中心注册服务消费者
                registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false)));
            }
    
            // 从注册中心订阅服务提供者(即引用的服务)
            directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                    Constants.PROVIDERS_CATEGORY
                            + "," + Constants.CONFIGURATORS_CATEGORY
                            + "," + Constants.ROUTERS_CATEGORY));
    
            // 从invoker当中选择其中一个返回
            Invoker invoker = cluster.join(directory);
    
            // 注册消费者
            ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
            return invoker;
        }
    }
    

    FailoverClusterInvoker 负载均衡器举例

    • 按照以下流程进行进行invoker的选择:AbstractClusterInvoker.invoke() -> FailoverClusterInvoker.doInvoke() -> AbstractClusterInvoker. select() -> AbstractClusterInvoker.doselect() -> AbstractLoadBalance.select()。

    • invoke()和select()调用链的关键路径在于invoke由父类触发后在实现类FailoverClusterInvoker执行,然后调用的负载均衡因为是通用的所以在父类AbstractClusterInvoker中实现。

    • directory.list(invocation)负责选择从ReigstryDeirectory获取服务引用的invokers列表进行负载均衡选择

    • invoker选择的过程中,在AbstractClusterInvoker.invoke()阶段会根据规则生成LoadBalance对象进行invoker的选择,在调用过程中生成LoadBalance的对象。

    • LoadBalance的选择是按照URL中配置的信息负载均衡器进行选择,如果没有配置就走默认的负载均衡器。

    • AbstractLoadBalance作为负载均衡器基类,提供了select()的执行流程,具体的doSelect动作由各个实现类去实现。

    public class FailoverCluster implements Cluster {
    
        public final static String NAME = "failover";
    
        public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
            return new FailoverClusterInvoker<T>(directory);
        }
    }
    
    
    public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
        private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
    
        public FailoverClusterInvoker(Directory<T> directory) {
            super(directory);
        }
    
        @SuppressWarnings({"unchecked", "rawtypes"})
        public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
            List<Invoker<T>> copyinvokers = invokers;
            checkInvokers(copyinvokers, invocation);
            int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
            if (len <= 0) {
                len = 1;
            }
            // retry loop.
            RpcException le = null; // last exception.
            List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
            Set<String> providers = new HashSet<String>(len);
            for (int i = 0; i < len; i++) {
                //Reselect before retry to avoid a change of candidate `invokers`.
                //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
                if (i > 0) {
                    checkWhetherDestroyed();
                    copyinvokers = list(invocation);
                    // check again
                    checkInvokers(copyinvokers, invocation);
                }
                Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
                invoked.add(invoker);
                RpcContext.getContext().setInvokers((List) invoked);
                try {
                    Result result = invoker.invoke(invocation);
                    return result;
                } catch (RpcException e) {
    
                } catch (Throwable e) {
    
                } finally {
                    providers.add(invoker.getUrl().getAddress());
                }
            }
        }
    }
    
    
    public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
    
        public Result invoke(final Invocation invocation) throws RpcException {
    
            checkWhetherDestroyed();
    
            LoadBalance loadbalance;
    
            List<Invoker<T>> invokers = list(invocation);
            if (invokers != null && invokers.size() > 0) {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                        .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
            } else {
                loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
            }
            RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    
            return doInvoke(invocation, invokers, loadbalance);
        }
    
    
        protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
            List<Invoker<T>> invokers = directory.list(invocation);
            return invokers;
        }
    
    
        protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.size() == 0)
                return null;
            String methodName = invocation == null ? "" : invocation.getMethodName();
    
            boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
            {
                //ignore overloaded method
                if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                    stickyInvoker = null;
                }
                //ignore cucurrent problem
                if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                    if (availablecheck && stickyInvoker.isAvailable()) {
                        return stickyInvoker;
                    }
                }
            }
            Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
    
            if (sticky) {
                stickyInvoker = invoker;
            }
            return invoker;
        }
    
    
        private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
            if (invokers == null || invokers.size() == 0)
                return null;
            if (invokers.size() == 1)
                return invokers.get(0);
            // If we only have two invokers, use round-robin instead.
            if (invokers.size() == 2 && selected != null && selected.size() > 0) {
                return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
            }
            Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
    
            //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
            if ((selected != null && selected.contains(invoker))
                    || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
                try {
                    Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                    if (rinvoker != null) {
                        invoker = rinvoker;
                    } else {
                        //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                        int index = invokers.indexOf(invoker);
                        try {
                            //Avoid collision
                            invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
                        } catch (Exception e) {
                            logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                        }
                    }
                } catch (Throwable t) {
                    logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
                }
            }
            return invoker;
        }
    }
    

    AbstractLoadBalance执行过程

    • AbstractLoadBalance的select()方法作为负载均衡选择器的入口位置,提供通用的getWeight()方法获取权重。

    • AbstractLoadBalance的doSelect()由具体的实现类实现。

    public abstract class AbstractLoadBalance implements LoadBalance {
    
        static int calculateWarmupWeight(int uptime, int warmup, int weight) {
            int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
            return ww < 1 ? 1 : (ww > weight ? weight : ww);
        }
    
        public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
            if (invokers == null || invokers.size() == 0)
                return null;
            if (invokers.size() == 1)
                return invokers.get(0);
            return doSelect(invokers, url, invocation);
        }
    
        protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);
    
        protected int getWeight(Invoker<?> invoker, Invocation invocation) {
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
            if (weight > 0) {
                long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
                if (timestamp > 0L) {
                    int uptime = (int) (System.currentTimeMillis() - timestamp);
                    int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                    if (uptime > 0 && uptime < warmup) {
                        weight = calculateWarmupWeight(uptime, warmup, weight);
                    }
                }
            }
            return weight;
        }
    }
    

    相关文章

      网友评论

        本文标题:dubbo - 负载均衡调用过程

        本文链接:https://www.haomeiwen.com/subject/kfpjoqtx.html