美文网首页
BlockManager源码分析(1)--PendingReco

BlockManager源码分析(1)--PendingReco

作者: 小北觅 | 来源:发表于2020-08-19 20:02 被阅读0次

    基于hadoop-3.1.0
    分享一波大数据&Java的学习视频和书籍:
    ### Java与大数据资源分享

    BlockManager类是Namenode端管理数据块的非常重要的类。本文就来对它进行剖析。

    首先看BlockManager的构造方法被谁调用:

    进到FSNamesystem的构造方法中。然后在FSNS类中找调用成员变量blockManager的地方。发现只有activate有激活启动的意思,于是去activate方法:

    点进去到BlockManager#activate方法:

    果不其然,activate方法里启动了各种线程,以及注册metric指标等工作。
    从其中调用的方法来看,启动了pendingReconstruction,redundancyThread,storageInfoDefragmenterThread,blockReportThread,注册metric指标,设置blockManager的safemode状态等。

    那我们接下来的分析任务就是这几个线程。本篇文章首先分析第一个,pendingReconstruction。其他的挖坑后面学习。
    pendingReconstruction是PendingReconstructionBlocks类的对象。

    看一下PendingReconstructionBlocks的javadoc:

    这个类主要是对数据块进行一些记账工作。类似于Block可能存放在那个Datanode上这种。

    进到PendingReconstructionBlocks类的start方法:

    其实最终调用的是PendingReconstructionMonitor的run方法,点进去看:

    PendingReconstructionMonitor是一个周期性的线程,用来扫描没完成重建数据块请求的数据块。

    run方法中调用了pendingReconstructionCheck()方法:

    pendingReconstructionCheck方法的逻辑比较简单,就是去检测pendingReconstructions这个数据结构里保存的所有数据复制请求是不是超时(
    默认5分钟),如果超时了就放到timedOutItems这个ArrayList里。最后更新相关metric指标。

    这里我们遇到了两个数据结构:pendingReconstructions,timedOutItems。看一下这两个数据结构是什么样的:

    pendingReconstructions是Map结构,key是BlockInfo,value是PendingBlockInfo。

    timedOutItems是一个ArrayList,里面存放BlockInfo类型的数据。

    所以要搞懂这两个数据结构存储的内容是什么,就得去了解BlockInfo和PendingBlockInfo这两个类。这里简单概括:BlockInfo保存了一个block属于哪个文件INode(通过BlockCollection对象)和存储在那些Datanode里。PendingBlockInfo(数据块复制信息)是对数据块开始复制时间timeStamp、待复制的目标数据节点列表List<DatanodeDescriptor>实例targets的一个封装。

    了解完这两个数据结构的存储内容,我们就来看看什么情况下把数据放入这两个数据结构。

    1.pendingReconstructions

    首先看pendingReconstructions,因为是个Map结构,所以我们看看谁调用了put方法。

    点进去是increment方法:

    这个方法首先尝试从pendingReconstructions根据key获取PendingBlockInfo,如果没有,就调用put。如果发现了原来有这个复制请求,则更新targets和timestamp。

    再来看谁调用了increment方法:


    这里面第一个方法是增量块汇报相关的,我们以后再分析,所以我们看第二个方法validateReconstructionWork。讲这个方法之前说句题外话,我耐不住性子,一层一层往上点,最后发现调用逻辑是在BlockManage的内部类RedundancyMonitor的run方法里。所以说,块待复制请求和冗余监测线程有关系,比如冗余监测线程检测到某些数据块的副本数不满足最小副本条件,就会去调用相关方法添加块复制请求。

    好的,回过头来看validateReconstructionWork方法,这个方法主要的功能是:验证传入的复制任务参数中的数据块的副本数的状态,然后更新neededReconstruction结构和pendingReconstruction结构,也即把复制任务从neededReconstruction中删除或者添加到pendingReconstruction中。

    接着看谁调用了这个方法(validateReconstructionWork),点进来就一处地方:

    发现是computeReconstructionWorkForBlocks方法,这个方法注释写到:把一些数据块重现到full strength(满员)的状态,通过复制或者EC的方法。这个方法的参数是List<List<BlockInfo>>类型,最外层的List代表优先级的意思,因为数据块复制任务也是有优先级的,比如当前就1个副本的块很可能会丢失,所以针对这种块的复制任务它的优先级就要比当前有2个副本的块的复制任务优先级高。这个也留个坑,后续系列会讲解复制任务的优先级。

    逻辑也是非常简单:
    step1:根据优先级遍历需要生成复制任务的blocks,同时生成复制任务到reconWork这个List里。

    step2:为reconWork里的每个复制任务根据副本放置策略选择目标datanode。

    step3:就是调用我们上面提到的validateReconstructionWork方法验证复制任务。

    最后面的代码就是打印一些log信息。

    代码如下:

     /**
       * Reconstruct a set of blocks to full strength through replication or
       * erasure coding
       *
       * @param blocksToReconstruct blocks to be reconstructed, for each priority
       * @return the number of blocks scheduled for replication
       */
      @VisibleForTesting
      int computeReconstructionWorkForBlocks(
          List<List<BlockInfo>> blocksToReconstruct) {
        int scheduledWork = 0;
        List<BlockReconstructionWork> reconWork = new LinkedList<>();
    
        // Step 1: categorize at-risk blocks into replication and EC tasks
        namesystem.writeLock();
        try {
          synchronized (neededReconstruction) {
             //根据优先级遍历需要生成复制任务的blocks
            for (int priority = 0; priority < blocksToReconstruct
                .size(); priority++) {
              for (BlockInfo block : blocksToReconstruct.get(priority)) {
                BlockReconstructionWork rw = scheduleReconstruction(block,
                    priority);
                if (rw != null) {
                  reconWork.add(rw);
                }
              }
            }
          }
        } finally {
          namesystem.writeUnlock();
        }
    
        // Step 2: choose target nodes for each reconstruction task
        final Set<Node> excludedNodes = new HashSet<>();
        for(BlockReconstructionWork rw : reconWork){
          // Exclude all of the containing nodes from being targets.
          // This list includes decommissioning or corrupt nodes.
          excludedNodes.clear();
          for (DatanodeDescriptor dn : rw.getContainingNodes()) {
            excludedNodes.add(dn);
          }
    
          // choose replication targets: NOT HOLDING THE GLOBAL LOCK
          final BlockPlacementPolicy placementPolicy =
              placementPolicies.getPolicy(rw.getBlock().getBlockType());
          rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes);
        }
    
        // Step 3: add tasks to the DN
        namesystem.writeLock();
        try {
          for(BlockReconstructionWork rw : reconWork){
            final DatanodeStorageInfo[] targets = rw.getTargets();
            if(targets == null || targets.length == 0){
              rw.resetTargets();
              continue;
            }
    
            synchronized (neededReconstruction) {
              if (validateReconstructionWork(rw)) {
                scheduledWork++;
              }
            }
          }
        } finally {
          namesystem.writeUnlock();
        }
    
        if (blockLog.isDebugEnabled()) {
          // log which blocks have been scheduled for reconstruction
          for(BlockReconstructionWork rw : reconWork){
            DatanodeStorageInfo[] targets = rw.getTargets();
            if (targets != null && targets.length != 0) {
              StringBuilder targetList = new StringBuilder("datanode(s)");
              for (DatanodeStorageInfo target : targets) {
                targetList.append(' ');
                targetList.append(target.getDatanodeDescriptor());
              }
              blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(),
                  rw.getBlock(), targetList);
            }
          }
    
          blockLog.debug(
              "BLOCK* neededReconstruction = {} pendingReconstruction = {}",
              neededReconstruction.size(), pendingReconstruction.size());
        }
    
        return scheduledWork;
      }
    

    computeReconstructionWorkForBlocks这个方法又被computeBlockReconstructionWork调用。

    代码的逻辑是:根据传入的processToChoose参数(这个参数可配置),选择processToChoose个低冗余副本,传入computeReconstructionWorkForBlocks生成复制任务。

    这个方法又会被computeDatanodeWork调用。这个方法的功能是:计算数据块复制和数据块删除的work,并且在下次heartbeat的时候通知到datanode。代码如下:

    红线框起来的地方分别是计算复制块任务、计算删除块任务。最后返回生成的总任务数。

    这个方法最终最终最终最终被RedundancyMonitor线程类的run方法调用,终于来到最后了。

    run方法的主要工作是:周期性的(redundancyRecheckIntervalMs)调用红框里的二个方法。生成数据块复制任务、处理数据块复制任务、重新扫描之前被推迟处理的数据块。

    至此pendingReconstructions这个数据结构的这条线我们就分析完了,为了理解深刻,可以再反向看一遍,即从run方法看到最初的方法。

    2.timedOutItems

    接下来是第二个数据结构timedOutItems,主要用于存放数据块复制请求超时数据块,之所以设置这个结构,是因为肯定还有其他线程要把这里面的数据块再生成数据块复制的任务。(这里只是比喻,实际是把timedOutItems重新放入队列里)

    分析方法一样,先看谁调用了add方法。

    是pendingReconstructionCheck调用的。

    这个方法在上面介绍pendingReconstructions时分析过,当时我们没关注timeout,现在看一下,这里加了个timeout判断,如果数据块准备复制任务的时间戳+timeout < 当前时间。那么就说明超时了,就加入到timedOutItems中。

    回顾一下本篇文章留下的坑:

    • redundancyThread,storageInfoDefragmenterThread,blockReportThread
    • 数据块生成复制任务的优先级

    END!
    毁灭吧,赶紧的,累了。

    相关文章

      网友评论

          本文标题:BlockManager源码分析(1)--PendingReco

          本文链接:https://www.haomeiwen.com/subject/fhnrjktx.html