美文网首页
Dispatcher类

Dispatcher类

作者: 此间少年仍犹在 | 来源:发表于2018-08-22 16:59 被阅读0次

Balancer.runOneIteration()--》Dispatcher.dispatchAndCheckContinue()
Dispatcher.dispatchAndCheckContinue()--》dispatchBlockMoves()

1. dispatchBlockMoves()

 对每个source进行block移动的处理,相应的线程会选择要移动的block,向proxy source发送请求来进行block移动的初始化操作。这个过程是流式控制的。如果有太多un-confirmed block要移动,block选择的操作会被锁住。

final long bytesLastMoved = getBytesMoved();
    //java.util.concurrent.Future可以获取任务的执行结果
    final Future<?>[] futures = new Future<?>[sources.size()];

    //sources是HashSet<Source>集合类对象
    final Iterator<Source> i = sources.iterator();
    for (int j = 0; j < futures.length; j++) {
      final Source s = i.next();
      //dispatchExecutor是一个ExecutorService对象,ExecutorService.submit()会返回一个对象
      futures[j] = dispatchExecutor.submit(new Runnable() {
        @Override
        public void run() {
          s.dispatchBlocks();
        }
      });
    }
2. dispatchBlocks()

 这个方法会迭代地进行以下步骤:首先选取要移动的block,然后向proxy source发送请求

      //Time.monotonicNow()会调用System.nanoTime(),以毫微秒为单位
      final long startTime = Time.monotonicNow();
      this.blocksToReceive = 2 * getScheduledSize();
      boolean isTimeUp = false;
      int noPendingMoveIteration = 0;
      while (!isTimeUp && getScheduledSize() > 0
          && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
        final PendingMove p = chooseNextMove();
        if (p != null) {
          // Reset no pending move counter
          noPendingMoveIteration=0;
          executePendingMove(p);
          continue;
        }
3. chooseNextMove()
      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
        final Task task = i.next();
        final DDatanode target = task.target.getDDatanode();
        final PendingMove pendingBlock = new PendingMove(this, task.target);
        if (target.addPendingBlock(pendingBlock)) {
          // target is not busy, so do a tentative block allocation
          if (pendingBlock.chooseBlockAndProxy()) {
            long blockSize = pendingBlock.block.getNumBytes();
            incScheduledSize(-blockSize);
            task.size -= blockSize;
            if (task.size == 0) {
              i.remove();
            }
            return pendingBlock;
          } else {
            // cancel the tentative move
            target.removePendingBlock(pendingBlock);
          }
        }
      }
      return null;

相关文章

网友评论

      本文标题:Dispatcher类

      本文链接:https://www.haomeiwen.com/subject/znzdpftx.html