上一篇有介绍,在RegistryProtocol里面有很关键的一句
如下
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
****
Invoker invoker = cluster.join(directory);
return invoker;
}
我们先看cluster的类继承关系,如下

cluster的继承非常的扁平,标准的策略模式的使用方式,
而cluster的默认的spi使用failover(失败重试的模式),等于说默认实现是FailoverCluster类,
我们看下FailoverCluster的实现如下
public class FailoverCluster implements Cluster {
public final static String NAME = "failover";
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<T>(directory);
}
}
FailoverCluster将Directory封装成了一个FailoverClusterInvoker的invoke。
我们接着看下AbstractClusterInvoker 的源码中invoke的实现方法如下
Class AbstractClusterInvoker
public Result invoke(final Invocation invocation) throws RpcException {
//判断当前invoke是否已经destroyed
checkWhetherDestroyed();
LoadBalance loadbalance = null;
// 将上下文中的Attachments绑定到invocation中
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addAttachments(contextAttachments);
}
//通过引用的Directory返回可用的List<Invoker<T>> invokers
List<Invoker<T>> invokers = list(invocation);
//选择loadbalance,没有指定的话默认使用random随机的负载均衡器
if (invokers != null && !invokers.isEmpty()) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
}
//下面的这句先忽略
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//按照spring的惯例,只有do**才是真正干活的方法
return doInvoke(invocation, invokers, loadbalance);
}
其中doInvoke在子类中进行了重写,我们看下FailoverClusterInvoker是如何实现的(我将不重要的代码进行了修剪)
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) {
List<Invoker<T>> copyinvokers = invokers;
//查看其配置的retries次数,默认是3次
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
//len是我们失败重试的最大的次数
for (int i = 0; i < len; i++) {
//每次重试的时候,都刷新下invokers,保证invokers是最新的
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
//根据loadbalance算法选择一个invoker
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//调用选中的invoker执行invoke方法
Result result = invoker.invoke(invocation);
return result;
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
去掉不相关的代码之后,其实代码还是很好理解的,那最重要的还是
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
方法,此方法又回调了父类AbstractClusterInvoker,我们继续的看下(精简了下),其实就一句,那些关于stickyInvoker的逻辑可以先不关注,一般来说除了特殊情况,不要用
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) {
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
return invoker;
}
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) {
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果这个invoker是先前被selected过的,或者当前的invoker不可用,那么触发
//reselect
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
//如果重选不出来,那么选择先前选出来的invoker的下一个(也就是轮询了)
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
}
}
}
}
return invoker;
}
整个逻辑也非常简单
第一步 使用loadbalance 选出invoker
第二步 如果选出的invoker在先前的selected,那么触发重选,不然结束
第三步 如果触发重选没有选出来,选取先前选出来的invoker的下一个(毕竟先要保证可用)
如上的代码中就两句最重要,如下
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
由于第一句涉及到loadbalance的分析,我们先分析第二句
private Invoker<T> reselect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected, boolean availablecheck)
{
//Allocating one in advance, this list is certain to be used.
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size());
//如果需要活性检查,将所有活性的不在selected里面的invokers选取出来
if (availablecheck) { // invoker.isAvailable() should be checked
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
//将选取出来的invokers使用loadbalance算法进行进一步的选取
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
} else {
for (Invoker<T> invoker : invokers) {
if (selected == null || !selected.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
// 如果到这一步还没有选出来,只能到selected里面去选了
{
if (selected != null) {
for (Invoker<T> invoker : selected) {
if ((invoker.isAvailable()) // available first
&& !reselectInvokers.contains(invoker)) {
reselectInvokers.add(invoker);
}
}
}
if (!reselectInvokers.isEmpty()) {
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}
如上的代码我们分析到最后,其实都是调用了loadbalance的select方法。
我们接下来对loadbalance进行详细的分析,在dubbo里面设计的loadbalance的源码都非常的精巧,值得我们慢慢的把玩。
在dubbo中默认的提供了四种负载均衡算法,如下

首先看LoadBalance接口
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
非常的简单,就是从invokers根据策略选出一个Invoker。我们先看默认的实现RandomLoadBalance。
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
//随机函数
private final Random random = new Random();
@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // Sum
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
//如果总的权重大于0且每个invoke的权重不一样
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
//根据总权重生成随机数
int offset = random.nextInt(totalWeight);
// Return a invoker based on the random value.
//从左往右进行权重积分,直到积分大于随机权重,返回i,这样设计的目的保证
//权重大的invoke会被优先的选中
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(random.nextInt(length));
}
}
那每个invoke的权重weight是如何的计算的呢,在AbstractLoadBalance里面
Class AbstractLoadBalance
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
//判断invocation对应的方法是否配置了权重
int weight = getWeightParameter(invoker, invocation);
//如果配置了权重,但是远程服务的启动时间小于10分钟,那么将权重打折
//这样设计的目的保证新启动的机器不会一下子涌进太多的流量
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
由于其他的几个LoadBalance算法比较巧妙,我们单独的分三篇进行介绍
网友评论