Picking up the thread left in the previous post, BlockManager源码分析(1)--PendingReconstructionMonitor, this post studies the redundancyThread.
![](https://img.haomeiwen.com/i5679451/a937e18280ad0351.png)
First, look at the type of redundancyThread: it is a Daemon, and the Runnable passed in is a RedundancyMonitor.
![](https://img.haomeiwen.com/i5679451/f5155a502c31c76e.png)
So we go straight to the run method of RedundancyMonitor:
![](https://img.haomeiwen.com/i5679451/1452e09ef44f444d.png)
In the previous post, BlockManager source code analysis (1), we already sketched the logic of this method, working backwards to computeDatanodeWork. Here we look at two other methods it calls: processPendingReconstructions and rescanPostponedMisreplicatedBlocks.
1. processPendingReconstructions()
This method handles blocks waiting to be reconstructed. Stepping into it:
![](https://img.haomeiwen.com/i5679451/80915af2b78f450d.png)
The method's Javadoc says: if any reconstruction requests have timed out, put them back into the neededReconstruction queue. So the blocks in timedOutItems from the previous post are handled by the RedundancyMonitor thread. The logic is straightforward: iterate over timedOutItems, look up each block's info in blocksMap, check whether the block still needs reconstruction, and if so add the reconstruction details to neededReconstruction.
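The re-queue logic just described can be condensed into a small sketch. This is a deliberately simplified, hypothetical model (blocks are plain strings, `blocksMap` is a set, and the "needs reconstruction" check is an injected predicate; none of these are the real HDFS types): drain the timed-out list and put back only the blocks that still exist and still need reconstruction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

public class RequeueSketch {
    // Hypothetical stand-in for processPendingReconstructions: returns the
    // blocks that should be re-queued into neededReconstruction.
    static List<String> requeue(List<String> timedOutItems,
                                Set<String> blocksMap,
                                Predicate<String> needsReconstruction) {
        List<String> neededReconstruction = new ArrayList<>();
        for (String block : timedOutItems) {
            if (!blocksMap.contains(block)) {
                continue; // block no longer found in the block map
            }
            if (needsReconstruction.test(block)) {
                neededReconstruction.add(block); // re-queue for reconstruction
            }
        }
        return neededReconstruction;
    }
}
```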
2. rescanPostponedMisreplicatedBlocks()
Now for the second method. Its name, rescanPostponedMisreplicatedBlocks, means re-scanning blocks whose replica counts are wrong but whose handling was previously postponed. The method works heavily with the postponedMisreplicatedBlocks queue, whose purpose is this: during a NameNode failover, over-replicated blocks cannot be processed until every DataNode holding a replica of the block has sent a block report to the new active NameNode. This restriction prevents data loss. The following figure is taken from the book《Hadoop2.x HDFS源码深度剖析》:
![](https://img.haomeiwen.com/i5679451/623e6c0f741c90f3.png)
The code is as follows:
```java
/**
 * Rescan the list of blocks which were previously postponed.
 */
void rescanPostponedMisreplicatedBlocks() {
  if (getPostponedMisreplicatedBlocksCount() == 0) {
    return;
  }
  namesystem.writeLock();
  long startTime = Time.monotonicNow();
  long startSize = postponedMisreplicatedBlocks.size();
  try {
    Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
    for (int i = 0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
      Block b = it.next();
      it.remove();
      BlockInfo bi = getStoredBlock(b);
      if (bi == null) {
        LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
            "Postponed mis-replicated block {} no longer found " +
            "in block map.", b);
        continue;
      }
      // Core call: process the mis-replicated block.
      MisReplicationResult res = processMisReplicatedBlock(bi);
      LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
          "Re-scanned block {}, result is {}", b, res);
      // If the result is POSTPONE, i.e. the block still cannot be handled
      // right now, add it to rescannedMisreplicatedBlocks.
      if (res == MisReplicationResult.POSTPONE) {
        rescannedMisreplicatedBlocks.add(b);
      }
    }
  } finally {
    // Move the still-unprocessed blocks in rescannedMisreplicatedBlocks back
    // into postponedMisreplicatedBlocks, to be retried in the next cycle.
    postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
    rescannedMisreplicatedBlocks.clear();
    long endSize = postponedMisreplicatedBlocks.size();
    namesystem.writeUnlock();
    LOG.info("Rescan of postponedMisreplicatedBlocks completed in {}" +
        " msecs. {} blocks are left. {} blocks were removed.",
        (Time.monotonicNow() - startTime), endSize, (startSize - endSize));
  }
}
```
The core call here is processMisReplicatedBlock, which handles each block taken from postponedMisreplicatedBlocks. So let's step into processMisReplicatedBlock:
```java
/**
 * Process a single possibly misreplicated block. This adds it to the
 * appropriate queues if necessary, and returns a result code indicating
 * what happened with it.
 */
private MisReplicationResult processMisReplicatedBlock(BlockInfo block) {
  // If the block has been deleted, add it to invalidateBlocks to await
  // physical deletion.
  if (block.isDeleted()) {
    // block does not belong to any file
    addToInvalidates(block);
    return MisReplicationResult.INVALID;
  }
  // A block still under construction needs no handling; just return.
  if (!block.isComplete()) {
    // Incomplete blocks are never considered mis-replicated --
    // they'll be reached when they are completed or recovered.
    return MisReplicationResult.UNDER_CONSTRUCTION;
  }
  // calculate current redundancy
  // Compute the expected redundancy and the number of live replicas; if the
  // block needs reconstruction, enqueue it in neededReconstruction and
  // return UNDER_REPLICATED.
  short expectedRedundancy = getExpectedRedundancyNum(block);
  NumberReplicas num = countNodes(block);
  final int numCurrentReplica = num.liveReplicas();
  // add to low redundancy queue if need to be
  if (isNeededReconstruction(block, num)) {
    if (neededReconstruction.add(block, numCurrentReplica,
        num.readOnlyReplicas(), num.outOfServiceReplicas(),
        expectedRedundancy)) {
      return MisReplicationResult.UNDER_REPLICATED;
    }
  }
  // Check whether there are more replicas than configured; if so, handle it.
  if (shouldProcessExtraRedundancy(num, expectedRedundancy)) {
    // If any replica is on a stale node we must not act, to avoid losing
    // data. After POSTPONE is returned, the caller puts the block back into
    // the postponed queue for the next round.
    if (num.replicasOnStaleNodes() > 0) {
      // If any of the replicas of this block are on nodes that are
      // considered "stale", then these replicas may in fact have
      // already been deleted. So, we cannot safely act on the
      // over-replication until a later point in time, when
      // the "stale" nodes have block reported.
      return MisReplicationResult.POSTPONE;
    }
    // extra redundancy block
    // Handle the block that has more replicas than configured.
    processExtraRedundancyBlock(block, expectedRedundancy, null, null);
    return MisReplicationResult.OVER_REPLICATED;
  }
  return MisReplicationResult.OK;
}
```
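The decision tree above can be condensed into a small classifier. This is a hedged sketch only: the real isNeededReconstruction and shouldProcessExtraRedundancy also weigh read-only and out-of-service replicas, which are ignored here; this toy version compares only live-replica counts against the expected redundancy.

```java
public class MisReplicationSketch {
    enum Result { INVALID, UNDER_CONSTRUCTION, UNDER_REPLICATED,
                  POSTPONE, OVER_REPLICATED, OK }

    // Simplified, hypothetical view of the per-block decision tree.
    static Result classify(boolean deleted, boolean complete,
                           int liveReplicas, int expected, int staleReplicas) {
        if (deleted) return Result.INVALID;               // queue for invalidation
        if (!complete) return Result.UNDER_CONSTRUCTION;  // skip until finalized
        if (liveReplicas < expected) {
            return Result.UNDER_REPLICATED;               // needs reconstruction
        }
        if (liveReplicas > expected) {
            if (staleReplicas > 0) return Result.POSTPONE; // cannot act safely yet
            return Result.OVER_REPLICATED;                 // trim the extras
        }
        return Result.OK;
    }
}
```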
The overall logic is not complicated. Next, let's focus on the method that handles blocks with more replicas than configured: processExtraRedundancyBlock.
The code is as follows:
```java
/**
 * Find how many of the containing nodes are "extra", if any.
 * If there are any extras, call chooseExcessRedundancies() to
 * mark them in the excessRedundancyMap.
 */
private void processExtraRedundancyBlock(final BlockInfo block,
    final short replication, final DatanodeDescriptor addedNode,
    DatanodeDescriptor delNodeHint) {
  assert namesystem.hasWriteLock();
  if (addedNode == delNodeHint) {
    delNodeHint = null;
  }
  Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>();
  Collection<DatanodeDescriptor> corruptNodes = corruptReplicas
      .getNodes(block);
  for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
    if (storage.getState() != State.NORMAL) {
      continue;
    }
    final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
    if (storage.areBlockContentsStale()) {
      LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
          + " since storage {} does not yet have up-to-date information.",
          block, storage);
      postponeBlock(block);
      return;
    }
    if (!isExcess(cur, block)) {
      // Replicas not already marked excess fall into this branch.
      if (cur.isInService()) {
        // exclude corrupt replicas
        if (corruptNodes == null || !corruptNodes.contains(cur)) {
          nonExcess.add(storage);
        }
      }
    }
  }
  // Core call.
  chooseExcessRedundancies(nonExcess, block, replication, addedNode,
      delNodeHint);
}
```
This method loops over the DatanodeStorageInfo entries of the given block. Storages that are not in the NORMAL state are skipped. If any storage's block contents are stale, the whole block is postponed and the method returns. Replicas that are not already marked excess, are in service, and are not corrupt are collected into nonExcess. Finally it calls chooseExcessRedundancies.
Stepping into chooseExcessRedundancies, note that the last two arguments passed in here are both null:
![](https://img.haomeiwen.com/i5679451/3fdd9c145a53eff4.png)
Note: BlockStoragePolicy describes the storage policy used when choosing where a block's replicas are placed. StorageType represents a DataNode storage medium, such as SSD, DISK, or RAM_DISK.
The purpose of chooseExcessRedundancies is to pick out the excess storage types whose replicas can be deleted.
The if and else branches handle erasure-coded blocks and ordinary contiguous blocks, respectively. We focus on ordinary blocks here, i.e. the else branch.
Based on the (fairly involved) replica placement policy, it determines the storage types that replicas may be deleted from, stored in the excessTypes variable, and finally calls chooseExcessRedundancyContiguous.
Let's look at chooseExcessRedundancyContiguous. To be honest, I have not yet fully understood the replica-deletion strategy or its Javadoc; I'll leave that as another pit to fill later.
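The role of excessTypes can be illustrated with a hedged simplification of BlockStoragePolicy#chooseExcess: the policy assigns an expected list of storage types for the target replication, each expected slot consumes one matching replica, and whatever is left over is excess. Storage types are modeled as plain strings here purely for brevity; this is not the real HDFS implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class ChooseExcessSketch {
    // Simplified model: each storage type the policy expects for the target
    // replication consumes one actual replica of that type; the replicas
    // left over are the excess types eligible for deletion.
    static List<String> chooseExcess(List<String> actualTypes,
                                     List<String> expectedTypes) {
        List<String> excess = new ArrayList<>(actualTypes);
        for (String wanted : expectedTypes) {
            excess.remove(wanted); // removes a single matching occurrence
        }
        return excess;
    }
}
```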
```java
/**
 * We want sufficient redundancy for the block, but we now have too many.
 * In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
 *
 * srcNodes.size() - dstNodes.size() == replication
 *
 * We pick node that make sure that replicas are spread across racks and
 * also try hard to pick one with least free space.
 * The algorithm is first to pick a node with least free space from nodes
 * that are on a rack holding more than one replicas of the block.
 * So removing such a replica won't remove a rack.
 * If no such a node is available,
 * then pick a node with least free space
 */
private void chooseExcessRedundancyContiguous(
    final Collection<DatanodeStorageInfo> nonExcess, BlockInfo storedBlock,
    short replication, DatanodeDescriptor addedNode,
    DatanodeDescriptor delNodeHint, List<StorageType> excessTypes) {
  BlockPlacementPolicy replicator = placementPolicies.getPolicy(CONTIGUOUS);
  // Use the block placement policy to find the replicas to delete.
  List<DatanodeStorageInfo> replicasToDelete = replicator
      .chooseReplicasToDelete(nonExcess, nonExcess, replication, excessTypes,
          addedNode, delNodeHint);
  for (DatanodeStorageInfo chosenReplica : replicasToDelete) {
    // Handle each chosen excess replica (essentially, add it to the
    // corresponding queues).
    processChosenExcessRedundancy(nonExcess, chosenReplica, storedBlock);
  }
}
```
Next, the processChosenExcessRedundancy method:
```java
private void processChosenExcessRedundancy(
    final Collection<DatanodeStorageInfo> nonExcess,
    final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
  nonExcess.remove(chosen);
  // Add it to the excess-replica map.
  excessRedundancyMap.add(chosen.getDatanodeDescriptor(), storedBlock);
  //
  // The 'excessblocks' tracks blocks until we get confirmation
  // that the datanode has deleted them; the only way we remove them
  // is when we get a "removeBlock" message.
  //
  // The 'invalidate' list is used to inform the datanode the block
  // should be deleted. Items are removed from the invalidate list
  // upon giving instructions to the datanodes.
  //
  final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
  // Add it to invalidateBlocks, waiting to be sent to the DataNode so the
  // block gets deleted there.
  addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
  blockLog.debug("BLOCK* chooseExcessRedundancies: "
      + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
}
```
That wraps up the study of redundancyThread.
One more pit dug for later: the selection strategy used when deleting excess replicas. A rough search online turned up an explanation of the principle:
https://zhuanlan.zhihu.com/p/114632147
When a block is over-replicated, the NameNode chooses a replica to delete. It prefers not to reduce the number of racks hosting replicas, and after that it prefers to delete from the DataNode with the least available disk space. The goal is to balance storage utilization across DataNodes without reducing the block's availability.
The NameNode also ensures that not all replicas of a block live on the same rack. If it detects that case, it treats the block as under-replicated and replicates it to a different rack using the same placement policy. Once it is notified that the new replica has been created, the block becomes over-replicated, and the NameNode then deletes an old replica, since the policy prefers not to reduce the rack count.
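The rack-aware deletion preference described above can be sketched as follows. This is a toy model, not the real BlockPlacementPolicyDefault: replicas are simple (node, rack, freeSpace) tuples. First restrict candidates to replicas on racks that hold more than one replica, so that deleting one cannot lose a rack; among the candidates, pick the node with the least free space.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExcessReplicaChoice {
    // Toy replica model: node name, rack name, free space on the node.
    static class Replica {
        final String node;
        final String rack;
        final long free;
        Replica(String node, String rack, long free) {
            this.node = node;
            this.rack = rack;
            this.free = free;
        }
    }

    // Prefer a replica on a rack holding more than one replica (deleting it
    // does not reduce the number of racks); among candidates, pick the node
    // with the least free space.
    static Replica choose(List<Replica> replicas) {
        Map<String, Integer> perRack = new HashMap<>();
        for (Replica r : replicas) {
            perRack.merge(r.rack, 1, Integer::sum);
        }
        List<Replica> candidates = new ArrayList<>();
        for (Replica r : replicas) {
            if (perRack.get(r.rack) > 1) {
                candidates.add(r);
            }
        }
        if (candidates.isEmpty()) {
            candidates = replicas; // every rack holds exactly one replica
        }
        return Collections.min(candidates,
            Comparator.comparingLong((Replica r) -> r.free));
    }
}
```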
END!
That's enough for today; I'm beat.