1. Dispatcher.init()方法
init方法首先会请求DataNode节点报告
//向NameNode请求DataNode报告,会调用ClientProtocol相应的方法
final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport();
final List<DatanodeStorageReport> trimmed = new ArrayList<DatanodeStorageReport>();
// create network topology and classify utilization collections:
// over-utilized, above-average, below-average and under-utilized.
for (DatanodeStorageReport r : DFSUtil.shuffle(reports)) {
final DatanodeInfo datanode = r.getDatanodeInfo();
if (shouldIgnore(datanode)) {
continue;
}
trimmed.add(r);
//cluster是NetworkTopology类的对象
cluster.add(datanode);
}
return trimmed;
2. Dispatcher.shouldIgnore()方法
Decommission的意思是DataNode从HDFS集群中移除掉,includeNodes和excludedNodes是使用balancer命令传入的两个参数,可以指定在哪些DataNode中执行balance,也可以在balance排除哪些DataNode。参数如下所示:
[-exclude [-f <hosts-file> | <comma-separated list of hosts>]] Excludes the specified datanodes.
[-include [-f <hosts-file> | <comma-separated list of hosts>]] Includes only the specified datanodes.
private boolean shouldIgnore(DatanodeInfo dn) {
// ignore decommissioned nodes
final boolean decommissioned = dn.isDecommissioned();
// ignore decommissioning nodes
final boolean decommissioning = dn.isDecommissionInProgress();
// ignore nodes in exclude list
final boolean excluded = Util.isExcluded(excludedNodes, dn);
// ignore nodes not in the include list (if include list is not empty)
final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
if (decommissioned || decommissioning || excluded || notIncluded) {
if (LOG.isTraceEnabled()) {
LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", "
+ decommissioning + ", " + excluded + ", " + notIncluded);
}
return true;
}
return false;
}
3. Balancer.init()方法
这个方法https://www.jianshu.com/p/f7c1cd476601中写的比较详细,可以参考一下。
其中policy属于BalancingPolicy实例,即Balancer平衡的策略,同样在使用balancer命令时可以指定该参数。平衡策略有两种:DataNode级别和BlockPool级别,BlockPool策略仅适用于HDFS Federation。该参数使用方法如下:
[-policy <policy>] the balancing policy: datanode or blockpool
接下里会根据获取的DataNode信息,计算出网络拓扑和集群平均存储使用率。
/**
* Given a datanode storage set, build a network topology and decide
* over-utilized storages, above average utilized storages,
* below average utilized storages, and underutilized storages.
* The input datanode storage set is shuffled in order to randomize
* to the storage matching later on.
*
* @return the number of bytes needed to move in order to balance the cluster.
*/
private long init(List<DatanodeStorageReport> reports) {
// compute average utilization
for (DatanodeStorageReport r : reports) {
policy.accumulateSpaces(r);
}
policy.initAvgUtilization();
// create network topology and classify utilization collections:
// over-utilized, above-average, below-average and under-utilized.
long overLoadedBytes = 0L, underLoadedBytes = 0L;
for(DatanodeStorageReport r : reports) {
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
for(StorageType t : StorageType.getMovableTypes()) {
final Double utilization = policy.getUtilization(r, t);
if (utilization == null) { // datanode does not have such storage type
continue;
}
final long capacity = getCapacity(r, t);
//DataNode使用率与集群平均使用率差值
final double utilizationDiff = utilization - policy.getAvgUtilization(t);
//DataNode utilizationDiff与阈值的差值
final double thresholdDiff = Math.abs(utilizationDiff) - threshold;
final long maxSize2Move = computeMaxSize2Move(capacity,
getRemaining(r, t), utilizationDiff, threshold);
final StorageGroup g;
if (utilizationDiff > 0) {
final Source s = dn.addSource(t, maxSize2Move, dispatcher);
if (thresholdDiff <= 0) { // within threshold
aboveAvgUtilized.add(s);
} else {
overLoadedBytes += precentage2bytes(thresholdDiff, capacity);
overUtilized.add(s);
}
g = s;
} else {
g = dn.addTarget(t, maxSize2Move);
if (thresholdDiff <= 0) { // within threshold
belowAvgUtilized.add(g);
} else {
underLoadedBytes += precentage2bytes(thresholdDiff, capacity);
underUtilized.add(g);
}
}
dispatcher.getStorageGroupMap().put(g);
}
}
logUtilizationCollections();
Preconditions.checkState(dispatcher.getStorageGroupMap().size()
== overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size()
+ belowAvgUtilized.size(),
"Mismatched number of storage groups");
// return number of bytes to be moved in order to make the cluster balanced
return Math.max(overLoadedBytes, underLoadedBytes);
}
4. BalancingPolicy.Node
void accumulateSpaces(DatanodeStorageReport r) {
for(StorageReport s : r.getStorageReports()) {
final StorageType t = s.getStorage().getStorageType();
//DataNode总空间
totalCapacities.add(t, s.getCapacity());
//DataNode已使用空间
totalUsedSpaces.add(t, s.getDfsUsed());
}
}
void initAvgUtilization() {
for(StorageType t : StorageType.asList()) {
final long capacity = totalCapacities.get(t);
if (capacity > 0L) {
//计算平均存储使用率
final double avg = totalUsedSpaces.get(t)*100.0/capacity;
avgUtilizations.set(t, avg);
}
}
}
网友评论