美文网首页
Dispatcher类

Dispatcher类

作者: 此间少年仍犹在 | 来源:发表于2018-08-22 16:59 被阅读0次

    Balancer.runOneIteration()--》Dispatcher.dispatchAndCheckContinue()
    Dispatcher.dispatchAndCheckContinue()--》dispatchBlockMoves()

    1. dispatchBlockMoves()

     对每个source进行block移动的处理,相应的线程会选择要移动的block,向proxy source发送请求来进行block移动的初始化操作。这个过程是流式控制的。如果有太多un-confirmed block要移动,block选择的操作会被锁住。

    final long bytesLastMoved = getBytesMoved();
        //java.util.concurrent.Future可以获取任务的执行结果
        final Future<?>[] futures = new Future<?>[sources.size()];
    
        //sources是HashSet<Source>集合类对象
        final Iterator<Source> i = sources.iterator();
        for (int j = 0; j < futures.length; j++) {
          final Source s = i.next();
          //dispatchExecutor是一个ExecutorService对象,ExecutorService.submit()会返回一个对象
          futures[j] = dispatchExecutor.submit(new Runnable() {
            @Override
            public void run() {
              s.dispatchBlocks();
            }
          });
        }
    
    2. dispatchBlocks()

     这个方法会迭代地进行以下步骤:首先选取要移动的block,然后向proxy source发送请求

          //Time.monotonicNow()会调用System.nanoTime(),以毫微秒为单位
          final long startTime = Time.monotonicNow();
          this.blocksToReceive = 2 * getScheduledSize();
          boolean isTimeUp = false;
          int noPendingMoveIteration = 0;
          while (!isTimeUp && getScheduledSize() > 0
              && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
            final PendingMove p = chooseNextMove();
            if (p != null) {
              // Reset no pending move counter
              noPendingMoveIteration=0;
              executePendingMove(p);
              continue;
            }
    
    3. chooseNextMove()
          for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
            final Task task = i.next();
            final DDatanode target = task.target.getDDatanode();
            final PendingMove pendingBlock = new PendingMove(this, task.target);
            if (target.addPendingBlock(pendingBlock)) {
              // target is not busy, so do a tentative block allocation
              if (pendingBlock.chooseBlockAndProxy()) {
                long blockSize = pendingBlock.block.getNumBytes();
                incScheduledSize(-blockSize);
                task.size -= blockSize;
                if (task.size == 0) {
                  i.remove();
                }
                return pendingBlock;
              } else {
                // cancel the tentative move
                target.removePendingBlock(pendingBlock);
              }
            }
          }
          return null;
    

    相关文章

      网友评论

          本文标题:Dispatcher类

          本文链接:https://www.haomeiwen.com/subject/znzdpftx.html