美文网首页
hdfs写之namenode端addBlock实现<五>

hdfs写之namenode端addBlock实现<五>

作者: 古语1 | 来源:发表于2019-10-10 11:44 被阅读0次

    一、流程图

    二、NameNodeRpcServer的addBlock实现

    1、客户端通过RPC调用namenode的addBlock方法

    addBlock 调用getAdditionalBlock

    
      /**
       * RPC entry point used by a client to request one more block for a file.
       * Converts the array arguments into collections and delegates the
       * allocation to {@code namesystem.getAdditionalBlock}.
       *
       * @param src path of the file, forwarded to the namesystem
       * @param clientName name of the requesting client, forwarded to the namesystem
       * @param previous forwarded unchanged to the namesystem
       * @param excludedNodes nodes copied into a set and forwarded; a null array
       *          is forwarded as null
       * @param fileId id of the file, forwarded to the namesystem
       * @param favoredNodes node names wrapped as a list and forwarded; a null
       *          array is forwarded as null
       * @return whatever the namesystem returned, which may be null
       * @throws IOException propagated from the startup check or the namesystem
       */
      @Override
      public LocatedBlock addBlock(String src, String clientName,
          ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
          String[] favoredNodes)
          throws IOException {
        checkNNStartup();
        if (stateChangeLog.isDebugEnabled()) {
          stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
              + " fileId=" + fileId + " for " + clientName);
        }

        // Turn the wire-level arrays into collections; null stays null.
        Set<Node> excluded = null;
        if (excludedNodes != null) {
          excluded = new HashSet<Node>(excludedNodes.length);
          excluded.addAll(Arrays.asList(excludedNodes));
        }
        List<String> favored = null;
        if (favoredNodes != null) {
          favored = Arrays.asList(favoredNodes);
        }

        final LocatedBlock allocated = namesystem.getAdditionalBlock(src, fileId,
            clientName, previous, excluded, favored);
        if (allocated == null) {
          return null;
        }
        // Only requests that actually produced a block are counted.
        metrics.incrAddBlockOps();
        return allocated;
      }
    

    getAdditionalBlock方法先通过getNewBlockTargets获取要写入的datanode,再通过storeAllocatedBlock存储新分配的block信息。

     /**
       * Hands out an additional block for the file that is being written.
       *
       * The work is split in two steps: {@code getNewBlockTargets} picks the
       * target storages, then {@code storeAllocatedBlock} records the block.
       * When the first step yields no targets the request is treated as a
       * retry and the block captured during that step is returned as is.
       *
       * @param src path of the file
       * @param fileId id of the file
       * @param clientName name of the requesting client
       * @param previous forwarded to both steps
       * @param excludedNodes nodes that must not be chosen as targets
       * @param favoredNodes node names the client would like to be used
       * @return the located block for the client to write to
       * @throws IOException propagated from either step
       */
      LocatedBlock getAdditionalBlock(String src, long fileId, String clientName,
          ExtendedBlock previous, Set<Node> excludedNodes, 
          List<String> favoredNodes) throws IOException {
        // One-element holder that the first step fills in for retried requests.
        final LocatedBlock[] retryHolder = new LocatedBlock[1];

        // Step 1: select the datanode storages to write to.
        final DatanodeStorageInfo[] chosen = getNewBlockTargets(src, fileId,
            clientName, previous, excludedNodes, favoredNodes, retryHolder);
        if (chosen != null) {
          // Step 2: allocate and record the new block on the chosen targets.
          return storeAllocatedBlock(src, fileId, clientName, previous, chosen);
        }

        // No targets means this is a retry: return the last block.
        assert retryHolder[0] != null : "Retry block is null";
        return retryHolder[0];
      }
    

    通过getNewBlockTargets方法获取要写入的datanode

    /**
       * Part I of getAdditionalBlock().
       * Analyze the state of the file under read lock to determine if the client
       * can add a new block, detect potential retries, lease mismatches,
       * and minimal replication of the penultimate block.
       * 
       * Generate target DataNode locations for the new block,
       * but do not create the new block yet.
       *
       * @param src path of the file; it is resolved through the directory and
       *          then replaced by the path reported by the file-state analysis
       * @param fileId id of the file, passed to the file-state analysis
       * @param clientName name of the client, passed to the file-state analysis
       * @param previous passed to the file-state analysis
       * @param excludedNodes passed to the block manager when choosing targets
       * @param favoredNodes passed to the block manager when choosing targets
       * @param onRetryBlock one-element holder handed to the file-state analysis
       *          (presumably filled in there for retried requests); if its first
       *          entry is non-null and has locations, no targets are generated
       * @return the chosen target storages, or null when the request is a retry
       *         whose last block already has locations
       * @throws NotReplicatedYetException if checkFileProgress reports false
       * @throws IOException if the file already holds maxBlocksPerFile blocks,
       *           or propagated from the calls made here
       */
      DatanodeStorageInfo[] getNewBlockTargets(String src, long fileId,
          String clientName, ExtendedBlock previous, Set<Node> excludedNodes,
          List<String> favoredNodes, LocatedBlock[] onRetryBlock) throws IOException {
        // Values captured under the read lock and used after it is released.
        final long blockSize;
        final int replication;
        final byte storagePolicyID;
        Node clientNode = null;
        String clientMachine = null;
    
        NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {}  inodeId {}" +
            " for {}", src, fileId, clientName);
    
        // The operation category is checked once before taking the lock and
        // once more after it has been acquired.
        checkOperation(OperationCategory.READ);
        byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
        FSPermissionChecker pc = getPermissionChecker();
        readLock();
        try {
          checkOperation(OperationCategory.READ);
          src = dir.resolvePath(pc, src, pathComponents);
          FileState fileState = analyzeFileState(
              src, fileId, clientName, previous, onRetryBlock);
          final INodeFile pendingFile = fileState.inode;
          // Check if the penultimate block is minimally replicated
          // (uses the resolved src; src is switched to fileState.path afterwards)
          if (!checkFileProgress(src, pendingFile, false)) {
            throw new NotReplicatedYetException("Not replicated yet: " + src);
          }
          src = fileState.path;
    
          if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
            // This is a retry. No need to generate new locations.
            // Use the last block if it has locations.
            return null;
          }
          // Refuse to grow a file beyond the configured per-file block limit.
          if (pendingFile.getBlocks().length >= maxBlocksPerFile) {
            throw new IOException("File has reached the limit on maximum number of"
                + " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
                + "): " + pendingFile.getBlocks().length + " >= "
                + maxBlocksPerFile);
          }
          // Snapshot the file attributes needed for target selection.
          blockSize = pendingFile.getPreferredBlockSize();
          clientMachine = pendingFile.getFileUnderConstructionFeature()
              .getClientMachine();
          clientNode = blockManager.getDatanodeManager().getDatanodeByHost(
              clientMachine);
          replication = pendingFile.getFileReplication();
          storagePolicyID = pendingFile.getStoragePolicyID();
        } finally {
          readUnlock();
        }
    
        // The client machine did not map to a datanode; fall back to
        // getClientNode for a node representing it.
        if (clientNode == null) {
          clientNode = getClientNode(clientMachine);
        }
    
        // choose targets for the new block to be allocated.
        // Note that this runs after the read lock has been released.
        return getBlockManager().chooseTarget4NewBlock( 
            src, replication, clientNode, excludedNodes, blockSize, favoredNodes,
            storagePolicyID);
      }
    
    

    storeAllocatedBlock方法中会调用commitOrCompleteLastBlock方法提交上一个block,并调用persistNewBlock将新block信息持久化到editlog;analyzeFileState方法则负责分析本次block请求的状态,并且将已分配好的block保存到onRetryBlock中。

    
      /**
       * Part II of getAdditionalBlock().
       * Should repeat the same analysis of the file state as in Part 1,
       * but under the write lock.
       * If the conditions still hold, then allocate a new block with
       * the new targets, add it to the INode and to the BlocksMap.
       *
       * @param src path of the file; replaced by the path reported by the
       *          file-state analysis
       * @param fileId id of the file, passed to the file-state analysis
       * @param clientName name of the client, passed to the file-state analysis
       * @param previous passed to the file-state analysis and, as a local block,
       *          to commitOrCompleteLastBlock
       * @param targets storages chosen for the block
       * @return the retry block if one with locations was detected, the file's
       *         last block combined with {@code targets} if the retry block had
       *         no locations, otherwise the newly created block
       * @throws IOException propagated from the calls made here
       */
      LocatedBlock storeAllocatedBlock(String src, long fileId, String clientName,
          ExtendedBlock previous, DatanodeStorageInfo[] targets) throws IOException {
        Block newBlock = null;
        long offset;
        // The operation category is checked before and again under the lock.
        checkOperation(OperationCategory.WRITE);
        waitForLoadingFSImage();
        writeLock();
        try {
          checkOperation(OperationCategory.WRITE);
          // Run the full analysis again, since things could have changed
          // while chooseTarget() was executing.
          LocatedBlock[] onRetryBlock = new LocatedBlock[1];
          // Analyze what kind of block-allocation request this is.
          FileState fileState = 
              analyzeFileState(src, fileId, clientName, previous, onRetryBlock);
          final INodeFile pendingFile = fileState.inode;
          src = fileState.path;
    
          // Both branches below return from inside the try block, so they
          // never reach the logSync() call after the lock is released.
          if (onRetryBlock[0] != null) {
            if (onRetryBlock[0].getLocations().length > 0) {
              // This is a retry. Just return the last block if having locations.
              return onRetryBlock[0];
            } else {
              // add new chosen targets to already allocated block and return
              BlockInfoContiguous lastBlockInFile = pendingFile.getLastBlock();
              ((BlockInfoContiguousUnderConstruction) lastBlockInFile)
                  .setExpectedLocations(targets);
              offset = pendingFile.computeFileSize();
              return makeLocatedBlock(lastBlockInFile, targets, offset);
            }
          }
    
          // commit the last block and complete it if it has minimum replicas
          commitOrCompleteLastBlock(pendingFile, fileState.iip,
                                    ExtendedBlock.getLocalBlock(previous));
    
          // allocate new block, record block locations in INode.
          newBlock = createNewBlock();
          INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
          saveAllocatedBlock(src, inodesInPath, newBlock, targets);
          // Persist the new block information to the edit log.
          persistNewBlock(src, pendingFile);
          offset = pendingFile.computeFileSize();
        } finally {
          writeUnlock();
        }
        // Sync the edit log only after the write lock has been released.
        getEditLog().logSync();
    
        // Return located block
        return makeLocatedBlock(newBlock, targets, offset);
      }
    
    

    三、BlockManager的chooseTarget4NewBlock实现

      /**
       * Picks the datanode storages a brand-new block should be written to.
       *
       * @param src path of the file, used for placement and in the error message
       * @param numOfReplicas number of replicas requested from the placement policy
       * @param client node handed to the placement policy as the writer
       * @param excludedNodes nodes that must not be chosen; may be null
       * @param blocksize size of the block to be written
       * @param favoredNodes node names to resolve into datanode descriptors
       * @param storagePolicyID id used to look up the storage policy
       * @return the chosen target storages
       * @throws IOException
       *           if the number of targets < minimum replication.
       * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
       *      Set, long, List, BlockStoragePolicy)
       */
      public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
          final int numOfReplicas, final Node client,
          final Set<Node> excludedNodes,
          final long blocksize,
          final List<String> favoredNodes,
          final byte storagePolicyID) throws IOException {
        final List<DatanodeDescriptor> favoredDescriptors =
            getDatanodeDescriptors(favoredNodes);
        final BlockStoragePolicy policy = storagePolicySuite.getPolicy(storagePolicyID);

        // Ask the placement policy for the datanodes to write to.
        final DatanodeStorageInfo[] chosen = blockplacement.chooseTarget(src,
            numOfReplicas, client, excludedNodes, blocksize,
            favoredDescriptors, policy);
        if (chosen.length >= minReplication) {
          return chosen;
        }

        // Too few targets: report how many nodes exist and how many were excluded.
        throw new IOException("File " + src + " could only be replicated to "
            + chosen.length + " nodes instead of minReplication (="
            + minReplication + ").  There are "
            + getDatanodeManager().getNetworkTopology().getNumOfLeaves()
            + " datanode(s) running and "
            + (excludedNodes != null ? excludedNodes.size() : "no")
            + " node(s) are excluded in this operation.");
      }
    

    四、BlockPlacementPolicyDefault.chooseTarget实现

    分配datanode策略

    
      /**
       * Chooses targets for a block while honouring a list of favored nodes.
       *
       * Without favored nodes the regular placement is used directly.
       * Otherwise a storage local to each favored node is tried first, the
       * regular placement fills whatever is still missing, and the combined
       * result is arranged by {@code getPipeline}. If a
       * NotEnoughReplicasException is raised along the way, the favored nodes
       * hint is dropped and regular placement is used instead.
       *
       * @param src path of the file, used for placement and logging
       * @param numOfReplicas number of replicas wanted
       * @param writer the writer's node
       * @param excludedNodes nodes that must not be chosen; may be null
       * @param blocksize size of the block to be written
       * @param favoredNodes preferred datanodes; may be null or empty
       * @param storagePolicy storage policy used to pick storage types
       * @return the chosen target storages
       */
      @Override
      DatanodeStorageInfo[] chooseTarget(String src,
          int numOfReplicas,
          Node writer,
          Set<Node> excludedNodes,
          long blocksize,
          List<DatanodeDescriptor> favoredNodes,
          BlockStoragePolicy storagePolicy) {
        try {
          if (favoredNodes == null || favoredNodes.isEmpty()) {
            // Favored nodes not specified, fall back to regular block placement.
            return chooseTarget(src, numOfReplicas, writer,
                new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
                excludedNodes, blocksize, storagePolicy);
          }

          // Working copy of the exclusions; chosen favored nodes are added to it.
          final Set<Node> favoriteAndExcludedNodes;
          if (excludedNodes != null) {
            favoriteAndExcludedNodes = new HashSet<Node>(excludedNodes);
          } else {
            favoriteAndExcludedNodes = new HashSet<Node>();
          }
          final List<StorageType> requiredStorageTypes = storagePolicy
              .chooseStorageTypes((short)numOfReplicas);
          final EnumMap<StorageType, Integer> storageTypes =
              getRequiredStorageTypes(requiredStorageTypes);

          // Choose favored nodes
          final List<DatanodeStorageInfo> results =
              new ArrayList<DatanodeStorageInfo>();
          final boolean avoidStaleNodes = stats != null
              && stats.isAvoidingStaleDataNodesForWrite();

          final int[] rackLimits = getMaxNodesPerRack(0, numOfReplicas);
          numOfReplicas = rackLimits[0];
          final int maxNodesPerRack = rackLimits[1];

          for (DatanodeDescriptor favoredNode : favoredNodes) {
            if (results.size() >= numOfReplicas) {
              break;
            }
            // Choose a single node which is local to favoredNode.
            // 'results' is updated within chooseLocalStorage
            final DatanodeStorageInfo target = chooseLocalStorage(favoredNode,
                favoriteAndExcludedNodes, blocksize, maxNodesPerRack,
                results, avoidStaleNodes, storageTypes, false);
            if (target != null) {
              favoriteAndExcludedNodes.add(target.getDatanodeDescriptor());
            } else {
              LOG.warn("Could not find a target for file " + src
                  + " with favored node " + favoredNode);
            }
          }

          if (results.size() < numOfReplicas) {
            // Not enough favored nodes, choose other nodes.
            numOfReplicas -= results.size();
            final DatanodeStorageInfo[] remainingTargets =
                chooseTarget(src, numOfReplicas, writer, results,
                    false, favoriteAndExcludedNodes, blocksize, storagePolicy);
            for (DatanodeStorageInfo extra : remainingTargets) {
              results.add(extra);
            }
          }
          final DatanodeStorageInfo[] chosen =
              results.toArray(new DatanodeStorageInfo[results.size()]);
          return getPipeline(writer, chosen);
        } catch (NotEnoughReplicasException nr) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Failed to choose with favored nodes (=" + favoredNodes
                + "), disregard favored nodes hint and retry.", nr);
          }
          // Fall back to regular block placement disregarding favored nodes hint
          return chooseTarget(src, numOfReplicas, writer,
              new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
              excludedNodes, blocksize, storagePolicy);
        }
      }
    

    具体实现选择策略:

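    1. 如果还没有已分配的datanode(numOfResults == 0),则调用chooseLocalStorage分配本地节点
    2. 如果已分配的datanode不超过1个(numOfResults <= 1),则调用chooseRemoteRack在远程机架上选择一台datanode
    3. 如果已分配的datanode不超过2个(numOfResults <= 2)且前两个节点在同一个机架上,则调用chooseRemoteRack在另一个机架上分配
    4. 如果前两个节点不在同一个机架上:是新的block时,调用chooseLocalRack在第二个节点(dn1)所在机架分配;否则调用chooseLocalRack在writer所在机架分配
    5. 如果还有剩余的副本需要分配(副本数大于3),则调用chooseRandom随机选择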

    选择的时候,都会调用isGoodTarget方法判断候选节点是否是满足选择条件的datanode,判断依据包括存储空间、网络负载情况、是否下线、xceiver线程数是否满足等。

    
      /**
       * choose <i>numOfReplicas</i> from all data nodes
       *
       * Targets are appended to {@code results} step by step: a local storage
       * when nothing has been chosen yet, then a remote rack, then a third
       * target whose rack depends on the first two, and finally random nodes
       * for whatever is left. When a step runs out of candidates the method
       * retries itself, first without avoiding stale nodes and then with all
       * remaining storage types marked unavailable.
       *
       * @param numOfReplicas additional number of replicas wanted
       * @param writer the writer's machine, could be a non-DatanodeDescriptor node
       * @param excludedNodes datanodes that should not be considered as targets
       * @param blocksize size of the data to be written
       * @param maxNodesPerRack max nodes allowed per rack
       * @param results the target nodes already chosen
       * @param avoidStaleNodes avoid stale nodes in replica choosing
       * @param storagePolicy policy asked for the storage types to use
       * @param unavailableStorages storage types passed to the policy as
       *          unavailable; this set is added to when a retry is attempted
       * @param newBlock passed to the storage policy; also decides which node's
       *          local rack is used for the third target and whether the writer
       *          may be replaced by the first chosen node
       * @return local node of writer (not chosen node)
       */
      private Node chooseTarget(int numOfReplicas,
                                Node writer,
                                final Set<Node> excludedNodes,
                                final long blocksize,
                                final int maxNodesPerRack,
                                final List<DatanodeStorageInfo> results,
                                final boolean avoidStaleNodes,
                                final BlockStoragePolicy storagePolicy,
                                final EnumSet<StorageType> unavailableStorages,
                                final boolean newBlock) {
        // Nothing to choose: no replicas requested or the cluster has no leaves.
        if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
          return (writer instanceof DatanodeDescriptor) ? writer : null;
        }
        final int numOfResults = results.size();
        final int totalReplicasExpected = numOfReplicas + numOfResults;
    
        // For an existing block whose writer is not a datanode, treat the first
        // already-chosen node as the writer.
        if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
          writer = results.get(0).getDatanodeDescriptor();
        }
    
        // Keep a copy of original excludedNodes
        final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);
    
        // choose storage types; use fallbacks for unavailable storages
        final List<StorageType> requiredStorageTypes = storagePolicy
            .chooseStorageTypes((short) totalReplicasExpected,
                DatanodeStorageInfo.toStorageTypes(results),
                unavailableStorages, newBlock);
        final EnumMap<StorageType, Integer> storageTypes =
            getRequiredStorageTypes(requiredStorageTypes);
        if (LOG.isTraceEnabled()) {
          LOG.trace("storageTypes=" + storageTypes);
        }
    
        try {
          // From here on numOfReplicas counts the required storage types.
          if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
            throw new NotEnoughReplicasException(
                "All required storage types are unavailable: "
                + " unavailableStorages=" + unavailableStorages
                + ", storagePolicy=" + storagePolicy);
          }
    
          // No datanode has been chosen yet: choose a local storage first.
          if (numOfResults == 0) {
            writer = chooseLocalStorage(writer, excludedNodes, blocksize,
                maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
                    .getDatanodeDescriptor();
            if (--numOfReplicas == 0) {
              return writer;
            }
          }
          final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
          if (numOfResults <= 1) {
            // Choose one datanode on a remote rack.
            chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                results, avoidStaleNodes, storageTypes);
            if (--numOfReplicas == 0) {
              return writer;
            }
          }
          if (numOfResults <= 2) {
            final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
            if (clusterMap.isOnSameRack(dn0, dn1)) {
              // The first two nodes share a rack: choose one on another rack.
    
              chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
                  results, avoidStaleNodes, storageTypes);
            } else if (newBlock){
              // New block: choose on the local rack of the second node.
              chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
                  results, avoidStaleNodes, storageTypes);
            } else {
              // Neither case applies: choose on the writer's local rack.
              chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
                  results, avoidStaleNodes, storageTypes);
            }
            if (--numOfReplicas == 0) {
              return writer;
            }
          }
          // Replicas are still needed: choose the rest at random.
          chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
              maxNodesPerRack, results, avoidStaleNodes, storageTypes);
        } catch (NotEnoughReplicasException e) {
          final String message = "Failed to place enough replicas, still in need of "
              + (totalReplicasExpected - results.size()) + " to reach "
              + totalReplicasExpected
              + " (unavailableStorages=" + unavailableStorages
              + ", storagePolicy=" + storagePolicy
              + ", newBlock=" + newBlock + ")";
    
          // The stack trace is logged only at trace level; otherwise a warning
          // carrying just the exception message is emitted.
          if (LOG.isTraceEnabled()) {
            LOG.trace(message, e);
          } else {
            LOG.warn(message + " " + e.getMessage());
          }
    
          if (avoidStaleNodes) {
            // Retry chooseTarget again, this time not avoiding stale nodes.
    
            // excludedNodes contains the initial excludedNodes and nodes that were
            // not chosen because they were stale, decommissioned, etc.
            // We need to additionally exclude the nodes that were added to the 
            // result list in the successful calls to choose*() above.
            for (DatanodeStorageInfo resultStorage : results) {
              addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
            }
            // Set numOfReplicas, since it can get out of sync with the result list
            // if the NotEnoughReplicasException was thrown in chooseRandom().
            numOfReplicas = totalReplicasExpected - results.size();
            return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
                maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
                newBlock);
          }
    
          boolean retry = false;
          // simply add all the remaining types into unavailableStorages and give
          // another try. No best effort is guaranteed here.
          for (StorageType type : storageTypes.keySet()) {
            if (!unavailableStorages.contains(type)) {
              unavailableStorages.add(type);
              retry = true;
            }
          }
          // Retry only if at least one more type was marked unavailable, so the
          // recursion cannot repeat with an unchanged set.
          if (retry) {
            for (DatanodeStorageInfo resultStorage : results) {
              addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
                  oldExcludedNodes);
            }
            numOfReplicas = totalReplicasExpected - results.size();
            return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
                maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
                newBlock);
          }
        }
        return writer;
      }
    
    

    相关文章

      网友评论

          本文标题:hdfs写之namenode端addBlock实现<五>

          本文链接:https://www.haomeiwen.com/subject/qwpvuctx.html