美文网首页
【HDFS】--增量块汇报[IBR](1)

【HDFS】--增量块汇报[IBR](1)

作者: 小北觅 | 来源:发表于2021-03-12 18:24 被阅读0次

    前言:
    这是增量块汇报的第一篇文章,不讲述增量块汇报的NameNode侧的处理逻辑。主要介绍Datanode侧的逻辑。

    我将带着如下问题去阅读源码:
    1)什么时候会触发增量块汇报?
    2)发送增量块汇报的处理逻辑是什么?
    3)增量块汇报的内容是什么?

    官方文档上有个参数:
    dfs.blockreport.incremental.intervalMsec,默认值是0。单位ms。
    这个参数的描述信息如下:

    If set to a positive integer, the value in ms to wait between sending incremental block reports from the Datanode to the Namenode.
    如果这个参数的值设置为一个正整数,那么就代表每次DN向NN发送增量块汇报时要中间间隔一段时间。

    我们从逆向出发,在代码中搜索这个配置项,发现这个配置项最后是用来构造
    org.apache.hadoop.hdfs.server.datanode.BPServiceActor#ibrManager这个成员变量了。

    ibrManager这个成员变量的类型是IncrementalBlockReportManager。顾名思义,是增量块汇报管理者,用来统一管理增量块汇报的相关操作。

    进到IncrementalBlockReportManager这个类,看一下都有什么方法,看方法名跟发送增量块汇报相关的方法我在图中圈出来了:

    sendIBRs代码如下:

      /** Send IBRs to namenode. */
      void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
          String bpid) throws IOException {
        // Generate a list of the pending reports for each storage under the lock
        final StorageReceivedDeletedBlocks[] reports = generateIBRs();
        if (reports.length == 0) {
          // Nothing new to report.
          return;
        }
    
        // Send incremental block reports to the Namenode outside the lock
        if (LOG.isDebugEnabled()) {
          LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
        }
        boolean success = false;
        final long startTime = monotonicNow();
        try {
          namenode.blockReceivedAndDeleted(registration, bpid, reports);
          success = true;
        } finally {
    
          if (success) {
            dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
            lastIBR = startTime;
          } else {
            // If we didn't succeed in sending the report, put all of the
            // blocks back onto our queue, but only in the case where we
            // didn't put something newer in the meantime.
            putMissing(reports);
          }
        }
      }
    

    关注一下putMissing方法:

    If we didn't succeed in sending the report, put all of the blocks back onto our queue, but only in the case where we didn't put something newer in the meantime.

    putMissing代码如下:

      //这个方法是个同步方法,会占用锁
      private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
        for (StorageReceivedDeletedBlocks r : reports) {
          pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
        }
        if (reports.length > 0) {
          readyToSend = true;
        }
      }
    

    putMissing主要就是把IBR根据Block放入PerStorageIBR#blocks这个Map类型的成员变量中。当然这里面有个判断,如果blocks中已经有了相应的块信息,那么就忽略掉。因为此时blocks中保存的对应块的IBR信息才是最新的。也就对应了上面putMissing方法注释中说明的情况。

    sendImmediately方法的代码如下,这个方法的作用是判断是否需要立即发送IBR。用到了readyToSend变量、ibrInterval变量、lastIBR变量。这三个变量看下面的注释也很好理解。

    
      /**
       * If this flag is set then an IBR will be sent immediately by the actor
       * thread without waiting for the IBR timer to elapse.
       */
      private volatile boolean readyToSend = false;
    
      /** The time interval between two IBRs. */
      private final long ibrInterval;
    
      /** The timestamp of the last IBR. */
      private volatile long lastIBR;
    
      boolean sendImmediately() {
        return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
      }
    

    triggerIBR代码如下,这个方法的功能是用来触发IBR。首先设置volatile变量readyToSend的值为true,然后根据传入的force参数判断是否强制触发,如果是强制触发,则更新lastIBR的值为monotonicNow() - ibrInterval,也就是说人为更改上一次IBR的时间用以超过配置的增量块汇报的间隔。然后根据上面提到的sendImmediately函数的返回值决定是否notify在等待的线程,这里抛出个问题后面解答,什么线程在等待呢?

      synchronized void triggerIBR(boolean force) {
        readyToSend = true;
        if (force) {
          lastIBR = monotonicNow() - ibrInterval;
        }
        if (sendImmediately()) {
          notifyAll();
        }
      }
    

    接下来解答上面的问题,什么线程wait了?
    刚才截图类的structure时,除了上面这个三个看起来像IBR的函数外, 还有一个waitTillNextIBR方法,继续顾名思义,wait直到下一次IBR。先来看一下这个方法的实现,然后再看一下这个方法的调用栈,分析一下是什么位置调用了这个函数。

      synchronized void waitTillNextIBR(long waitTime) {
        if (waitTime > 0 && !sendImmediately()) {
          try {
            wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
          } catch (InterruptedException ie) {
            LOG.warn(getClass().getSimpleName() + " interrupted");
          }
        }
      }
    

    wait函数里用了条件判断表达式,这里的ibrInterval我们按照默认值0来计算,所以直接取传入的waitTime参数的值。如果说传入waitTime大于0并且不需要立即发送IBR,那么线程就进行wait,时间为waitTime(ms)。

    接着看waitTillNextIBR的调用点,来到了org.apache.hadoop.hdfs.server.datanode.BPServiceActor#offerService方法里。这个方法是BPServiceActor类的run方法中调用的方法,只要DN运行,这个offerService方法就不断循环的执行。offerService方法中按顺序进行heartbeat、增量块汇报、全量块汇报操作。

    到这里基本上可以回答文章开头的两个问题了:

    DataNode的BPServiceActor线程在DN启动后会一直执行,不断循环的发送heartbeat、IBR(增量块汇报)、FBR(全量块汇报)。然后根据增量块汇报会更新各种记录时间的变量用来辅助调用IBR。比如强制触发IBR等。

    还有一个问题是:IBR中的内容是什么?回答这个问题需要看generateIBRs方法:

    可以看到,本质上发送的内容来自于pendingIBRs这个Map,此Map的key是DatanodeStorage代表了Datanode的一个Storage,可以理解为一个磁盘;此Map的value是PerStorageIBR,代表了每个Storage上的IBR。PerStorageIBR这个类我们前面遇到过,还得他的blocks变量么?就是用来保存新增的IBR和处理失败的IBR的那个blocks呀!

    好,本文完。下一篇将学习IBR在NN侧的处理过程。

    相关文章

      网友评论

          本文标题:【HDFS】--增量块汇报[IBR](1)

          本文链接:https://www.haomeiwen.com/subject/bcfzqltx.html