Reading the HDFS Balancer Source Code

Author: 此间少年仍犹在 | Published 2018-07-13 16:20
    1. The Dispatcher.init() method

     The init method first requests DataNode storage reports from the NameNode:

        // Request DataNode reports from the NameNode; this calls the corresponding ClientProtocol method
        final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
        final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>(); 
        // create network topology and classify utilization collections:
        // over-utilized, above-average, below-average and under-utilized.
        for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
          final DatanodeInfo datanode = r.getDatanodeInfo();
          if (shouldIgnore(datanode)) {
            continue;
          }
          trimmed.add(r);
          // cluster is a NetworkTopology instance
          cluster.add(datanode);
        }
        return trimmed;
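DFSUtil.shuffle randomizes the order of the storage reports so that the source/target matching done later does not always favor the same DataNodes. A minimal stand-in for that step (a sketch using Collections.shuffle; DFSUtil.shuffle is an in-place shuffle over the array) might look like:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch of the report-shuffling step; hypothetical stand-in for DFSUtil.shuffle. */
public class ShuffleReports {
  /** Return a randomly permuted copy of the report identifiers. */
  static List<String> shuffled(List<String> reports) {
    List<String> copy = new ArrayList<>(reports);
    Collections.shuffle(copy);  // in-place random permutation of the copy
    return copy;
  }

  public static void main(String[] args) {
    // Same elements, random order on each run
    System.out.println(shuffled(List.of("dn1", "dn2", "dn3", "dn4")));
  }
}
```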
    
    2. The Dispatcher.shouldIgnore() method

     Decommissioning means removing a DataNode from the HDFS cluster. includedNodes and excludedNodes come from two options of the balancer command: you can restrict balancing to specific DataNodes, or exclude specific DataNodes from balancing. The options are:

    [-exclude [-f <hosts-file> | <comma-separated list of hosts>]] Excludes the specified datanodes.
    [-include [-f <hosts-file> | <comma-separated list of hosts>]] Includes only the specified datanodes.

      private boolean shouldIgnore(DatanodeInfo dn) {
        // ignore decommissioned nodes
        final boolean decommissioned = dn.isDecommissioned();
        // ignore decommissioning nodes
        final boolean decommissioning = dn.isDecommissionInProgress();
        // ignore nodes in exclude list
        final boolean excluded = Util.isExcluded(excludedNodes, dn);
        // ignore nodes not in the include list (if include list is not empty)
        final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
    
        if (decommissioned || decommissioning || excluded || notIncluded) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
                + decommissioning + ", " + excluded + ", " + notIncluded);
          }
          return true;
        }
        return false;
      }
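The one subtlety above is the include-list semantics: an empty include list means "include everything", so a node is only skipped for inclusion reasons when the list is non-empty and does not contain it. A stripped-down sketch of this filtering logic (a hypothetical helper keyed by hostname, not the real Util class):

```java
import java.util.Set;

/** Simplified sketch of the Balancer's node filtering; hypothetical helper. */
public class NodeFilter {
  /**
   * Mirrors shouldIgnore(): skip decommissioned/decommissioning nodes,
   * nodes on the exclude list, and nodes missing from a non-empty include list.
   */
  static boolean shouldIgnore(String host, boolean decommissioned,
      boolean decommissioning, Set<String> excluded, Set<String> included) {
    if (decommissioned || decommissioning) {
      return true;
    }
    if (excluded.contains(host)) {
      return true;
    }
    // An empty include list means "include everything"
    return !included.isEmpty() && !included.contains(host);
  }

  public static void main(String[] args) {
    Set<String> excluded = Set.of("dn3");
    Set<String> included = Set.of();  // empty: no include restriction
    System.out.println(shouldIgnore("dn1", false, false, excluded, included)); // false
    System.out.println(shouldIgnore("dn3", false, false, excluded, included)); // true
  }
}
```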
    
    3. The Balancer.init() method

    This method is covered in more detail at https://www.jianshu.com/p/f7c1cd476601, which is worth reading alongside this post.
     The policy field is a BalancingPolicy instance, i.e. the balancing strategy, which can also be specified when running the balancer command. There are two policies: datanode level and blockpool level; the blockpool policy applies only to HDFS Federation. The option is used as follows:

    [-policy <policy>] the balancing policy: datanode or blockpool

     Next, based on the DataNode information obtained, the method builds the network topology and computes the cluster's average storage utilization.

      /**
       * Given a datanode storage set, build a network topology and decide
       * over-utilized storages, above average utilized storages, 
       * below average utilized storages, and underutilized storages. 
       * The input datanode storage set is shuffled in order to randomize
       * to the storage matching later on.
       *
       * @return the number of bytes needed to move in order to balance the cluster.
       */
      private long init(List<DatanodeStorageReport> reports) {
        // compute average utilization
        for (DatanodeStorageReport r : reports) {
          policy.accumulateSpaces(r);
        }
        policy.initAvgUtilization();
    
        // create network topology and classify utilization collections: 
        //   over-utilized, above-average, below-average and under-utilized.
        long overLoadedBytes = 0L, underLoadedBytes = 0L;
        for(DatanodeStorageReport r : reports) {
          final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
          for(StorageType t : StorageType.getMovableTypes()) {
            final Double utilization = policy.getUtilization(r, t);
            if (utilization == null) { // datanode does not have such storage type 
              continue;
            }
            
            final long capacity = getCapacity(r, t);
            // difference between this DataNode's utilization and the cluster average
            final double utilizationDiff = utilization - policy.getAvgUtilization(t);
            // difference between |utilizationDiff| and the threshold
            final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
            final long maxSize2Move = computeMaxSize2Move(capacity,
                getRemaining(r, t), utilizationDiff, threshold);
    
            final StorageGroup g;
            if (utilizationDiff > 0) {
              final Source s = dn.addSource(t, maxSize2Move, dispatcher);
              if (thresholdDiff <= 0) { // within threshold
                aboveAvgUtilized.add(s);
              } else {
                overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
                overUtilized.add(s);
              }
              g = s;
            } else {
              g = dn.addTarget(t, maxSize2Move);
              if (thresholdDiff <= 0) { // within threshold
                belowAvgUtilized.add(g);
              } else {
                underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
                underUtilized.add(g);
              }
            }
            dispatcher.getStorageGroupMap().put(g);
          }
        }
    
        logUtilizationCollections();
        
        Preconditions.checkState(dispatcher.getStorageGroupMap().size()
            == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
               + belowAvgUtilized.size(),
            "Mismatched number of storage groups");
        
        // return number of bytes to be moved in order to make the cluster balanced
        return Math.max(overLoadedBytes, underLoadedBytes);
      }
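The heart of this method is the four-way classification: each storage is placed into over-utilized, above-average, below-average, or under-utilized depending on how far its utilization sits from the cluster average relative to the threshold. A stripped-down sketch of just that decision (a hypothetical helper; the real code additionally works per StorageType, computes maxSize2Move, and accumulates the over/under-loaded byte counts):

```java
/** Sketch of the four-way classification in Balancer.init(); hypothetical helper. */
public class UtilizationClassifier {
  enum Group { OVER_UTILIZED, ABOVE_AVG, BELOW_AVG, UNDER_UTILIZED }

  /**
   * Classify one storage by comparing its utilization (percent) against the
   * cluster average; the threshold defines the "balanced" band around the average.
   */
  static Group classify(double utilization, double avg, double threshold) {
    final double utilizationDiff = utilization - avg;
    final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
    if (utilizationDiff > 0) {
      // busier than average: a potential source of block moves
      return thresholdDiff <= 0 ? Group.ABOVE_AVG : Group.OVER_UTILIZED;
    }
    // at or below average: a potential target of block moves
    return thresholdDiff <= 0 ? Group.BELOW_AVG : Group.UNDER_UTILIZED;
  }

  public static void main(String[] args) {
    // cluster average 50% used, default threshold 10%
    System.out.println(classify(75, 50, 10)); // OVER_UTILIZED
    System.out.println(classify(55, 50, 10)); // ABOVE_AVG
    System.out.println(classify(45, 50, 10)); // BELOW_AVG
    System.out.println(classify(20, 50, 10)); // UNDER_UTILIZED
  }
}
```

With the default threshold of 10, only nodes more than 10 percentage points away from the average contribute to overLoadedBytes/underLoadedBytes, which is why a well-balanced cluster yields a return value of 0 and the balancer exits.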
    
    4. BalancingPolicy.Node
        void accumulateSpaces(DatanodeStorageReport r) {
          for(StorageReport s : r.getStorageReports()) {
            final StorageType t = s.getStorage().getStorageType();
            // total capacity of this DataNode storage
            totalCapacities.add(t, s.getCapacity());
            // DFS-used space of this DataNode storage
            totalUsedSpaces.add(t, s.getDfsUsed());
          }
        }
    
      void initAvgUtilization() {
        for(StorageType t : StorageType.asList()) {
          final long capacity = totalCapacities.get(t);
          if (capacity > 0L) {
            // compute the average storage utilization as a percentage
            final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
            avgUtilizations.set(t, avg);
          }
        }
      }
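Taken together, the two methods above compute a capacity-weighted average: total DFS-used bytes over total capacity, per storage type, expressed as a percentage. A simplified sketch for a single storage type (a hypothetical helper that ignores the per-StorageType maps):

```java
/** Sketch of the average-utilization math in BalancingPolicy; hypothetical helper. */
public class AvgUtilization {
  /**
   * Capacity-weighted average utilization in percent, matching the formula
   * totalUsedSpaces * 100.0 / totalCapacities used by initAvgUtilization().
   */
  static double avgUtilization(long[] capacities, long[] dfsUsed) {
    long totalCapacity = 0, totalUsed = 0;
    for (int i = 0; i < capacities.length; i++) {
      totalCapacity += capacities[i];  // accumulate, as accumulateSpaces() does
      totalUsed += dfsUsed[i];
    }
    return totalUsed * 100.0 / totalCapacity;
  }

  public static void main(String[] args) {
    // three storages: 80% used, 20% used, 50% used, with unequal capacities
    long[] cap = {100L, 100L, 200L};
    long[] used = {80L, 20L, 100L};
    System.out.println(avgUtilization(cap, used)); // 50.0
  }
}
```

Note that the average is weighted by capacity, not a mean of per-node percentages: a large, half-full DataNode pulls the average more than a small, full one.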
    


Original article: https://www.haomeiwen.com/subject/huyjpftx.html