BlockManager Source Code Analysis (2): redundancyThread

Author: 小北觅 | Published 2020-08-21 15:10

Picking up where the previous post left off: BlockManager Source Code Analysis (1): PendingReconstructionMonitor.

This post studies the redundancyThread.

First, look at the type of redundancyThread: it is a Daemon, and the Runnable passed to it is a RedundancyMonitor.
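For reference, the declaration in BlockManager looks roughly like this (paraphrased from the Hadoop 3.x source; exact modifiers may differ between versions):

  /** Redundancy thread: a Daemon wrapping the RedundancyMonitor runnable. */
  final Daemon redundancyThread = new Daemon(new RedundancyMonitor());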

So let's go straight into RedundancyMonitor's run() method:
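Below is a simplified sketch of RedundancyMonitor.run(), paraphrased from the Hadoop 3.x source with most of the shutdown/interrupt handling trimmed; names such as redundancyRecheckIntervalMs follow that version and may differ elsewhere:

  private class RedundancyMonitor implements Runnable {
    @Override
    public void run() {
      while (namesystem.isRunning()) {
        try {
          // Process reconstruction work only when the active NN is out of safe mode.
          if (isPopulatingReplQueues()) {
            computeDatanodeWork();
            processPendingReconstructions();
            rescanPostponedMisreplicatedBlocks();
          }
          Thread.sleep(redundancyRecheckIntervalMs);
        } catch (Throwable t) {
          // The real code distinguishes normal shutdown, test interruption and fatal errors.
          if (!namesystem.isRunning()) {
            break;
          }
          LOG.error("RedundancyMonitor thread received Runtime exception.", t);
          terminate(1, t);
        }
      }
    }
  }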

In the previous post, BlockManager Source Code Analysis (1), we already walked through the high-level logic of this method, mainly working backwards from computeDatanodeWork. Here we look at the other two calls: processPendingReconstructions and rescanPostponedMisreplicatedBlocks.

1. processPendingReconstructions()

This method handles blocks whose reconstruction is pending. Let's step into it:
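A simplified sketch of the method, paraphrased from the Hadoop 3.x source (details may vary slightly by version):

  /**
   * If there were any reconstruction requests that timed out, reap them
   * and put them back into the neededReconstruction queue.
   */
  void processPendingReconstructions() {
    BlockInfo[] timedOutItems = pendingReconstruction.getTimedOutBlocks();
    if (timedOutItems != null) {
      namesystem.writeLock();
      try {
        for (BlockInfo timedOutItem : timedOutItems) {
          // Use the BlockInfo from blocksMap so that we work with the most
          // up-to-date block information (e.g. generation stamp).
          BlockInfo bi = blocksMap.getStoredBlock(timedOutItem);
          if (bi == null) {
            continue;
          }
          NumberReplicas num = countNodes(timedOutItem);
          if (isNeededReconstruction(bi, num)) {
            neededReconstruction.add(bi, num.liveReplicas(),
                num.readOnlyReplicas(), num.outOfServiceReplicas(),
                getExpectedRedundancyNum(bi));
          }
        }
      } finally {
        namesystem.writeUnlock();
      }
    }
  }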

The method's comment says: if any reconstruction requests have timed out, put them back into the neededReconstruction queue. So the blocks placed in timedOutItems (see the previous post) are picked up here by the RedundancyMonitor thread. The logic is easy to follow: iterate over timedOutItems, look up the corresponding BlockInfo in blocksMap, check whether reconstruction is needed, and if so add the reconstruction information to neededReconstruction.

2. rescanPostponedMisreplicatedBlocks()

Now for the second method. Its name, rescanPostponedMisreplicatedBlocks, means re-scanning mis-replicated blocks whose handling was previously postponed. The method works heavily with the postponedMisreplicatedBlocks queue, which exists for the following reason: when the NameNode fails over, over-replicated blocks cannot be processed until every DataNode holding a replica of that block has sent a block report to the new active NameNode. This prevents accidental data loss. This scenario is illustrated by a figure in 《Hadoop2.x HDFS源码深度剖析》.

The code is as follows:

 
/**
   * Rescan the list of blocks which were previously postponed.
   */
  void rescanPostponedMisreplicatedBlocks() {
    if (getPostponedMisreplicatedBlocksCount() == 0) {
      return;
    }
    namesystem.writeLock();
    long startTime = Time.monotonicNow();
    long startSize = postponedMisreplicatedBlocks.size();
    try {
      Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
      for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
        Block b = it.next();
        it.remove();

        BlockInfo bi = getStoredBlock(b);
        if (bi == null) {
          LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
              "Postponed mis-replicated block {} no longer found " +
              "in block map.", b);
          continue;
        }
        // Core call: process the mis-replicated block.
        MisReplicationResult res = processMisReplicatedBlock(bi);
        LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
            "Re-scanned block {}, result is {}", b, res);
        // If the result is POSTPONE, i.e. the block still cannot be handled now,
        // add it to rescannedMisreplicatedBlocks.
        if (res == MisReplicationResult.POSTPONE) {
          rescannedMisreplicatedBlocks.add(b);
        }
      }
    } finally {
      // Put the still-unhandled blocks in rescannedMisreplicatedBlocks back into
      // postponedMisreplicatedBlocks so they get retried in the next cycle.
      postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
      rescannedMisreplicatedBlocks.clear();
      long endSize = postponedMisreplicatedBlocks.size();
      namesystem.writeUnlock();
      LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
          " msecs. {} blocks are left. {} blocks were removed.",
          (Time.monotonicNow() - startTime), endSize, (startSize - endSize));
    }
  }

As you can see, the key call in this method is processMisReplicatedBlock, which handles each block taken from postponedMisreplicatedBlocks. So let's go into processMisReplicatedBlock:


 /**
   * Process a single possibly misreplicated block. This adds it to the
   * appropriate queues if necessary, and returns a result code indicating
   * what happened with it.
   */
  // As the Javadoc says: process a possibly mis-replicated block, add it to the
  // appropriate queue if necessary, and return a result code describing what happened.
  private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
    // The block has been deleted: add it to invalidateBlocks so its replicas get removed.
    if (block.isDeleted()) {
      // block does not belong to any file
      addToInvalidates(block);
      return MisReplicationResult.INVALID;
    }
    // The block is not complete yet: nothing to do, just return the status.
    if (!block.isComplete()) {
      // Incomplete blocks are never considered mis-replicated --
      // they'll be reached when they are completed or recovered.
      return MisReplicationResult.UNDER_CONSTRUCTION;
    }
    // calculate current redundancy
    // Compute the expected redundancy and the current number of live replicas.
    // If reconstruction is needed, add the block to the neededReconstruction
    // queue and return UNDER_REPLICATED.
    short expectedRedundancy = getExpectedRedundancyNum(block);
    NumberReplicas num = countNodes(block);
    final int numCurrentReplica = num.liveReplicas();
    // add to low redundancy queue if need to be
    if (isNeededReconstruction(block, num)) {
      if (neededReconstruction.add(block, numCurrentReplica,
          num.readOnlyReplicas(), num.outOfServiceReplicas(),
          expectedRedundancy)) {
        return MisReplicationResult.UNDER_REPLICATED;
      }
    }
    // Check whether the block has more replicas than the configured expected number; if so, handle the excess.
    if (shouldProcessExtraRedundancy(num, expectedRedundancy)) {
      // If any replica is on a stale node, we cannot act yet, to avoid losing data.
      // Returning POSTPONE makes the caller put the block back into the queue for the next rescan.
      if (num.replicasOnStaleNodes() > 0) {
        // If any of the replicas of this block are on nodes that are
        // considered "stale", then these replicas may in fact have
        // already been deleted. So, we cannot safely act on the
        // over-replication until a later point in time, when
        // the "stale" nodes have block reported.
        return MisReplicationResult.POSTPONE;
      }
      
      // extra redundancy block
      // Handle the block that has more replicas than configured.
      processExtraRedundancyBlock(block, expectedRedundancy, null, null);
      return MisReplicationResult.OVER_REPLICATED;
    }
    
    return MisReplicationResult.OK;
  }

The overall logic is not complicated. Next, let's focus on the method that handles blocks with more replicas than configured: processExtraRedundancyBlock.

The code is as follows:

 
/**
   * Find how many of the containing nodes are "extra", if any.
   * If there are any extras, call chooseExcessRedundancies() to
   * mark them in the excessRedundancyMap.
   */
   // Find how many of the nodes are "extra"; if there are any, call
   // chooseExcessRedundancies() to put them into the excessRedundancyMap.
  private void processExtraRedundancyBlock(final BlockInfo block,
      final short replication, final DatanodeDescriptor addedNode,
      DatanodeDescriptor delNodeHint) {
    assert namesystem.hasWriteLock();
    if (addedNode == delNodeHint) {
      delNodeHint = null;
    }
    Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
    Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
        .getNodes(block);
    for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
      if (storage.getState() != State.NORMAL) {
        continue;
      }
      final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
      if (storage.areBlockContentsStale()) {
        LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
            + " since storage {} does not yet have up-to-date information.",
            block, storage);
        postponeBlock(block);
        return;
      }
      if (!isExcess(cur, block)) {
        // This replica is not yet marked as excess on this node; if the node is
        // in service and the replica is not corrupt, collect it as a candidate.
        if (cur.isInService()) {
          // exclude corrupt replicas
          if (corruptNodes == null || !corruptNodes.contains(cur)) {
            nonExcess.add(storage);
          }
        }
      }
    }
    // Core call: choose which excess replicas to remove.
    chooseExcessRedundancies(nonExcess, block, replication, addedNode,
        delNodeHint);
  }

This method loops over the block's DatanodeStorageInfo entries, skipping any storage that is not in the NORMAL state. If a storage's block contents are stale, the block is postponed (added back to postponedMisreplicatedBlocks) and the method returns immediately. Replicas that are not already marked as excess, live on in-service nodes, and are not corrupt are collected into nonExcess. Finally, chooseExcessRedundancies is called.

Step into chooseExcessRedundancies; note that the last two arguments passed in here are both null:
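Roughly, the method looks like this (paraphrased from the Hadoop 3.x source; exact helper names may vary by version):

  private void chooseExcessRedundancies(
      final Collection<DatanodeStorageInfo> nonExcess,
      BlockInfo storedBlock, short replication,
      DatanodeDescriptor addedNode,
      DatanodeDescriptor delNodeHint) {
    assert namesystem.hasWriteLock();
    BlockCollection bc = getBlockCollection(storedBlock);
    if (storedBlock.isStriped()) {
      // Erasure-coded (striped) blocks take a separate path.
      chooseExcessRedundancyStriped(bc, nonExcess, storedBlock, delNodeHint);
    } else {
      // For contiguous blocks, ask the storage policy which storage types
      // hold more replicas than needed, then pick concrete replicas to drop.
      final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(
          bc.getStoragePolicyID());
      final List<StorageType> excessTypes = storagePolicy.chooseExcess(
          replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
      chooseExcessRedundancyContiguous(nonExcess, storedBlock, replication,
          addedNode, delNodeHint, excessTypes);
    }
  }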

Note: BlockStoragePolicy describes the storage policy used to place a block's replicas. StorageType represents a DataNode storage type, such as SSD, DISK, or RAM_DISK.

The purpose of chooseExcessRedundancies is to pick the excess replicas (by storage type) to remove. The if and else branches handle erasure-coded (striped) blocks and ordinary contiguous blocks respectively; here we focus on ordinary blocks, i.e. the else branch.

Based on the block's storage policy, it computes the storage types from which replicas can be removed (the excessTypes variable), and finally calls chooseExcessRedundancyContiguous.

Now look at chooseExcessRedundancyContiguous. Honestly, I don't yet fully understand the replica-deletion strategy or its Javadoc; leaving this as another hole to fill later.


 /**
   * We want sufficient redundancy for the block, but we now have too many.
   * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
   *
   * srcNodes.size() - dstNodes.size() == replication
   *
   * We pick node that make sure that replicas are spread across racks and
   * also try hard to pick one with least free space.
   * The algorithm is first to pick a node with least free space from nodes
   * that are on a rack holding more than one replicas of the block.
   * So removing such a replica won't remove a rack. 
   * If no such a node is available,
   * then pick a node with least free space
   */
  private void chooseExcessRedundancyContiguous(
      final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
      short replication, DatanodeDescriptor addedNode,
      DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
    BlockPlacementPolicy replicator = placementPolicies.getPolicy(CONTIGUOUS);
    // Use the block placement policy to find the replicas to delete.
    List<DatanodeStorageInfo> replicasToDelete = replicator
        .chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
            addedNode, delNodeHint);
    for (DatanodeStorageInfo chosenReplica : replicasToDelete) {
      // Handle each chosen excess replica (essentially, add it to the relevant queues).
      processChosenExcessRedundancy(nonExcess, chosenReplica, storedBlock);
    }
  }

Next, look at processChosenExcessRedundancy:

  private void processChosenExcessRedundancy(
      final Collection<DatanodeStorageInfo> nonExcess,
      final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
    nonExcess.remove(chosen);
    // Add the chosen replica to the excess redundancy map.
    excessRedundancyMap.add(chosen.getDatanodeDescriptor(), storedBlock);
    //
    // The 'excessblocks' tracks blocks until we get confirmation
    // that the datanode has deleted them; the only way we remove them
    // is when we get a "removeBlock" message.
    //
    // The 'invalidate' list is used to inform the datanode the block
    // should be deleted.  Items are removed from the invalidate list
    // upon giving instructions to the datanodes.
    //
    final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
    // Add to invalidateBlocks; the datanode will be told to delete the replica.
    addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
    blockLog.debug("BLOCK* chooseExcessRedundancies: "
        + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
  }

That wraps up the study of redundancyThread.
One more hole to fill later: the strategy for choosing which excess replica to delete. A rough search online turned up the gist of the policy:
https://zhuanlan.zhihu.com/p/114632147
When a block is over-replicated, the NameNode picks a replica to delete. It prefers not to reduce the number of racks hosting replicas, and then prefers to delete a replica from the DataNode with the least free disk space. The goal is to balance storage utilization across DataNodes without reducing the block's availability.

The NameNode also makes sure that not all replicas of a block sit on the same rack. If it detects this, it treats the block as under-replicated and copies it to a different rack using the same placement policy. Once it is notified that the new replica has been created, the block becomes over-replicated, and the NameNode then deletes one of the old replicas, since the policy prefers not to reduce the rack count.
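To make the heuristic concrete, here is an illustrative sketch. It is not the actual BlockPlacementPolicyDefault.chooseReplicasToDelete implementation; pickReplicaToDelete is a hypothetical helper that only shows the selection idea: among racks that hold more than one replica, remove the replica on the node with the least free space (so no rack is lost); if no such rack exists, fall back to the node with the least free space overall.

  // Illustration only -- not the real Hadoop implementation.
  static DatanodeStorageInfo pickReplicaToDelete(
      Map<String, List<DatanodeStorageInfo>> rackToReplicas) {
    DatanodeStorageInfo candidate = null;  // best choice that keeps every rack covered
    DatanodeStorageInfo fallback = null;   // least free space overall
    for (List<DatanodeStorageInfo> storagesOnRack : rackToReplicas.values()) {
      for (DatanodeStorageInfo storage : storagesOnRack) {
        if (storagesOnRack.size() > 1 && (candidate == null
            || storage.getRemaining() < candidate.getRemaining())) {
          candidate = storage;   // removing it does not lose a rack
        }
        if (fallback == null
            || storage.getRemaining() < fallback.getRemaining()) {
          fallback = storage;
        }
      }
    }
    return candidate != null ? candidate : fallback;
  }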

END!
Enough for one day; I'm beat.
