Balancer.runOneIteration()--》Dispatcher.dispatchAndCheckContinue()
Dispatcher.dispatchAndCheckContinue()--》dispatchBlockMoves()
1. dispatchBlockMoves()
对每个source进行block移动的处理,相应的线程会选择要移动的block,向proxy source发送请求来进行block移动的初始化操作。这个过程是流式控制的。如果有太多un-confirmed block要移动,block选择的操作会被锁住。
final long bytesLastMoved = getBytesMoved();
//java.util.concurrent.Future可以获取任务的执行结果
final Future<?>[] futures = new Future<?>[sources.size()];
//sources是HashSet<Source>集合类对象
final Iterator<Source> i = sources.iterator();
for (int j = 0; j < futures.length; j++) {
final Source s = i.next();
//dispatchExecutor是一个ExecutorService对象,ExecutorService.submit()会返回一个对象
futures[j] = dispatchExecutor.submit(new Runnable() {
@Override
public void run() {
s.dispatchBlocks();
}
});
}
2. dispatchBlocks()
这个方法会迭代地进行以下步骤:首先选取要移动的block,然后向proxy source发送请求
//Time.monotonicNow()会调用System.nanoTime(),以毫微秒为单位
final long startTime = Time.monotonicNow();
this.blocksToReceive = 2 * getScheduledSize();
boolean isTimeUp = false;
int noPendingMoveIteration = 0;
while (!isTimeUp && getScheduledSize() > 0
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
final PendingMove p = chooseNextMove();
if (p != null) {
// Reset no pending move counter
noPendingMoveIteration=0;
executePendingMove(p);
continue;
}
3. chooseNextMove()
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
final Task task = i.next();
final DDatanode target = task.target.getDDatanode();
final PendingMove pendingBlock = new PendingMove(this, task.target);
if (target.addPendingBlock(pendingBlock)) {
// target is not busy, so do a tentative block allocation
if (pendingBlock.chooseBlockAndProxy()) {
long blockSize = pendingBlock.block.getNumBytes();
incScheduledSize(-blockSize);
task.size -= blockSize;
if (task.size == 0) {
i.remove();
}
return pendingBlock;
} else {
// cancel the tentative move
target.removePendingBlock(pendingBlock);
}
}
}
return null;
网友评论