【HDFS】--RPC--delete

Author: 小北觅 | Published 2021-01-12 10:58

Today we'll take a quick pass through the logic of the HDFS delete RPC and look at how the delete operation is implemented. This is not an end-to-end summary of the whole flow; that is left for later, once the understanding runs deeper.

Let's start with the delete implementation in the NameNodeRpcServer class: org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#delete

    @Override // ClientProtocol
      public boolean delete(String src, boolean recursive) throws IOException {
        checkNNStartup(); // check that the NameNode has finished starting up
        if (stateChangeLog.isDebugEnabled()) {
          stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
              + ", recursive=" + recursive);
        }
        namesystem.checkOperation(OperationCategory.WRITE); // a Standby NameNode cannot serve write operations
        // The RetryCache holds responses to previously handled requests (identified by client id + call id), so retried requests can be answered directly
        CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
        if (cacheEntry != null && cacheEntry.isSuccess()) {
          return true; // Return previous response
        }
    
        boolean ret = false;
        try {
          // delegate to FSNamesystem#delete
          ret = namesystem.delete(src, recursive, cacheEntry != null);
        } finally {
          // record the outcome in the RetryCache
          RetryCache.setState(cacheEntry, ret);
        }
        // update the delete-file-operations metric
        if (ret) 
          metrics.incrDeleteFileOps();
        return ret;
      }
    

Next, let's follow the call into the FSNamesystem#delete method:

      /**
       * Remove the indicated file from namespace.
       * 
       * @see ClientProtocol#delete(String, boolean) for detailed description and 
       * description of exceptions
       */
      boolean delete(String src, boolean recursive, boolean logRetryCache)
          throws IOException {
        final String operationName = "delete";
        BlocksMapUpdateInfo toRemovedBlocks = null;
        checkOperation(OperationCategory.WRITE);
        final FSPermissionChecker pc = getPermissionChecker();
        writeLock();
        boolean ret = false;
        try {
          checkOperation(OperationCategory.WRITE);
          checkNameNodeSafeMode("Cannot delete " + src);
          // collect the blocks that will need to be removed
          toRemovedBlocks = FSDirDeleteOp.delete(
              this, pc, src, recursive, logRetryCache);
          ret = toRemovedBlocks != null;
        } catch (AccessControlException e) {
          logAuditEvent(false, operationName, src);
          throw e;
        } finally {
          writeUnlock(operationName);
        }
        getEditLog().logSync();
        if (toRemovedBlocks != null) {
          // this is where the blocks are actually removed
          removeBlocks(toRemovedBlocks); // Incremental deletion of blocks
        }
        logAuditEvent(true, operationName, src); // record the audit log entry
        return ret;
      }
    

Let's continue with the removeBlocks method:

      /**
       * From the given list, incrementally remove the blocks from blockManager
       * Writelock is dropped and reacquired every BLOCK_DELETION_INCREMENT to
       * ensure that other waiters on the lock can get in. See HDFS-2938
       *
       *
       * @param blocks
       *          An instance of {@link BlocksMapUpdateInfo} which contains a list
       *          of blocks that need to be removed from blocksMap
       */
      void removeBlocks(BlocksMapUpdateInfo blocks) {
        // get the list of blocks to delete
        List<BlockInfo> toDeleteList = blocks.getToDeleteList();
        Iterator<BlockInfo> iter = toDeleteList.iterator();
        // iterate over the list, removing blocks in batches
        while (iter.hasNext()) {
          writeLock();
          try {
            for (int i = 0; i < blockDeletionIncrement && iter.hasNext(); i++) {
              blockManager.removeBlock(iter.next());
            }
          } finally {
            writeUnlock("removeBlocks");
          }
        }
      }
    

That takes us into the BlockManager#removeBlock method:

    public void removeBlock(BlockInfo block) {
        assert namesystem.hasWriteLock();
        // No need to ACK blocks that are being removed entirely
        // from the namespace, since the removal of the associated
        // file already removes them from the block map below.
        block.setNumBytes(BlockCommand.NO_ACK);
        addToInvalidates(block);
        removeBlockFromMap(block);
        // Remove the block from pendingReconstruction and neededReconstruction
        // pendingReconstruction holds blocks whose reconstruction commands have already been generated and are waiting to be sent to DataNodes;
        // neededReconstruction holds blocks that are still waiting for reconstruction commands to be generated.
        PendingBlockInfo remove = pendingReconstruction.remove(block);
        if (remove != null) {
          DatanodeStorageInfo.decrementBlocksScheduled(remove.getTargets()
              .toArray(new DatanodeStorageInfo[remove.getTargets().size()]));
        }
        neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL);
        // remove the block from the queue of postponed mis-replicated blocks
        postponedMisreplicatedBlocks.remove(block);
      }
    

This method mainly adds the block to the invalidation queue (invalidates), then removes it from the blocks map and from the pending and needed reconstruction queues, so that no useless replication work is scheduled for a block that is being deleted.

Next, the BlockManager#addToInvalidates method:

     /**
       * Adds block to list of blocks which will be invalidated on all its
       * datanodes.
       */
      private void addToInvalidates(BlockInfo storedBlock) {
        if (!isPopulatingReplQueues()) {
          return;
        }
        StringBuilder datanodes = blockLog.isDebugEnabled()
            ? new StringBuilder() : null;
        for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
          if (storage.getState() != State.NORMAL) {
            continue;
          }
          final DatanodeDescriptor node = storage.getDatanodeDescriptor();
          final Block b = getBlockOnStorage(storedBlock, storage);
          if (b != null) {
            // add the (block, datanode) pair to the invalidation queue
            invalidateBlocks.add(b, node, false);
            if (datanodes != null) {
              datanodes.append(node).append(" ");
            }
          }
        }
        if (datanodes != null && datanodes.length() != 0) {
          blockLog.debug("BLOCK* addToInvalidates: {} {}", storedBlock, datanodes);
        }
      }
    

Next we step into the org.apache.hadoop.hdfs.server.blockmanagement.InvalidateBlocks#add method. It is a synchronized method.
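Because the whole method is synchronized, concurrent callers updating the invalidation queues are serialized. Roughly, it looks up (or lazily creates) the per-DataNode set of blocks to be invalidated, adds the block, and bumps a counter for the matching block type. The sketch below is a simplified approximation, not the verbatim HDFS source; the getBlocksSet helper and the numBlocks / numECBlocks counter fields are paraphrased:

    // Simplified sketch of InvalidateBlocks#add -- not the verbatim HDFS source.
    synchronized void add(final Block block, final DatanodeInfo datanode,
        final boolean log) {
      // Look up the per-DataNode set of blocks scheduled for invalidation
      // (striped/EC blocks and ordinary replicas are tracked separately).
      LightWeightHashSet<Block> set = getBlocksSet(datanode, block);
      if (set == null) {
        set = new LightWeightHashSet<>();
        putBlocksSet(datanode, block, set);   // covered below
      }
      if (set.add(block)) {
        // Keep the per-type counters in sync with the queue contents.
        if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
          numECBlocks.increment();
        } else {
          numBlocks.increment();
        }
      }
      if (log) {
        NameNode.blockStateChangeLog.debug("BLOCK* {}: add {} to {}",
            getClass().getSimpleName(), block, datanode);
      }
    }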

Then we look at the putBlocksSet method:

It is quite simple: it checks whether the block is an EC (striped) block or an ordinary block and stores the per-DataNode set accordingly; underneath this boils down to a put on the Map interface (a hash set is itself implemented on top of a map).
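A minimal sketch of that shape (simplified; the nodeToBlocks / nodeToECBlocks map field names are paraphrased from the HDFS source rather than quoted verbatim):

    // Simplified sketch of InvalidateBlocks#putBlocksSet -- not the verbatim source.
    private void putBlocksSet(final DatanodeInfo dn, final Block block,
        final LightWeightHashSet<Block> set) {
      if (BlockIdManager.isStripedBlockID(block.getBlockId())) {
        // EC (striped) blocks get their own per-DataNode map.
        nodeToECBlocks.put(dn, set);
      } else {
        // Ordinary replicated blocks go into the regular per-DataNode map.
        nodeToBlocks.put(dn, set);
      }
    }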

Once a block has been added to the InvalidateBlocks queue, it just waits for the periodic thread to pick it up and dispatch the deletion command to the corresponding DataNode.
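For completeness, everything traced above is triggered on the client side by an ordinary FileSystem#delete call, which sends the delete RPC to the NameNode. A minimal usage sketch (the path and configuration here are just illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsDeleteExample {
      public static void main(String[] args) throws Exception {
        // Picks up core-site.xml / hdfs-site.xml from the classpath.
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(conf)) {
          // recursive = true: delete the directory and everything under it,
          // which is exactly the "recursive" flag seen in the RPC above.
          boolean deleted = fs.delete(new Path("/tmp/demo-dir"), true);
          System.out.println("deleted = " + deleted);
        }
      }
    }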
