美文网首页
dubbo技术内幕五 Cluster + LoadBalance

dubbo技术内幕五 Cluster + LoadBalance

作者: 牧羊人刘俏 | 来源:发表于2021-04-14 15:29 被阅读0次

上一篇有介绍,在RegistryProtocol里面有很关键的一句
如下

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
       
        ****
        Invoker invoker = cluster.join(directory);
     
        return invoker;
    }

我们先看cluster的类继承关系,如下


image.png

cluster的继承非常的扁平,标准的策略模式的使用方式,
而cluster的默认的spi使用failover(失败重试的模式),等于说默认实现是FailoverCluster类,
我们看下FailoverCluster的实现如下

public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }
}

FailoverCluster将Directory封装成了一个FailoverClusterInvoker的invoke。
我们接着看下AbstractClusterInvoker 的源码中invoke的实现方法如下

Class AbstractClusterInvoker
    public Result invoke(final Invocation invocation) throws RpcException {
        //判断当前invoke是否已经destroyed
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // 将上下文中的Attachments绑定到invocation中
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        //通过引用的Directory返回可用的List<Invoker<T>> invokers 
        List<Invoker<T>> invokers = list(invocation);
        //选择loadbalance,没有指定的话默认使用random随机的负载均衡器
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        //下面的这句先忽略
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //按照spring的惯例,只有do**才是真正干活的方法
        return doInvoke(invocation, invokers, loadbalance);
    }

其中doInvoke在子类中进行了重写,我们看下FailoverClusterInvoker是如何实现的(我将不重要的代码进行了修剪)

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
        List<Invoker<T>> copyinvokers = invokers;
        //查看其配置的retries次数,默认是3次
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
         //len是我们失败重试的最大的次数
        for (int i = 0; i < len; i++) {
            //每次重试的时候,都刷新下invokers,保证invokers是最新的
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            //根据loadbalance算法选择一个invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                //调用选中的invoker执行invoke方法
                Result result = invoker.invoke(invocation);
   
                return result;
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
      
    }

去掉不相关的代码之后,其实代码还是很好理解的,那最重要的还是
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
方法,此方法又回调了父类AbstractClusterInvoker,我们继续的看下(精简了下),其实就一句,那些关于stickyInvoker的逻辑可以先不关注,一般来说除了特殊情况,不要用

    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) {
     
        Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
        return invoker;
    }
 private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) {
      
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
       //如果这个invoker是先前被selected过的,或者当前的invoker不可用,那么触发
      //reselect
        if ((selected != null && selected.contains(invoker))
                || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
            try {
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                if (rinvoker != null) {
                    invoker = rinvoker;
                } else {
                    //如果重选不出来,那么选择先前选出来的invoker的下一个(也就是轮询了)
                    int index = invokers.indexOf(invoker);
                    try {
                        //Avoid collision
                        invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                    } 
                }
            } 
        }
        return invoker;
    }

整个逻辑也非常简单
第一步 使用loadbalance 选出invoker
第二步 如果选出的invoker在先前的selected,那么触发重选,不然结束
第三步 如果触发重选没有选出来,选取先前选出来的invoker的下一个(毕竟先要保证可用)

如上的代码中就两句最重要,如下

Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
 Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);

由于第一句涉及到loadbalance的分析,我们先分析第二句

 private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
           {

        //Allocating one in advance, this list is certain to be used.
        List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());

       //如果需要活性检查,将所有活性的不在selected里面的invokers选取出来
        if (availablecheck) { // invoker.isAvailable() should be checked
            for (Invoker<T> invoker : invokers) {
                if (invoker.isAvailable()) {
                    if (selected == null || !selected.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            //将选取出来的invokers使用loadbalance算法进行进一步的选取
            if (!reselectInvokers.isEmpty()) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        } else { 
            for (Invoker<T> invoker : invokers) {
                if (selected == null || !selected.contains(invoker)) {
                    reselectInvokers.add(invoker);
                }
            }
            if (!reselectInvokers.isEmpty()) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        // 如果到这一步还没有选出来,只能到selected里面去选了
        {
            if (selected != null) {
                for (Invoker<T> invoker : selected) {
                    if ((invoker.isAvailable()) // available first
                            && !reselectInvokers.contains(invoker)) {
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if (!reselectInvokers.isEmpty()) {
                return loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        return null;
    }

如上的代码我们分析到最后,其实都是调用了loadbalance的select方法。
我们接下来对loadbalance进行详细的分析,在dubbo里面设计的loadbalance的源码都非常的精巧,值得我们慢慢的把玩。
在dubbo中默认的提供了四种负载均衡算法,如下


image.png

首先看LoadBalance接口

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    /**
     * select one invoker in list.
     *
     * @param invokers   invokers.
     * @param url        refer url
     * @param invocation invocation.
     * @return selected invoker.
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

非常的简单,就是从invokers根据策略选出一个Invoker。我们先看默认的实现RandomLoadBalance。

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";
   //随机函数
    private final Random random = new Random();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // Number of invokers
        int totalWeight = 0; // The sum of weights
        boolean sameWeight = true; // Every invoker has the same weight?
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // Sum
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false;
            }
        }
       //如果总的权重大于0且每个invoke的权重不一样
        if (totalWeight > 0 && !sameWeight) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
            //根据总权重生成随机数
            int offset = random.nextInt(totalWeight);
            // Return a invoker based on the random value.
            //从左往右进行权重积分,直到积分大于随机权重,返回i,这样设计的目的保证
            //权重大的invoke会被优先的选中
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(random.nextInt(length));
    }

}

那每个invoke的权重weight是如何的计算的呢,在AbstractLoadBalance里面

Class  AbstractLoadBalance
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        //判断invocation对应的方法是否配置了权重
        int weight = getWeightParameter(invoker, invocation);
         //如果配置了权重,但是远程服务的启动时间小于10分钟,那么将权重打折
        //这样设计的目的保证新启动的机器不会一下子涌进太多的流量
        if (weight > 0) {
            long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
            if (timestamp > 0L) {
                int uptime = (int) (System.currentTimeMillis() - timestamp);
                int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                if (uptime > 0 && uptime < warmup) {
                    weight = calculateWarmupWeight(uptime, warmup, weight);
                }
            }
        }
        return weight;
    }

由于其他的几个LoadBalance算法比较巧妙,我们单独的分三篇进行介绍

相关文章

网友评论

      本文标题:dubbo技术内幕五 Cluster + LoadBalance

      本文链接:https://www.haomeiwen.com/subject/ngnclltx.html