HDFS Balancer Strategy Explained

Author: 伍柒大人的三言两语 | Published 2018-07-01 17:21

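    [Preface] On large Hadoop clusters that have been running in production for a long time, disk usage often becomes unevenly distributed across datanodes, especially after nodes are added or decommissioned, or after replica counts are changed by hand. Uneven usage forces compute engines to copy data across nodes frequently (a task running on node A needs data stored on another node), which costs extra time and bandwidth. In addition, when some nodes are nearly full (around 90% used), tasks scheduled on them risk failing. A balancing strategy that evens out disk usage across the cluster is therefore essential.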

    I. The balancer command in detail

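    hdfs --config /hadoop-client/conf balancer
    -threshold  10                    # balancing condition: allowed deviation (in percent) of a datanode's disk utilization from the cluster average; range 0~100
    -policy datanode                  # balancing policy; the default, datanode, balances at the datanode level
    -exclude  -f  /tmp/ip1.txt        # empty by default; the listed IPs do not take part in balancing; -f means the input is a file
    -include  -f  /tmp/ip2.txt        # empty by default; only the listed IPs take part in balancing; -f means the input is a file
    -idleiterations  5                # maximum number of consecutive idle iterations (no block moved) before the balancer exits; default 5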
    

    Bandwidth used for moving data between datanodes during balancing (set in /hadoop-client/conf/hdfs-site.xml; changes require an HDFS restart):

    <property>
        <name>dfs.datanode.balance.bandwidthPerSec</name>
        <value>6250000</value>
    </property>
    <Note: 6250000 / (1024 * 1024) ≈ 5.96, i.e. about 6 MB/s>
    

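    Raising the bandwidth at runtime (no restart needed; run it as the hdfs user; do not set it too high, or it will take bandwidth away from MapReduce jobs):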

    hdfs dfsadmin -fs hdfs://${active-namenode-hostname}:8020 -setBalancerBandwidth 104857600
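
    As a quick check of the two bandwidth values used above, here is a minimal Java sketch that converts a bytes-per-second setting into MiB/s and Mbit/s. The class and method names are illustrative only and are not part of Hadoop.

```java
/** Illustrative helper (not part of Hadoop) for reading balancer bandwidth values. */
class BandwidthUnits {
  /** Bytes per second to MiB per second (1 MiB = 1024 * 1024 bytes). */
  static double toMiBPerSec(long bytesPerSec) {
    return bytesPerSec / (1024.0 * 1024.0);
  }

  /** Bytes per second to megabits per second (1 Mbit = 1,000,000 bits). */
  static double toMbitPerSec(long bytesPerSec) {
    return bytesPerSec * 8 / 1_000_000.0;
  }

  public static void main(String[] args) {
    long configured = 6_250_000L;   // value from the hdfs-site.xml snippet
    long raised = 104_857_600L;     // value passed to -setBalancerBandwidth
    System.out.printf("%d B/s = %.2f MiB/s = %.0f Mbit/s%n",
        configured, toMiBPerSec(configured), toMbitPerSec(configured));
    System.out.printf("%d B/s = %.2f MiB/s%n", raised, toMiBPerSec(raised));
  }
}
```

    Seen this way, 6250000 bytes/s is exactly 50 Mbit/s, and 104857600 bytes/s is exactly 100 MiB/s.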
    

    The balancer script exits automatically as soon as any one of the following conditions is met:

     * The cluster is balanced;
     * No block can be moved;
     * No block has been moved for specified consecutive iterations (5 by default);
     * An IOException occurs while communicating with the namenode;
     * Another balancer is running.
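
    The third condition is easy to misread, so here is a simplified Java sketch of how a "no block moved for N consecutive iterations" rule can behave. It is not the Balancer's actual loop; the method `iterationsUntilExit` and the array of bytes moved per iteration are hypothetical inputs made up for illustration.

```java
/** Simplified model of the idle-iteration exit rule (not Hadoop code). */
class IdleIterationDemo {
  /**
   * Returns how many iterations run before the idle rule stops the loop,
   * or the array length if the rule never triggers.
   * bytesMovedPerIteration[i] is the hypothetical amount moved in iteration i.
   */
  static int iterationsUntilExit(long[] bytesMovedPerIteration, int maxIdleIterations) {
    int idle = 0;   // consecutive iterations without any moved bytes
    int ran = 0;    // iterations executed so far
    for (long moved : bytesMovedPerIteration) {
      ran++;
      if (moved > 0) {
        idle = 0;                               // progress resets the idle counter
      } else if (++idle >= maxIdleIterations) {
        break;                                  // too many idle iterations in a row
      }
    }
    return ran;
  }

  public static void main(String[] args) {
    long[] moved = {10, 0, 0, 5, 0, 0, 0, 0, 0, 7};
    System.out.println("iterations run: " + iterationsUntilExit(moved, 5));
  }
}
```

    In this model the counter only limits idle iterations in a row; productive iterations reset it, so the total number of iterations is not bounded by the setting.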
    

    II. Source code walkthrough

    Source package: org.apache.hadoop.hdfs.server.balancer

    Selecting the datanodes that take part in balancing:

      private boolean shouldIgnore(DatanodeInfo dn) {
        // ignore decommissioned nodes
        final boolean decommissioned = dn.isDecommissioned();
        // ignore nodes that are currently being decommissioned
        final boolean decommissioning = dn.isDecommissionInProgress();
        // ignore nodes in the exclude list (set with -exclude)
        final boolean excluded = Util.isExcluded(excludedNodes, dn);
        // ignore nodes not in the include list (set with -include),
        // if the include list is not empty
        final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
    
        if (decommissioned || decommissioning || excluded || notIncluded) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("Excluding datanode " + dn
                + ": decommissioned=" + decommissioned
                + ", decommissioning=" + decommissioning
                + ", excluded=" + excluded
                + ", notIncluded=" + notIncluded);
          }
          return true;
        }
        return false;
      }
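
    The same filtering rule can be restated over plain strings, which makes it easy to try out. This is a sketch only: `NodeFilter` and its `State` enum are hypothetical stand-ins for `DatanodeInfo` and `Util`, and it assumes that an empty include list places no restriction, as the comment in the original method says.

```java
import java.util.Collections;
import java.util.Set;

/** Stand-alone restatement of the shouldIgnore rule (hypothetical types, not Hadoop's). */
class NodeFilter {
  enum State { NORMAL, DECOMMISSION_IN_PROGRESS, DECOMMISSIONED }

  static boolean shouldIgnore(String host, State state,
      Set<String> excluded, Set<String> included) {
    boolean decommissioned = state == State.DECOMMISSIONED;
    boolean decommissioning = state == State.DECOMMISSION_IN_PROGRESS;
    boolean isExcluded = excluded.contains(host);
    // Assumption: an empty include list means every node is allowed.
    boolean notIncluded = !included.isEmpty() && !included.contains(host);
    return decommissioned || decommissioning || isExcluded || notIncluded;
  }

  public static void main(String[] args) {
    Set<String> excluded = Collections.singleton("10.0.0.3");
    Set<String> included = Collections.emptySet();
    System.out.println(shouldIgnore("10.0.0.1", State.NORMAL, excluded, included));
    System.out.println(shouldIgnore("10.0.0.3", State.NORMAL, excluded, included));
    System.out.println(shouldIgnore("10.0.0.2", State.DECOMMISSIONED, excluded, included));
  }
}
```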
    

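    Cluster average utilization (formula): average = totalUsedSpaces * 100 / totalCapacities
    totalUsedSpaces: the sum of used space across all datanodes (dfsUsed only, excluding non-DFS used);
    totalCapacities: the sum of total capacity across all datanodes (the disk directories configured for each DataNode);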

      void initAvgUtilization() {
        for(StorageType t : StorageType.asList()) {
          final long capacity = totalCapacities.get(t);
          if (capacity > 0L) {
            final double avg  = totalUsedSpaces.get(t)*100.0/capacity;
            avgUtilizations.set(t, avg);
          }
        }
      }
    

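    Utilization of a single datanode: utilization = dfsUsed * 100.0 / capacity
    dfsUsed: space used by DFS on this datanode (dfsUsed only, excluding non-DFS used);
    capacity: total capacity of this datanode (the disk directories configured for the DataNode);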

        Double getUtilization(DatanodeStorageReport r, final StorageType t) {
          long capacity = 0L;
          long dfsUsed = 0L;
          for(StorageReport s : r.getStorageReports()) {
            if (s.getStorage().getStorageType() == t) {
              capacity += s.getCapacity();
              dfsUsed += s.getDfsUsed();
            }
          }
          return capacity == 0L? null: dfsUsed*100.0/capacity;
        }
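
    A small worked example of both formulas, using made-up numbers. The class `UtilizationDemo` is hypothetical and treats each datanode as having a single storage type.

```java
/** Worked example of the utilization formulas (illustrative numbers, not Hadoop code). */
class UtilizationDemo {
  /** Per-node utilization in percent: dfsUsed * 100 / capacity. */
  static double utilization(long dfsUsed, long capacity) {
    return dfsUsed * 100.0 / capacity;
  }

  /** Cluster average in percent: sum(dfsUsed) * 100 / sum(capacity). */
  static double average(long[] dfsUsed, long[] capacity) {
    long used = 0;
    long cap = 0;
    for (int i = 0; i < dfsUsed.length; i++) {
      used += dfsUsed[i];
      cap += capacity[i];
    }
    return used * 100.0 / cap;
  }

  public static void main(String[] args) {
    long[] capacity = {100, 100, 200};   // e.g. in TB
    long[] dfsUsed = {90, 50, 40};
    for (int i = 0; i < capacity.length; i++) {
      System.out.println("node " + i + " utilization = " + utilization(dfsUsed[i], capacity[i]));
    }
    System.out.println("cluster average = " + average(dfsUsed, capacity));
  }
}
```

    With these numbers the nodes sit at 90%, 50% and 20%, while the cluster average is 45%: the average is weighted by capacity, so it is not the plain mean of the per-node percentages.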
    

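    Difference between a datanode's utilization and the cluster average: utilizationDiff = utilization - average
    Difference between a datanode's |utilizationDiff| and the threshold: thresholdDiff = |utilizationDiff| - threshold

    Space that needs to move out or can move in (utilizationDiff is a percentage, hence the division by 100): maxSize2Move = |utilizationDiff| * capacity / 100

    Space that can move in: Math.min(remaining, maxSize2Move)
    Space that needs to move out: Math.min(max, maxSize2Move)
    remaining: free space left on the datanode
    max: the cap on how much a single datanode may move in one balancer iteration (10 GB by default)
    Note that -idleiterations (default 5) limits consecutive iterations in which no block is moved, not the total number of iterations, so a single run of the balancer script is not limited to 5 iterations. A datanode moves at most 10 GB per iteration, for example at most 5 * 10 GB = 50 GB over five iterations.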

        for(DatanodeStorageReport r : reports) {
          final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
          final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
          for(StorageType t : StorageType.getMovableTypes()) {
            final Double utilization = policy.getUtilization(r, t);
            if (utilization == null) { // datanode does not have such storage type 
              continue;
            }
            
            final double average = policy.getAvgUtilization(t);
            if (utilization >= average && !isSource) {
              LOG.info(dn + "[" + t + "] has utilization=" + utilization
                  + " >= average=" + average
                  + " but it is not specified as a source; skipping it.");
              continue;
            }
    
            final double utilizationDiff = utilization - average;
            final long capacity = getCapacity(r, t);
            final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
            final long maxSize2Move = computeMaxSize2Move(capacity,
                getRemaining(r, t), utilizationDiff, maxSizeToMove);
    
            final StorageGroup g;
            if (utilizationDiff > 0) {
              final Source s = dn.addSource(t, maxSize2Move, dispatcher);
              if (thresholdDiff <= 0) { // within threshold
                aboveAvgUtilized.add(s);
              } else {
                overLoadedBytes += percentage2bytes(thresholdDiff, capacity);
                overUtilized.add(s);
              }
              g = s;
            } else {
              g = dn.addTarget(t, maxSize2Move);
              if (thresholdDiff <= 0) { // within threshold
                belowAvgUtilized.add(g);
              } else {
                underLoadedBytes += percentage2bytes(thresholdDiff, capacity);
                underUtilized.add(g);
              }
            }
            dispatcher.getStorageGroupMap().put(g);
          }
        }
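
    The snippet calls `computeMaxSize2Move` and `percentage2bytes` without showing them. The following is a sketch of what they might look like, reconstructed from the formulas in this article rather than copied from Hadoop; in particular, applying the per-iteration cap `max` to targets as well as sources is an assumption.

```java
/** Sketch of the size computation, reconstructed from the formulas (not copied from Hadoop). */
class MaxSizeToMoveDemo {
  static final long GB = 1L << 30;

  /** Converts a percentage of a capacity into bytes. */
  static long percentage2bytes(double percentage, long capacity) {
    return (long) (percentage * capacity / 100.0);
  }

  static long computeMaxSize2Move(long capacity, long remaining,
      double utilizationDiff, long max) {
    long size = percentage2bytes(Math.abs(utilizationDiff), capacity);
    if (utilizationDiff < 0) {
      size = Math.min(remaining, size);   // a target cannot take more than its free space
    }
    return Math.min(max, size);           // assumed: the per-iteration cap applies to both sides
  }

  public static void main(String[] args) {
    long max = 10 * GB;
    // Source 0.5 points above average on a 1000 GB node: 5 GB to move out.
    System.out.println(computeMaxSize2Move(1000 * GB, 100 * GB, 0.5, max) / GB + " GB");
    // Source 20 points above average: 200 GB wanted, capped at 10 GB per iteration.
    System.out.println(computeMaxSize2Move(1000 * GB, 100 * GB, 20.0, max) / GB + " GB");
    // Target 20 points below average with only 3 GB free: limited to 3 GB.
    System.out.println(computeMaxSize2Move(1000 * GB, 3 * GB, -20.0, max) / GB + " GB");
  }
}
```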
    

    Queues that hold the datanodes after the comparison:

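    overUtilized: utilizationDiff > 0 && thresholdDiff > 0         <utilization above the average, difference greater than the threshold>
    aboveAvgUtilized: utilizationDiff > 0 && thresholdDiff <= 0    <utilization above the average, difference less than or equal to the threshold>
    belowAvgUtilized: utilizationDiff <= 0 && thresholdDiff <= 0   <utilization at or below the average, difference less than or equal to the threshold>
    underUtilized: utilizationDiff <= 0 && thresholdDiff > 0       <utilization below the average, difference greater than the threshold>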
    

    Nodes that need to move data out (Source type): overUtilized, aboveAvgUtilized
    Nodes that can take data in (Target type): underUtilized, belowAvgUtilized
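
    The four queues can be condensed into one small function. This is a sketch that mirrors the branch structure of the Balancer snippet; the enum `Group` and the method `classify` are names invented here for illustration.

```java
/** Classifies a datanode by its distance from the cluster average (illustrative names). */
class UtilizationClassifier {
  enum Group { OVER_UTILIZED, ABOVE_AVG_UTILIZED, BELOW_AVG_UTILIZED, UNDER_UTILIZED }

  /** All three arguments are percentages in the range 0..100. */
  static Group classify(double utilization, double average, double threshold) {
    double utilizationDiff = utilization - average;
    double thresholdDiff = Math.abs(utilizationDiff) - threshold;
    if (utilizationDiff > 0) {
      // Above the average: a source of data.
      return thresholdDiff <= 0 ? Group.ABOVE_AVG_UTILIZED : Group.OVER_UTILIZED;
    }
    // At or below the average: a target for data.
    return thresholdDiff <= 0 ? Group.BELOW_AVG_UTILIZED : Group.UNDER_UTILIZED;
  }

  public static void main(String[] args) {
    double average = 45.0;
    double threshold = 10.0;
    for (double u : new double[] {90.0, 50.0, 40.0, 20.0}) {
      System.out.println(u + "% -> " + classify(u, average, threshold));
    }
  }
}
```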

    Pairing nodes for data movement (principles: 1. prefer the same rack, then other racks; 2. one-to-many pairing):
    Step 1 [Source -> Target]: each overUtilized datanode => one or more underUtilized datanodes
    Step 2 [Source -> Target]: each remaining overUtilized datanode => one or more belowAvgUtilized datanodes
    Step 3 [Target -> Source]: each remaining underUtilized datanode (not fully matched with overUtilized nodes in step 1) => one or more aboveAvgUtilized datanodes

      /** Decide all <source, target> pairs according to the matcher. */
      private void chooseStorageGroups(final Matcher matcher) {
        /* first step: match each overUtilized datanode (source) to
         * one or more underUtilized datanodes (targets).
         */
        LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => underUtilized");
        chooseStorageGroups(overUtilized, underUtilized, matcher);
        
        /* match each remaining overutilized datanode (source) to 
         * below average utilized datanodes (targets).
         * Note only overutilized datanodes that haven't had that max bytes to move
         * satisfied in step 1 are selected
         */
        LOG.info("chooseStorageGroups for " + matcher + ": overUtilized => belowAvgUtilized");
        chooseStorageGroups(overUtilized, belowAvgUtilized, matcher);
    
        /* match each remaining underutilized datanode (target) to 
         * above average utilized datanodes (source).
         * Note only underutilized datanodes that have not had that max bytes to
         * move satisfied in step 1 are selected.
         */
        LOG.info("chooseStorageGroups for " + matcher + ": underUtilized => aboveAvgUtilized");
        chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher);
      }
    

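    When each <source, target> pair is built, the amount of space that can currently be moved out or moved in has to be computed.
    The dispatcher creates the dispatchExecutor thread pool to schedule and run the data movement.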

      private void matchSourceWithTargetToMove(Source source, StorageGroup target) {
        long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove());
        final Task task = new Task(target, size);
        source.addTask(task);
        target.incScheduledSize(task.getSize());
        dispatcher.add(source, target);
        LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from "
            + source.getDisplayName() + " to " + target.getDisplayName());
      }
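
    To see how the one-to-many pairing and the `Math.min` sizing interact, here is a deliberately simplified Java sketch. It ignores rack preference, and `PairingDemo`, `Group` and `pair` are hypothetical names rather than Hadoop classes; each node is reduced to a name and the number of bytes it may still move.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Simplified one-to-many pairing of sources and targets (not Hadoop code). */
class PairingDemo {
  /** Hypothetical stand-in for a Source or a target StorageGroup. */
  static final class Group {
    final String name;
    long available;   // bytes that may still move out of (source) or into (target) this node

    Group(String name, long available) {
      this.name = name;
      this.available = available;
    }
  }

  /** Pairs each source with one or more targets; returns "source->target:size" entries. */
  static List<String> pair(List<Group> sources, List<Group> targets) {
    List<String> plan = new ArrayList<>();
    for (Group s : sources) {
      for (Group t : targets) {
        if (s.available == 0) {
          break;                  // this source is fully scheduled
        }
        if (t.available == 0) {
          continue;               // this target is already full
        }
        long size = Math.min(s.available, t.available);
        s.available -= size;
        t.available -= size;
        plan.add(s.name + "->" + t.name + ":" + size);
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    List<Group> overUtilized = Arrays.asList(new Group("A", 10), new Group("B", 4));
    List<Group> underUtilized = Arrays.asList(new Group("X", 6), new Group("Y", 8));
    System.out.println(pair(overUtilized, underUtilized));
  }
}
```

    Calling `pair` three times, with (overUtilized, underUtilized), then (overUtilized, belowAvgUtilized), then (aboveAvgUtilized, underUtilized), approximates the three steps; nodes that were fully scheduled in an earlier call have nothing left to move and are skipped automatically.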
    

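    [Conclusion]
    1. On large HDFS clusters (where servers may be added or decommissioned at any time), the balancer script should run as a long-lived background process;
    2. Following the official recommendation, deploy the script on a relatively idle server;
    3. The script is stopped by killing its process (preferably do not kill it: a background run stops by itself when it finishes, and if it is launched several times only one balancer instance runs at a time while the others fail automatically);

    For datanode storage maintenance, the following directions are worth optimizing:
    * Raise the -idleiterations parameter so that the balancer tolerates more idle iterations before exiting and can keep moving data for longer;
    * Use the exclude and include parameters to choose sensibly which servers take part in balancing, for example only the 20% least-utilized and the 20% most-utilized nodes;
    * Use the threshold parameter to set a sensible threshold;
    <Note: ideally a program would discover and adjust these parameters automatically, with no manual intervention>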
    

    Blog homepage: https://www.jianshu.com/u/e97bb429f278

    Article link: https://www.haomeiwen.com/subject/cwxzyftx.html