前言:
这是增量块汇报的第一篇文章,不讲述增量块汇报的NameNode侧的处理逻辑。主要介绍Datanode侧的逻辑。
我将带着如下问题去阅读源码:
1)什么时候会触发增量块汇报?
2)发送增量块汇报的处理逻辑是什么?
3)增量块汇报的内容是什么?
官方文档上有个参数:
dfs.blockreport.incremental.intervalMsec
,默认值是0。单位ms。
这个参数的描述信息如下:
If set to a positive integer, the value in ms to wait between sending incremental block reports from the Datanode to the Namenode.
如果这个参数的值设置为一个正整数,那么就代表每次DN向NN发送增量块汇报时要中间间隔一段时间。
我们从逆向出发,在代码中搜索这个配置项,发现这个配置项最后是用来构造
org.apache.hadoop.hdfs.server.datanode.BPServiceActor#ibrManager
这个成员变量了。
ibrManager这个成员变量的类型是IncrementalBlockReportManager。顾名思义,是增量块汇报管理者,用来统一管理增量块汇报的相关操作。
进到IncrementalBlockReportManager这个类,看一下都有什么方法,看方法名跟发送增量块汇报相关的方法我在图中圈出来了:

sendIBRs代码如下:
/** Send IBRs to namenode. */
void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
String bpid) throws IOException {
// Generate a list of the pending reports for each storage under the lock
final StorageReceivedDeletedBlocks[] reports = generateIBRs();
if (reports.length == 0) {
// Nothing new to report.
return;
}
// Send incremental block reports to the Namenode outside the lock
if (LOG.isDebugEnabled()) {
LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
}
boolean success = false;
final long startTime = monotonicNow();
try {
namenode.blockReceivedAndDeleted(registration, bpid, reports);
success = true;
} finally {
if (success) {
dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
lastIBR = startTime;
} else {
// If we didn't succeed in sending the report, put all of the
// blocks back onto our queue, but only in the case where we
// didn't put something newer in the meantime.
putMissing(reports);
}
}
}
关注一下putMissing方法:
If we didn't succeed in sending the report, put all of the blocks back onto our queue, but only in the case where we didn't put something newer in the meantime.
putMissing代码如下:
//这个方法是个同步方法,会占用锁
private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
for (StorageReceivedDeletedBlocks r : reports) {
pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
}
if (reports.length > 0) {
readyToSend = true;
}
}
putMissing主要就是把IBR根据Block放入PerStorageIBR#blocks这个Map类型的成员变量中。当然这里面有个判断,如果blocks中已经有了相应的块信息,那么就忽略掉。因为此时blocks中保存的对应块的IBR信息才是最新的。也就对应了上面putMissing方法注释中说明的情况。
sendImmediately方法的代码如下,这个方法的作用是判断是否需要立即发送IBR。用到了readyToSend变量、ibrInterval变量、lastIBR变量。这三个变量看下面的注释也很好理解。
/**
* If this flag is set then an IBR will be sent immediately by the actor
* thread without waiting for the IBR timer to elapse.
*/
private volatile boolean readyToSend = false;
/** The time interval between two IBRs. */
private final long ibrInterval;
/** The timestamp of the last IBR. */
private volatile long lastIBR;
boolean sendImmediately() {
return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
}
triggerIBR代码如下,这个方法的功能是用来触发IBR。首先设置volatile变量readyToSend的值为true,然后根据传入的force参数判断是否强制触发,如果是强制触发,则更新lastIBR的值为monotonicNow() - ibrInterval,也就是说人为更改上一次IBR的时间用以超过配置的增量块汇报的间隔。然后根据上面提到的sendImmediately函数的返回值决定是否notify在等待的线程,这里抛出个问题后面解答,什么线程在等待呢?
synchronized void triggerIBR(boolean force) {
readyToSend = true;
if (force) {
lastIBR = monotonicNow() - ibrInterval;
}
if (sendImmediately()) {
notifyAll();
}
}
接下来解答上面的问题,什么线程wait了?
刚才截图类的structure时,除了上面这个三个看起来像IBR的函数外, 还有一个waitTillNextIBR方法,继续顾名思义,wait直到下一次IBR。先来看一下这个方法的实现,然后再看一下这个方法的调用栈,分析一下是什么位置调用了这个函数。
synchronized void waitTillNextIBR(long waitTime) {
if (waitTime > 0 && !sendImmediately()) {
try {
wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
} catch (InterruptedException ie) {
LOG.warn(getClass().getSimpleName() + " interrupted");
}
}
}
wait函数里用了条件判断表达式,这里的ibrInterval我们按照默认值0来计算,所以直接取传入的waitTime参数的值。如果说传入waitTime大于0并且不需要立即发送IBR,那么线程就进行wait,时间为waitTime(ms)。
接着看waitTillNextIBR的调用点,来到了org.apache.hadoop.hdfs.server.datanode.BPServiceActor#offerService
方法里。这个方法是BPServiceActor类的run方法中调用的方法,只要DN运行,这个offerService方法就不断循环的执行。offerService方法中按顺序进行heartbeat、增量块汇报、全量块汇报操作。

到这里基本上可以回答文章开头的两个问题了:
DataNode的BPServiceActor线程在DN启动后会一直执行,不断循环的发送heartbeat、IBR(增量块汇报)、FBR(全量块汇报)。然后根据增量块汇报会更新各种记录时间的变量用来辅助调用IBR。比如强制触发IBR等。
还有一个问题是:IBR中的内容是什么?回答这个问题需要看generateIBRs方法:

可以看到,本质上发送的内容来自于pendingIBRs这个Map,此Map的key是DatanodeStorage代表了Datanode的一个Storage,可以理解为一个磁盘;此Map的value是PerStorageIBR,代表了每个Storage上的IBR。PerStorageIBR这个类我们前面遇到过,还得他的blocks变量么?就是用来保存新增的IBR和处理失败的IBR的那个blocks呀!

好,本文完。下一篇将学习IBR在NN侧的处理过程。
网友评论