4 - Dubbo Source Code Analysis: Cluster Design

Author: 致虑 | Published: 2018-08-30 19:07
    • First, a diagram from the official documentation


      image.png

      This is Dubbo's cluster design. This chapter walks through the main blue components in the diagram. A brief explanation of each:

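    • Cluster governs the cluster as a whole and comes in several fault-tolerance strategies, such as fail-fast and fail-safe

    • Directory, already seen in the consumer chapter, maintains the dynamic list of invokers

    • Router, as the name suggests, handles routing

    • LoadBalance handles load balancing

      Put together: when a consumer calls a provider remotely, the cluster layer applies routing, load balancing and related strategies to pick one concrete service instance, which then handles the call.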
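      The order in which these components cooperate can be sketched roughly as follows. This is only an illustration: ClusterPipelineSketch and its methods are hypothetical stand-ins that use plain strings for providers, not Dubbo's real Directory, Router or LoadBalance types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified stand-ins; NOT Dubbo's real Directory/Router/LoadBalance types.
final class ClusterPipelineSketch {

    /** Directory step: returns the providers currently known (a static snapshot here). */
    static List<String> list(List<String> registered) {
        return new ArrayList<>(registered);
    }

    /** Router step: keeps only the providers that the routing rule allows. */
    static List<String> route(List<String> providers, Predicate<String> rule) {
        List<String> routed = new ArrayList<>();
        for (String provider : providers) {
            if (rule.test(provider)) {
                routed.add(provider);
            }
        }
        return routed;
    }

    /** LoadBalance step: picks one provider; a plain modulo stands in for a real strategy. */
    static String select(List<String> routed, int sequence) {
        if (routed.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        return routed.get(Math.floorMod(sequence, routed.size()));
    }

    /** Directory -> Router -> LoadBalance, in that order. */
    static String choose(List<String> registered, Predicate<String> rule, int sequence) {
        return select(route(list(registered), rule), sequence);
    }
}
```

      For example, with a rule that only admits addresses starting with "10.", a provider at 192.168.1.9 is filtered out before load balancing ever sees it.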


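    Detailed analysis

    1.Cluster
    • The official description:

    Cluster disguises the multiple Invokers in a Directory as a single Invoker, transparent to the upper layer. The disguise includes fault-tolerance logic: when a call fails, another Invoker is retried.

    • The interface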

      /**
       * Cluster. (SPI, Singleton, ThreadSafe)
       * <p>
       * <a href="http://en.wikipedia.org/wiki/Computer_cluster">Cluster</a>
       * <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a>
       *
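       * Cluster disguises the multiple Invokers in a Directory as a single Invoker, transparent to the upper layer.
       * The disguise includes fault-tolerance logic: when a call fails, another Invoker is retried.
       * Strategies for handling failures: 9 implementations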
       */
      @SPI(FailoverCluster.NAME)
      public interface Cluster {
      
          /**
           * Merge the directory invokers to a virtual invoker.
           *
           * @param <T>
           * @param directory
           * @return cluster invoker
           * @throws RpcException
           */
          @Adaptive
          <T> Invoker<T> join(Directory<T> directory) throws RpcException;
      
      }
      

      Clearly the default extension is FailoverCluster. It contains a single, familiar method, join, which already appeared in the consumer chapter, so let's trace it:


      image.png

      MockClusterInvoker wraps a FailoverClusterInvoker, so the call is passed down the invoker chain. Continuing:


      image.png
    image.png
    image.png
    image.png
    image.png
    image.png

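    All this code does is assemble and wrap the consumer's call information and return it. The part that really matters is the returned Invoker: MockClusterInvoker-->FailoverClusterInvoker. Why? Because this step directly decides which ClusterInvoker is used when the remote call is finally made, through the invoke and doInvoke methods below:

    • First, MockClusterInvoker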

      /**
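       * Degradation (downgrade) handling.
       * It works by changing the node information registered in ZooKeeper, so that ZooKeeper notifies the consumer to regenerate its invokers.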
       */
      @Override
      public Result invoke(Invocation invocation) throws RpcException {
          Result result = null;
      
          String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
          if (value.length() == 0 || value.equalsIgnoreCase("false")) {
              /**
               * No degradation: no mock
               * The invoker here is the FailoverClusterInvoker
               */
              result = this.invoker.invoke(invocation);
          } else if (value.startsWith("force")) {
              if (logger.isWarnEnabled()) {
                  logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
              }
              /**
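               * Shielding: force:direct mock
               * mock=force:return+null
               * Every consumer call to the method returns null directly, without making a remote call.
               * Useful for shielding callers from the impact of a non-critical service being unavailable.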
               */
              //
              result = doMockInvoke(invocation, null);
          } else {
              /**
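               * Fault tolerance: fail-mock
               * mock=fail:return+null
               * After a call to this service's method fails, the consumer returns null instead of throwing.
               * Useful for ignoring the impact of an unstable, non-critical service on the caller. */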
              try {
                  result = this.invoker.invoke(invocation);
              } catch (RpcException e) {
                  if (e.isBiz()) {
                      throw e;
                  } else {
                      if (logger.isWarnEnabled()) {
                          logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                      }
                      result = doMockInvoke(invocation, e);
                  }
              }
          }
          return result;
      }
      

      An important consumer configuration item shows up here: mock. The logic is straightforward: it implements the fault-tolerance and degradation scheme.
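      The three branches of the invoke method above depend only on the mock value, which a small sketch can make explicit. MockModeSketch and its Mode enum are hypothetical names introduced for illustration, and the null handling is an added assumption (the original code falls back to "false" through the URL's default value).

```java
// Hypothetical helper that mirrors the branch conditions in MockClusterInvoker.invoke.
final class MockModeSketch {

    enum Mode { NO_MOCK, FORCE_MOCK, FAIL_MOCK }

    static Mode classify(String mockValue) {
        // Assumption: treat a missing value like the default "false".
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            return Mode.NO_MOCK;      // call the wrapped invoker normally
        }
        if (value.startsWith("force")) {
            return Mode.FORCE_MOCK;   // skip the remote call, return the mock result
        }
        return Mode.FAIL_MOCK;        // call remotely, fall back to the mock on non-business failure
    }
}
```

      Any value that is neither empty, "false", nor prefixed with "force" ends up in the fail-mock branch, which matches the final else in the source.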

    • Next, FailoverClusterInvoker

      @Override
      @SuppressWarnings({"unchecked", "rawtypes"})
      public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
      
          // Invoker list
          List<Invoker<T>> copyinvokers = invokers;
      
          // Make sure the invoker list is not empty
          checkInvokers(copyinvokers, invocation);
      
          // Total number of attempts: retries + 1
          int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
          if (len <= 0) {
              len = 1;
          }
          // retry loop.
          RpcException le = null; // last exception.
          List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
          Set<String> providers = new HashSet<String>(len);
          for (int i = 0; i < len; i++) {
              //Reselect before retry to avoid a change of candidate `invokers`.
              //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
              /**
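               * On a retry, list the invokers again in case the invoker list changed in the meantime.
               * Note: if the list did change, the `invoked` check stops being reliable because the invoker instances are different.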
               */
              if (i > 0) {
                  checkWhetherDestroyed();
                  copyinvokers = list(invocation);
                  // check again
                  checkInvokers(copyinvokers, invocation);
              }
      
              /** Use the LoadBalance to select an invoker */
              Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
              invoked.add(invoker);
              RpcContext.getContext().setInvokers((List) invoked);
              try {
      
                  /** Invoke the selected invoker and return its result */
                  Result result = invoker.invoke(invocation);
                  if (le != null && logger.isWarnEnabled()) {
                      logger.warn("Although retry the method " + invocation.getMethodName()
                              + " in the service " + getInterface().getName()
                              + " was successful by the provider " + invoker.getUrl().getAddress()
                              + ", but there have been failed providers " + providers
                              + " (" + providers.size() + "/" + copyinvokers.size()
                              + ") from the registry " + directory.getUrl().getAddress()
                              + " on the consumer " + NetUtils.getLocalHost()
                              + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                              + le.getMessage(), le);
                  }
                  return result;
              } catch (RpcException e) {
                  if (e.isBiz()) { // biz exception.
                      throw e;
                  }
                  le = e;
              } catch (Throwable e) {
                  le = new RpcException(e.getMessage(), e);
              } finally {
                  providers.add(invoker.getUrl().getAddress());
              }
          }
          throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                  + invocation.getMethodName() + " in the service " + getInterface().getName()
                  + ". Tried " + len + " times of the providers " + providers
                  + " (" + providers.size() + "/" + copyinvokers.size()
                  + ") from the registry " + directory.getUrl().getAddress()
                  + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                  + Version.getVersion() + ". Last error is: "
                  + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
      }
      

      Here is what the debugger shows when a remote call is made:


      image.png

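      Yes, execution does reach this point. FailoverCluster's strategy is automatic failover: when a call fails, retry on another server. That strategy lives in the retry for loop of doInvoke.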


      image.png
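      len is the retries configuration item plus one. retries is the number of retries (Constants.DEFAULT_RETRIES, which is 2 in this version), so by default at most 3 calls are made in total. Note: each retry lists the invokers again, in case the invoker list changed in the meantime.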
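      The shape of that retry loop can be sketched in isolation. FailoverSketch is a hypothetical, simplified stand-in: providers are plain strings, the remote call is a Function, and the "pick one not tried yet" helper replaces the real load-balanced select.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of the failover loop; not Dubbo's FailoverClusterInvoker.
final class FailoverSketch {

    /** Makes at most retries + 1 calls and returns the first successful result. */
    static <R> R invoke(List<String> providers, int retries, Function<String, R> call) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int len = retries + 1;
        if (len <= 0) {
            len = 1; // always try at least once
        }
        RuntimeException last = null;
        List<String> invoked = new ArrayList<>();
        for (int i = 0; i < len; i++) {
            String provider = pickNotYetInvoked(providers, invoked);
            invoked.add(provider);
            try {
                return call.apply(provider);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("Tried " + len + " times, providers " + invoked, last);
    }

    /** Simplification: first provider not tried yet, or the first one if all were tried. */
    static String pickNotYetInvoked(List<String> providers, List<String> invoked) {
        for (String provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }
}
```

      With retries set to 0 the loop degenerates to a single attempt, which is the behaviour one would otherwise get from a fail-fast strategy.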
      How the retry picks a different node after the current invoker fails comes down to how select chooses again:
      Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
      

      This is implemented in the parent class AbstractClusterInvoker:

       /**
       *
       * Select an invoker using the load balance policy.</br>
       * a) First, select an invoker using loadbalance. If this invoker is in the previously selected list, or
       * if it is unavailable and the availability check is enabled, continue with step b (reselect); otherwise return it.</br>
       * b) Reselection; the validation rule is: selected > available. This rule makes the reselected invoker
       * as unlikely as possible to be in the previously selected list, and also checks that it is available.
       *
       * @param loadbalance load balance policy
       * @param invocation
       * @param invokers invoker candidates
       * @param selected invokers already selected; note: the input is guaranteed to contain no duplicates
       * @return
       * @throws RpcException
       */
      protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
          if (invokers == null || invokers.isEmpty()) return null;
      
          String methodName = invocation == null ? "" : invocation.getMethodName();
      
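          // sticky: sticky connections are for stateful services; the client keeps calling the same provider whenever possible, and only switches to another one if that provider goes down.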
          boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
          {
              //ignore overloaded method
              if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
                  stickyInvoker = null;
              }
              //ignore concurrency problem
              if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
                  if (availablecheck && stickyInvoker.isAvailable()) {
                      return stickyInvoker;
                  }
              }
          }
          Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
      
          if (sticky) {
              stickyInvoker = invoker;
          }
          return invoker;
      }
      

      Next, the core method: doSelect

      private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
      
          if (invokers == null || invokers.isEmpty()) return null;
      
          // Only one invoker: return it directly, there is nothing to choose
          if (invokers.size() == 1)  return invokers.get(0);
      
          if (loadbalance == null) {
              loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
          }
      
          /** Pick an invoker with the concrete load-balancing algorithm */
          Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
      
          //If the `invoker` is in `selected` (checked first), or the invoker is unavailable && availablecheck is true, reselect.
          if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
              try {
      
                  /**
                   * Reselect:
                   * first pick from the invokers not in `selected`; if there are none, pick from `selected`
                   */
                  Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
                  if (rinvoker != null) {
                      invoker = rinvoker;
                  } else {
                      //Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
                      int index = invokers.indexOf(invoker);
                      try {
                          //Avoid collision
                          invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0);
                      } catch (Exception e) {
                          logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
                      }
                  }
              } catch (Throwable t) {
                  logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t);
              }
          }
          return invoker;
      }
      

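      In outline:

      • Pick an invoker with the concrete load-balancing algorithm (load balancing is covered in detail later)
      • If `selected` contains it (checked first), or it is unavailable && availablecheck=true, reselect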
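      The decision order in those two steps can be sketched as below. ReselectSketch is hypothetical and deliberately simplified: it takes the load balancer's first pick as an argument, and during reselection it returns the first suitable candidate, whereas the real reselect runs the load balancer again over the filtered list.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the doSelect decision order; not Dubbo's implementation.
final class ReselectSketch {

    static String choose(String first, List<String> all, List<String> selected, Set<String> unavailable) {
        boolean alreadyTried = selected != null && selected.contains(first);
        if (!alreadyTried && !unavailable.contains(first)) {
            return first; // the first pick is fine, keep it
        }
        // Reselect: prefer available providers that were not selected before.
        for (String provider : all) {
            if (!unavailable.contains(provider) && (selected == null || !selected.contains(provider))) {
                return provider;
            }
        }
        // Otherwise accept an available provider even if it was selected before.
        if (selected != null) {
            for (String provider : selected) {
                if (!unavailable.contains(provider)) {
                    return provider;
                }
            }
        }
        // Last resort, as in the source: take the neighbour of the first pick.
        int index = all.indexOf(first);
        return index < all.size() - 1 ? all.get(index + 1) : all.get(0);
    }
}
```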

      There is one important detail here: the sticky configuration


      image.png

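      The code makes its purpose clear:

      Sticky connections are for stateful services: the client keeps calling the same provider whenever possible, and only switches to another one if that provider goes down.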
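      A minimal sketch of that behaviour, under simplifying assumptions: StickySketch is a hypothetical class, providers are strings, availability is a predicate, and the load balancer is a Supplier. Like the original, it ignores concurrency and is not thread-safe.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of sticky selection; not Dubbo's AbstractClusterInvoker.
final class StickySketch {

    private String stickyProvider; // last provider chosen while sticky was enabled

    String select(List<String> providers, boolean sticky,
                  Predicate<String> isAvailable, Supplier<String> loadBalance) {
        // Forget the sticky provider once it has left the candidate list.
        if (stickyProvider != null && !providers.contains(stickyProvider)) {
            stickyProvider = null;
        }
        // Reuse the sticky provider as long as it is still available.
        if (sticky && stickyProvider != null && isAvailable.test(stickyProvider)) {
            return stickyProvider;
        }
        String chosen = loadBalance.get();
        if (sticky) {
            stickyProvider = chosen;
        }
        return chosen;
    }
}
```

      Once a provider has been chosen with sticky enabled, later calls keep returning it even if the load balancer would now pick another one, until it disappears from the list or is reported unavailable.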

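      That mostly covers FailoverCluster's retry-on-failure strategy. To recap:

    • Select the Cluster

    • Determine the ClusterInvoker

    • Apply the concrete strategy when doInvoke runs

      None of this is hard. The remaining strategies work in much the same way, so what follows is a brief rundown of what each one does, based on the official documentation: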

    • FailfastCluster

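      Fail fast: makes only one call and reports an error immediately on failure. Typically used for non-idempotent write operations, such as inserting a record.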

       public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
       public FailfastClusterInvoker(Directory<T> directory) {
           super(directory);
       }
    
       @Override
       public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
           checkInvokers(invokers, invocation);
           Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
           try {
               return invoker.invoke(invocation);
           } catch (Throwable e) {
               if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.
                   throw (RpcException) e;
               }
               throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
           }
         }
       }
    

    The logic is clear at a glance.

    • FailsafeCluster

      Fail safe: when an exception occurs, it is simply ignored. Typically used for operations such as writing audit logs.

    public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
           private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);
       
           public FailsafeClusterInvoker(Directory<T> directory) {
               super(directory);
           }
       
           @Override
           public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
               try {
                   checkInvokers(invokers, invocation);
                   Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                   return invoker.invoke(invocation);
               } catch (Throwable e) {
                   logger.error("Failsafe ignore exception: " + e.getMessage(), e);
                   return new RpcResult(); // ignore
               }
           }
    }   
    
    • FailbackCluster

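      Fail back (automatic recovery): failed requests are recorded in the background and resent on a timer. Typically used for message notification operations.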

       public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
           private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class);
       
           private static final long RETRY_FAILED_PERIOD = 5 * 1000;
       
           /**
            * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
            * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
            */
           private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2,
                   new NamedInternalThreadFactory("failback-cluster-timer", true));
       
           private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>();
           private volatile ScheduledFuture<?> retryFuture;
       
           public FailbackClusterInvoker(Directory<T> directory) {
               super(directory);
           }
       
           private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
               if (retryFuture == null) {
                   synchronized (this) {
                       if (retryFuture == null) {
                           retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
       
                               @Override
                               public void run() {
                                   // collect retry statistics
                                   try {
                                       retryFailed();
                                   } catch (Throwable t) { // Defensive fault tolerance
                                       logger.error("Unexpected error occur at collect statistic", t);
                                   }
                               }
                           }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
                       }
                   }
               }
               failed.put(invocation, router);
           }
       
           void retryFailed() {
               if (failed.size() == 0) {
                   return;
               }
               for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
                   Invocation invocation = entry.getKey();
                   Invoker<?> invoker = entry.getValue();
                   try {
                       invoker.invoke(invocation);
                       failed.remove(invocation);
                   } catch (Throwable e) {
                       logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
                   }
               }
           }
       
           @Override
           protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
               try {
                   checkInvokers(invokers, invocation);
                   Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
                   return invoker.invoke(invocation);
               } catch (Throwable e) {
                   logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e);
                   addFailed(invocation, this);
                   return new RpcResult(); // ignore
               }
           }
    }
    
    • ForkingCluster

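      Forking: calls multiple servers in parallel and returns as soon as one succeeds. Typically used for read operations with strict latency requirements, at the cost of extra service resources. The maximum parallelism can be set with forks="2".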

    public class ForkingClusterInvoker<T> extends AbstractClusterInvoker<T> {
           /**
            * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread}
            * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}.
            */
           private final ExecutorService executor = Executors.newCachedThreadPool(new NamedInternalThreadFactory("forking-cluster-timer", true));
       
           public ForkingClusterInvoker(Directory<T> directory) {
               super(directory);
           }
       
           @Override
           @SuppressWarnings({"unchecked", "rawtypes"})
           public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
               checkInvokers(invokers, invocation);
               final List<Invoker<T>> selected;
               final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS);
               final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
               if (forks <= 0 || forks >= invokers.size()) {
                   selected = invokers;
               } else {
                   selected = new ArrayList<Invoker<T>>();
                   for (int i = 0; i < forks; i++) {
                       // TODO. Add some comment here, refer chinese version for more details.
                       Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
                       if (!selected.contains(invoker)) {//Avoid add the same invoker several times.
                           selected.add(invoker);
                       }
                   }
               }
               RpcContext.getContext().setInvokers((List) selected);
               final AtomicInteger count = new AtomicInteger();
               final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>();
               for (final Invoker<T> invoker : selected) {
                   executor.execute(new Runnable() {
                       @Override
                       public void run() {
                           try {
                               Result result = invoker.invoke(invocation);
                               ref.offer(result);
                           } catch (Throwable e) {
                               int value = count.incrementAndGet();
                               if (value >= selected.size()) {
                                   ref.offer(e);
                               }
                           }
                       }
                   });
               }
               try {
                   Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS);
                   if (ret instanceof Throwable) {
                       Throwable e = (Throwable) ret;
                       throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
                   }
                   return (Result) ret;
               } catch (InterruptedException e) {
                   throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e);
               }
           }
    }
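    The "first success wins" idea behind ForkingCluster can also be expressed with the JDK's ExecutorService.invokeAny, which returns the result of one task that completed successfully. This is a swapped-in technique for illustration, not how ForkingClusterInvoker is written (it uses a BlockingQueue and a failure counter, as shown above). ForkingSketch and firstSuccess are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: run all calls in parallel and return the first successful result.
final class ForkingSketch {

    static <R> R firstSuccess(List<Callable<R>> calls, long timeoutMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // invokeAny returns as soon as one task succeeds; failed tasks are skipped.
            return executor.invokeAny(calls, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while forking", e);
        } catch (ExecutionException | TimeoutException e) {
            // Every call failed, or none finished within the timeout.
            throw new IllegalStateException("no forked call succeeded", e);
        } finally {
            executor.shutdownNow(); // cancel whatever is still running
        }
    }
}
```

    As in the original, an error is only surfaced when no call succeeds; a single failing provider does not fail the invocation.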
    
    • BroadcastCluster

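      Broadcast: calls every provider one by one; if any of them fails, the call fails. Typically used to tell all providers to update local resources such as caches or logs.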

       public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
    
           private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);
       
           public BroadcastClusterInvoker(Directory<T> directory) {
               super(directory);
           }
       
           @Override
           @SuppressWarnings({"unchecked", "rawtypes"})
           public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
               checkInvokers(invokers, invocation);
               RpcContext.getContext().setInvokers((List) invokers);
               RpcException exception = null;
               Result result = null;
               for (Invoker<T> invoker : invokers) {
                   try {
                       result = invoker.invoke(invocation);
                   } catch (RpcException e) {
                       exception = e;
                       logger.warn(e.getMessage(), e);
                   } catch (Throwable e) {
                       exception = new RpcException(e.getMessage(), e);
                       logger.warn(e.getMessage(), e);
                   }
               }
               if (exception != null) {
                   throw exception;
               }
               return result;
           }
    }
    

    2.LoadBalance
    • The main interface
       /**
    * Load balancing: four load-balancing strategies
    * LoadBalance. (SPI, Singleton, ThreadSafe)
    * <p>
    * <a href="http://en.wikipedia.org/wiki/Load_balancing_(computing)">Load-Balancing</a>
    *
    * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
    */
    @SPI(RandomLoadBalance.NAME)
    public interface LoadBalance {
    
           /**
            * select one invoker in list.
            *
            * @param invokers   invokers.
            * @param url        refer url
            * @param invocation invocation.
            * @return selected invoker.
            */
           @Adaptive("loadbalance")
           <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    }
    

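    The default is RandomLoadBalance, so let's follow the consumer call flow to analyze this load-balancing strategy in detail.

    • RandomLoadBalance
      Load balancing comes into play when a remote call has to choose a provider, so we start from the default FailoverClusterInvoker.doInvoke: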


      image.png

      loadbalance shows up here, so let's keep tracing


      image.png
      image.png
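      I only started one provider locally, so no load balancing is needed and it is returned directly. With more than one provider, the highlighted part above applies.
      Start with the select method of AbstractLoadBalance: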
    @Override
       public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    
           if (invokers == null || invokers.isEmpty()) return null;
    
           if (invokers.size() == 1) return invokers.get(0);
    
           // Do the selection; implemented by the concrete subclass, RandomLoadBalance in our case
           return doSelect(invokers, url, invocation);
       }
    

    Another hook method: the actual work is delegated to the subclass:

       /**
    * random load balance.
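    * The default strategy.
    *
    * Random: the probability of being picked is set by weight.
    * Collisions are likely over a small sample, but the distribution evens out as call volume grows; weighting by probability also stays fairly even, which makes it easy to adjust provider weights dynamically.
    *
    * 1. Get the number of invokers and accumulate their weights while iterating
    * 2. From the second invoker on, compare the current weight with the previous one; if any pair differs, the weights are not all equal, i.e. sameWeight=false
    * 3. If totalWeight>0 and sameWeight=false, draw a random number within the total weight and subtract the weights one by one to determine the node to call
    * 4. If sameWeight=true, pick uniformly at random
    *
    * e.g. given four cluster nodes A, B, C, D with weights 1, 2, 3, 4, a request reaches A with probability 1/(1+2+3+4) = 10%; B, C, D follow at 20%, 30%, 40%.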
    */
    public class RandomLoadBalance extends AbstractLoadBalance {
    
       public static final String NAME = "random";
    
       private final Random random = new Random();
    
       @Override
       protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
           int length = invokers.size(); // Number of invokers
           int totalWeight = 0; // The sum of weights
           boolean sameWeight = true; // Every invoker has the same weight?
           for (int i = 0; i < length; i++) {
               int weight = getWeight(invokers.get(i), invocation);
               totalWeight += weight; // Sum
               if (sameWeight && i > 0 && weight != getWeight(invokers.get(i - 1), invocation)) {
                   sameWeight = false; // Found two different weights
               }
           }
    
           // e.g. the total weight is 10 (1+2+3+4). To pick by weight, draw a random integer in [0, 10), say 2, then subtract the weights in order: 2 (random number) - 1 (A's weight) = 1, then 1 - 2 (B's weight) = -1. Since -1 < 0, B is called; the other cases work the same way.
           if (totalWeight > 0 && !sameWeight) {
               // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
               int offset = random.nextInt(totalWeight);
               // Return a invoker based on the random value.
               for (int i = 0; i < length; i++) {
                   offset -= getWeight(invokers.get(i), invocation);
                   if (offset < 0) {
                       return invokers.get(i);
                   }
               }
           }
           // If all invokers have the same weight value or totalWeight=0, return evenly.
           return invokers.get(random.nextInt(length));
       }
    }
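    The subtraction walk from the comments can be checked in isolation by fixing the random offset. WeightedPickSketch is a hypothetical helper: it takes the offset as a parameter instead of drawing it with Random, so every outcome is reproducible.

```java
// Hypothetical helper isolating the "subtract weights until negative" step.
final class WeightedPickSketch {

    /** Returns the index whose weight segment contains offset, for offset in [0, totalWeight). */
    static int pick(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be in [0, totalWeight)");
    }
}
```

    With weights 1, 2, 3, 4 for A, B, C, D, the ten possible offsets 0 to 9 map to A once, B twice, C three times and D four times, which is where the 10%, 20%, 30%, 40% figures come from.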
    

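    The algorithm is spelled out in the comments, so it is not repeated here. The other three load-balancing strategies work along similar lines; briefly: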

    • RoundRobinLoadBalance

    Round robin: the rotation ratio follows the weights after they are reduced by their common divisor.

       @Override
       protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
           String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
           int length = invokers.size(); // Number of invokers
           int maxWeight = 0; // The maximum weight
           int minWeight = Integer.MAX_VALUE; // The minimum weight
           final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
           int weightSum = 0;
           for (int i = 0; i < length; i++) {
               int weight = getWeight(invokers.get(i), invocation);
               maxWeight = Math.max(maxWeight, weight); // Choose the maximum weight
               minWeight = Math.min(minWeight, weight); // Choose the minimum weight
               if (weight > 0) {
                   invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                   weightSum += weight;
               }
           }
           AtomicPositiveInteger sequence = sequences.get(key);
           if (sequence == null) {
               sequences.putIfAbsent(key, new AtomicPositiveInteger());
               sequence = sequences.get(key);
           }
           int currentSequence = sequence.getAndIncrement();
           if (maxWeight > 0 && minWeight < maxWeight) {  // The weights are not all the same
               int mod = currentSequence % weightSum;
               for (int i = 0; i < maxWeight; i++) {
                   for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                       final Invoker<T> k = each.getKey();
                       final IntegerWrapper v = each.getValue();
                       if (mod == 0 && v.getValue() > 0) {
                           return k;
                       }
                       if (v.getValue() > 0) {
                           v.decrement();
                           mod--;
                       }
                   }
               }
           }
           // Round robin: take the sequence modulo the number of invokers
           return invokers.get(currentSequence % length);
       }
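    To see what sequence of picks the nested loop above produces, the same walk can be replayed on plain arrays. WeightedRoundRobinSketch is a hypothetical helper: it takes the call sequence number as a parameter instead of an AtomicPositiveInteger, and assumes a non-empty weight array and a non-negative sequence.

```java
// Hypothetical replay of the weighted round-robin walk on plain arrays.
final class WeightedRoundRobinSketch {

    static int pick(int[] weights, int sequence) {
        int maxWeight = 0;
        int minWeight = Integer.MAX_VALUE;
        int weightSum = 0;
        for (int w : weights) {
            maxWeight = Math.max(maxWeight, w);
            minWeight = Math.min(minWeight, w);
            if (w > 0) {
                weightSum += w;
            }
        }
        if (maxWeight > 0 && minWeight < maxWeight) { // weights differ
            int[] remaining = weights.clone();
            int mod = sequence % weightSum;
            for (int round = 0; round < maxWeight; round++) {
                for (int i = 0; i < remaining.length; i++) {
                    if (mod == 0 && remaining[i] > 0) {
                        return i;
                    }
                    if (remaining[i] > 0) {
                        remaining[i]--;
                        mod--;
                    }
                }
            }
        }
        return sequence % weights.length; // equal weights: plain round robin
    }
}
```

    For weights 1, 2, 3 (A, B, C), sequence numbers 0 through 5 yield A, B, C, B, C, C: each provider is picked in proportion to its weight within one cycle of weightSum calls.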
    
    • LeastActiveLoadBalance

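    Least active: picks the provider with the fewest active calls, choosing randomly among providers with the same active count. The active count is the difference between the number of calls started and the number completed.
    Slower providers receive fewer requests, because the slower a provider is, the larger that difference becomes.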

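    A concrete example:
    When node A accepts a request its counter goes up by 1, and when the request finishes it goes down by 1; node B works the same way. By this logic, a node with requests still in flight has a higher count than an idle node, so the next request goes to the node with the lowest count. In practice, the slower a service processes requests, the more easily its count climbs, so sending later requests to the service with the lower count works out better.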
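    The first half of this strategy, finding the providers that share the smallest active count, can be sketched on a plain array of counts. LeastActiveSketch is a hypothetical helper; choosing among the returned candidates by weight or at random is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: indexes of the providers with the lowest active count.
final class LeastActiveSketch {

    static List<Integer> leastActiveIndexes(int[] active) {
        List<Integer> least = new ArrayList<>();
        int leastActive = -1; // -1 means "nothing seen yet"
        for (int i = 0; i < active.length; i++) {
            if (leastActive == -1 || active[i] < leastActive) {
                leastActive = active[i]; // found a smaller count: start over
                least.clear();
                least.add(i);
            } else if (active[i] == leastActive) {
                least.add(i);            // tie: remember this provider as well
            }
        }
        return least;
    }
}
```

    With active counts 3, 1, 1, 5 the candidates are the providers at indexes 1 and 2; only those two take part in the weighted or uniform random pick that follows.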

       @Override
       protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
           int length = invokers.size(); // Number of invokers
           int leastActive = -1; // The least active value of all invokers
           int leastCount = 0; // The number of invokers having the same least active value (leastActive)
           int[] leastIndexs = new int[length]; // The index of invokers having the same least active value (leastActive); collected here for the later pick
           int totalWeight = 0; // The sum of weights
           int firstWeight = 0; // Initial value, used for comparison
           boolean sameWeight = true; // Every invoker has the same weight value?
           for (int i = 0; i < length; i++) {
               Invoker<T> invoker = invokers.get(i);
               int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Active number
               int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // Weight
               if (leastActive == -1 || active < leastActive) { // Restart, when find a invoker having smaller least active value.
                   leastActive = active; // Record the current least active value
                   leastCount = 1; // Reset leastCount, count again based on current leastCount
                   leastIndexs[0] = i; // Reset
                   totalWeight = weight; // Reset
                   firstWeight = weight; // Record the weight the first invoker
                   sameWeight = true; // Reset, every invoker has the same weight value?
               } else if (active == leastActive) { // If current invoker's active value equals with leaseActive, then accumulating.
                   leastIndexs[leastCount++] = i; // Record index number of this invoker
                   totalWeight += weight; // Add this invoker's weight to totalWeight.
                   // If every invoker has the same weight?
                   if (sameWeight && i > 0
                           && weight != firstWeight) {
                       sameWeight = false;
                   }
               }
           }
           // assert(leastCount > 0)
           if (leastCount == 1) {
               // If we got exactly one invoker having the least active value, return this invoker directly.
               return invokers.get(leastIndexs[0]);
           }
           if (!sameWeight && totalWeight > 0) {
               // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
               int offsetWeight = random.nextInt(totalWeight);
               // Return an invoker based on the random value.
               for (int i = 0; i < leastCount; i++) {
                   int leastIndex = leastIndexs[i];
                   offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                   if (offsetWeight <= 0)
                       return invokers.get(leastIndex);
               }
           }
           // If all invokers have the same weight value or totalWeight=0, return evenly.
           return invokers.get(leastIndexs[random.nextInt(leastCount)]);
       }
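To make the counting idea concrete, here is a minimal sketch of least-active selection without weights. The `LeastActiveSketch` and `Provider` names are hypothetical and stand in for Dubbo's `RpcStatus` bookkeeping; this is a simplified model, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model; not Dubbo's RpcStatus or LeastActiveLoadBalance.
class LeastActiveSketch {

    /** A provider with a counter of in-flight calls (the "active" number). */
    static final class Provider {
        final String name;
        final AtomicInteger active = new AtomicInteger();

        Provider(String name) {
            this.name = name;
        }

        void begin() { active.incrementAndGet(); } // a call has started
        void end() { active.decrementAndGet(); }   // a call has finished
    }

    /** Picks a provider with the fewest in-flight calls; ties are broken at random.
     *  Assumes the list is not empty. */
    static Provider select(List<Provider> providers, Random random) {
        int least = Integer.MAX_VALUE;
        List<Provider> candidates = new ArrayList<>();
        for (Provider p : providers) {
            int active = p.active.get();
            if (active < least) {          // smaller count found: start over
                least = active;
                candidates.clear();
                candidates.add(p);
            } else if (active == least) {  // same count: keep as a candidate
                candidates.add(p);
            }
        }
        return candidates.get(random.nextInt(candidates.size()));
    }

    public static void main(String[] args) {
        Provider fast = new Provider("fast");
        Provider slow = new Provider("slow");

        slow.begin(); // the slow provider still has one unfinished call
        fast.begin();
        fast.end();   // the fast provider has already finished its call

        System.out.println(select(List.of(fast, slow), new Random()).name); // prints "fast"
    }
}
```

The real method above additionally weights the random choice among the tied invokers, which this sketch leaves out.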
    
    • ConsistentHashLoadBalance

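    Consistent hashing: requests with the same parameters always go to the same provider. When a provider goes down, the requests that were destined for it are spread across the remaining providers by means of virtual nodes, so the overall distribution does not change drastically.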

       protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
           String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
           int identityHashCode = System.identityHashCode(invokers);
           ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
           if (selector == null || selector.identityHashCode != identityHashCode) {
               selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
               selector = (ConsistentHashSelector<T>) selectors.get(key);
           }
           return selector.select(invocation);
       }
    

    The underlying algorithm:
    http://en.wikipedia.org/wiki/Consistent_hashing
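The selector class itself is not shown above, so the following is only a sketch of the general technique: a hash ring with virtual nodes kept in a `TreeMap`. The class name, the `node#i` virtual-node naming, the replica count, and the way the MD5 digest is folded into a number are all assumptions made for illustration, not Dubbo's `ConsistentHashSelector`.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a consistent-hash ring; not Dubbo's ConsistentHashSelector.
class ConsistentHashSketch {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    /** Places `replicas` virtual nodes on the ring for every real node. Assumes nodes is not empty. */
    ConsistentHashSketch(List<String> nodes, int replicas) {
        for (String node : nodes) {
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(node + "#" + i), node);
            }
        }
    }

    /** Returns the node owning the first virtual node at or after the key's hash. */
    String select(String key) {
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(key));
        if (entry == null) {
            entry = ring.firstEntry(); // past the last virtual node: wrap around the ring
        }
        return entry.getValue();
    }

    /** Folds the first four bytes of an MD5 digest into a non-negative 32-bit value. */
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            return ((long) (d[3] & 0xFF) << 24)
                    | ((long) (d[2] & 0xFF) << 16)
                    | ((long) (d[1] & 0xFF) << 8)
                    | (d[0] & 0xFF);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ConsistentHashSketch full = new ConsistentHashSketch(List.of("A", "B", "C"), 160);
        ConsistentHashSketch withoutC = new ConsistentHashSketch(List.of("A", "B"), 160);
        int moved = 0;
        for (int i = 0; i < 1000; i++) {
            String key = "user-" + i;
            if (!full.select(key).equals(withoutC.select(key))) {
                moved++;
            }
        }
        // Only keys that were owned by C should have moved.
        System.out.println("keys that moved after removing C: " + moved);
    }
}
```

In this sketch, removing a node only removes its own virtual nodes, so every key that was owned by a surviving node keeps its owner, which is the "no drastic change" property described above.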


    3.Router
    • Which server a request is routed to is decided by the router. First, the main interface:
       /**
    * Router. (SPI, Prototype, ThreadSafe)
    * <p>
    * <a href="http://en.wikipedia.org/wiki/Routing">Routing</a>
    *
    * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
    * @see com.alibaba.dubbo.rpc.cluster.Directory#list(Invocation)
    */
    public interface Router extends Comparable<Router> {
    
           /**
            * get the router url.
            *
            * @return url
            */
           URL getUrl();
       
           /**
            * route.
            *
            * @param invokers
            * @param url        refer url
            * @param invocation
            * @return routed invokers
            * @throws RpcException
            */
           <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
    }
    
    The core method, route, is already visible here. As before, let's step through it in the debugger:
    

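    OK, this is the core of routing. The directory's list method reached in the debugger does two things:
    - 1. RegistryDirectory's doList(invocation) picks out all available invokers that match the invocation's parameters;
    - 2. The routing rules then filter the invokers picked from the directory; for example, MockInvokersSelector filters out all mock invokers.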


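    The filtered invokers are then returned, which completes the routing step. That is the overall routing flow; next, a look at a few routing strategies: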
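The two steps can be sketched as a plain filter chain. Invokers are modelled as URL strings and each router as a `UnaryOperator` over the candidate list; both are hypothetical simplifications of `Invoker` and `Router#route`, and the addresses are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

// Hypothetical sketch of "list candidates, then apply each router in turn".
class RouterChainSketch {

    static List<String> list(List<String> registered, List<UnaryOperator<List<String>>> routers) {
        // Step 1: take the currently available invokers (stands in for doList).
        List<String> invokers = new ArrayList<>(registered);
        // Step 2: let every router narrow the list down further.
        for (UnaryOperator<List<String>> router : routers) {
            invokers = router.apply(invokers);
        }
        return invokers;
    }

    public static void main(String[] args) {
        // A router that drops mock invokers, loosely analogous to MockInvokersSelector.
        UnaryOperator<List<String>> dropMock = in -> in.stream()
                .filter(u -> !u.startsWith("mock://"))
                .collect(Collectors.toList());
        // A second, made-up rule that keeps only providers on port 20880.
        UnaryOperator<List<String>> onlyPort20880 = in -> in.stream()
                .filter(u -> u.endsWith(":20880"))
                .collect(Collectors.toList());

        List<String> routed = list(
                List.of("mock://10.0.0.1:20880", "dubbo://10.0.0.2:20880", "dubbo://10.0.0.3:20881"),
                List.of(dropMock, onlyPort20880));
        System.out.println(routed); // prints [dubbo://10.0.0.2:20880]
    }
}
```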

    • ScriptRouter

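      Script routing rules support every script language supported by the JDK script engine, such as javascript, jruby, and groovy. The script type is set with the type=javascript parameter and defaults to javascript.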

      @Override
      @SuppressWarnings("unchecked")
      public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
          try {
              List<Invoker<T>> invokersCopy = new ArrayList<Invoker<T>>(invokers);
              Compilable compilable = (Compilable) engine;
              Bindings bindings = engine.createBindings();
              bindings.put("invokers", invokersCopy);
              bindings.put("invocation", invocation);
              bindings.put("context", RpcContext.getContext());
              CompiledScript function = compilable.compile(rule);
              Object obj = function.eval(bindings);
              if (obj instanceof Invoker[]) {
                  invokersCopy = Arrays.asList((Invoker<T>[]) obj);
              } else if (obj instanceof Object[]) {
                  invokersCopy = new ArrayList<Invoker<T>>();
                  for (Object inv : (Object[]) obj) {
                      invokersCopy.add((Invoker<T>) inv);
                  }
              } else {
                  invokersCopy = (List<Invoker<T>>) obj;
              }
              return invokersCopy;
          } catch (ScriptException e) {
              //fail then ignore rule .invokers.
              logger.error("route error , rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e);
              return invokers;
          }
      }
      
    • ConditionRouter

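    Condition routing: filters invokers according to the routing rules configured in the dubbo admin console. A rule change there triggers the notify method of RegistryDirectory in real time, telling the consumer to rebuild its local invokers.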

    ```
    @Override
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
        if (invokers == null || invokers.isEmpty()) {
            return invokers;
        }
        try {
            if (!matchWhen(url, invocation)) {
                return invokers;
            }
            List<Invoker<T>> result = new ArrayList<Invoker<T>>();
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                return result;
            }
            for (Invoker<T> invoker : invokers) {
                if (matchThen(invoker.getUrl(), url)) {
                    result.add(invoker);
                }
            }
            if (!result.isEmpty()) {
                return result;
            } else if (force) {
                logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
                return result;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
        }
        return invokers;
    }
    ```
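The branches of route are easier to follow in a stripped-down form. In the sketch below the when part is a predicate on the consumer host and the then part a predicate on each provider host; these types, the class name, and the addresses are assumptions for illustration, not the real matchWhen/matchThen logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the when/then/force branches; not Dubbo's ConditionRouter.
class ConditionRouteSketch {

    static List<String> route(List<String> providerHosts, String consumerHost,
                              Predicate<String> when, Predicate<String> then, boolean force) {
        if (!when.test(consumerHost)) {
            return providerHosts;          // the rule does not apply to this consumer
        }
        List<String> result = new ArrayList<>();
        if (then == null) {
            return result;                 // empty then part: the consumer is blacklisted
        }
        for (String host : providerHosts) {
            if (then.test(host)) {
                result.add(host);
            }
        }
        if (!result.isEmpty() || force) {
            return result;                 // with force, an empty result is returned as-is
        }
        return providerHosts;              // without force, fall back to all providers
    }

    public static void main(String[] args) {
        List<String> providers = List.of("10.0.0.1", "10.0.0.2");
        Predicate<String> when = host -> host.equals("10.0.0.9");

        // The rule matches the consumer and one provider.
        System.out.println(route(providers, "10.0.0.9", when, h -> h.equals("10.0.0.2"), false));
        // The rule matches the consumer but no provider; without force everything is kept.
        System.out.println(route(providers, "10.0.0.9", when, h -> h.equals("10.0.0.7"), false));
        // The same rule with force returns the empty list.
        System.out.println(route(providers, "10.0.0.9", when, h -> h.equals("10.0.0.7"), true));
    }
}
```

The force flag therefore decides whether a rule that matches no provider leaves the consumer with nothing to call or silently falls back to the unfiltered list.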
    

    OK, that is about it for the routing analysis.


    4.Directory
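    • This was already analyzed in the consumer chapter, so here is just the official description:

      A Directory represents multiple Invokers and can be thought of as a List<Invoker>. Unlike a List, however, its contents may change dynamically, for example when the registry pushes a change.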

       /**
    * Directory. (SPI, Prototype, ThreadSafe)
    * <p>
    * <a href="http://en.wikipedia.org/wiki/Directory_service">Directory Service</a>
    *
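    * A Directory represents multiple Invokers and can be thought of as a List<Invoker>. Unlike a List, its contents may change dynamically, for example when the registry pushes a change.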
    *
    * @see com.alibaba.dubbo.rpc.cluster.Cluster#join(Directory)
    */
    public interface Directory<T> extends Node {
    
           /**
            * get service type.
            *
            * @return service type.
            */
           Class<T> getInterface();
       
           /**
            * list invokers.
            *
            * @return invokers
            */
           List<Invoker<T>> list(Invocation invocation) throws RpcException;
    }
    

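    The core logic of the list method here was already covered in the Router analysis, so it is not repeated.
    Directory can maintain the invoker list dynamically from the registry because the relevant Listener, once notified, updates caches such as methodInvokerMap and urlInvokerMap; the list method then reads the latest invokers from them. Revisiting the earlier flow makes this clear.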
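The notify-then-list mechanism can be sketched as a snapshot that is swapped on every notification. The class and method names are hypothetical, invokers are modelled as URL strings, and a single volatile list stands in for caches such as methodInvokerMap and urlInvokerMap; the real RegistryDirectory is considerably more involved.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch of a directory whose contents follow registry notifications.
class DynamicDirectorySketch {
    // Readers always see a complete snapshot because the reference is replaced atomically.
    private volatile List<String> invokers = Collections.emptyList();

    /** Called by a (hypothetical) registry listener whenever the provider list changes. */
    void notifyChanged(List<String> providerUrls) {
        invokers = Collections.unmodifiableList(new ArrayList<>(providerUrls));
    }

    /** Returns the most recent snapshot of invokers. */
    List<String> list() {
        return invokers;
    }

    public static void main(String[] args) {
        DynamicDirectorySketch directory = new DynamicDirectorySketch();
        directory.notifyChanged(List.of("dubbo://10.0.0.1:20880", "dubbo://10.0.0.2:20880"));
        System.out.println(directory.list().size()); // prints 2

        directory.notifyChanged(List.of("dubbo://10.0.0.2:20880")); // one provider went offline
        System.out.println(directory.list());        // prints [dubbo://10.0.0.2:20880]
    }
}
```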

    • StaticDirectory

    The invokers are passed in through the constructor, so this Directory's invokers never change dynamically; it has few use cases.

    public StaticDirectory(List<Invoker<T>> invokers) {
           this(null, invokers, null);
       }
    
       public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers) {
           this(null, invokers, routers);
       }
    
       public StaticDirectory(URL url, List<Invoker<T>> invokers) {
           this(url, invokers, null);
       }
    
       public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
           super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
           if (invokers == null || invokers.isEmpty())
               throw new IllegalArgumentException("invokers == null");
           this.invokers = invokers;
       }
    
    • RegistryDirectory

    Dynamically maintains the invoker list according to the changes pushed by the registry.

    That covers the main modules of the cluster layer.


        Article title: 4 - Dubbo source code analysis: cluster design

        Article link: https://www.haomeiwen.com/subject/ldbdwftx.html