美文网首页
切片逻辑之CombineFileInputFormat --

切片逻辑之CombineFileInputFormat --

作者: 苏坡闷 | 来源:发表于2019-03-08 16:51 被阅读0次
    在 minSplitSizeNode(同一节点的数据块)= 0 且 minSplitSizeRack(同一机架的数据块)= 0 的情况下,将文件按照 maxSplitSize 进行逻辑切片,划分为若干部分(part):
        若待切部分 < maxSplitSize,则整个待切部分作为 1 个 part;
        若 maxSplitSize < 待切部分 < maxSplitSize * 2,则将待切部分均分为 2 个 part;
        若待切部分 > maxSplitSize * 2,则先切出 maxSplitSize 大小作为一个 part,再对剩余部分循环判断;
        最后将多个 part 进行组合,只要累计大小达到或超过 maxSplitSize(源码中的判断为 curSplitSize >= maxSize),这些 part 就组成 1 个切片。
    如果 minSplitSizeNode 或 minSplitSizeRack 不为零,则还要另行做判断(见下文 createSplits() 源码)。
    

    1.源码:getSplits()

    /**
     * Computes the input splits for a job.
     *
     * <p>Resolves the effective per-node minimum, per-rack minimum and maximum
     * split sizes, validates that they are mutually consistent, lists the
     * input files, and then delegates split generation to
     * {@code getMoreSplits} once per pool and once more for the files that no
     * pool accepted.
     *
     * @param job the job context; its configuration supplies any size that is
     *            not set on this instance
     * @return the list of generated splits; empty when there are no input files
     * @throws IOException if a minimum size exceeds the maximum size, or the
     *                     per-node minimum exceeds the per-rack minimum
     */
    @Override
      public List<InputSplit> getSplits(JobContext job) throws IOException {
        Configuration conf = job.getConfiguration();

        // A non-zero instance field wins over the job configuration; the
        // configuration is only consulted when the field is still zero.
        // (Presumably the fields are populated by the setXxxSplitSize()
        // setters -- those are not shown here.)
        long minSizeNode = (minSplitSizeNode != 0)
            ? minSplitSizeNode
            : conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
        long minSizeRack = (minSplitSizeRack != 0)
            ? minSplitSizeRack
            : conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
        // A resulting maxSize of 0 means "no maximum": the size-limit checks
        // in createSplits() are skipped when maxSize == 0.
        long maxSize = (maxSplitSize != 0)
            ? maxSplitSize
            : conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);

        // Sanity checks: a minimum may not exceed the maximum (when one is
        // set), and the per-node minimum may not exceed the per-rack minimum.
        if (maxSize != 0) {
          if (minSizeNode != 0 && minSizeNode > maxSize) {
            throw new IOException("Minimum split size pernode " + minSizeNode +
                                  " cannot be larger than maximum split size " +
                                  maxSize);
          }
          if (minSizeRack != 0 && minSizeRack > maxSize) {
            throw new IOException("Minimum split size per rack " + minSizeRack +
                                  " cannot be larger than maximum split size " +
                                  maxSize);
          }
        }
        if (minSizeRack != 0 && minSizeNode > minSizeRack) {
          throw new IOException("Minimum split size per node " + minSizeNode +
                                " cannot be larger than minimum split " +
                                "size per rack " + minSizeRack);
        }

        // Every file found under the job's input paths.
        List<FileStatus> stats = listStatus(job);
        List<InputSplit> splits = new ArrayList<InputSplit>();
        if (stats.isEmpty()) {
          return splits;
        }

        // Handle one pool at a time so that a split only ever contains blocks
        // from a single pool (possibly from several files of that pool).
        for (MultiPathFilter pool : pools) {
          ArrayList<FileStatus> poolFiles = new ArrayList<FileStatus>();

          // Move every file accepted by this pool out of 'stats' and into
          // 'poolFiles', so later pools and the final pass do not see it again.
          Iterator<FileStatus> remaining = stats.iterator();
          while (remaining.hasNext()) {
            FileStatus candidate = remaining.next();
            if (!pool.accept(candidate.getPath())) {
              continue;
            }
            poolFiles.add(candidate);
            remaining.remove();
          }

          // Generate the splits for this pool's files.
          getMoreSplits(job, poolFiles, maxSize, minSizeNode, minSizeRack, splits);
        }

        // Whatever is left in 'stats' belongs to no pool; split it as one batch.
        getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);

        // free up rackToNodes map
        rackToNodes.clear();
        return splits;
      }
    

    2.源码:getMoreSplits()
    无论是满足某过滤池实例 onePool 条件的文件,还是不属于任何过滤池的文件,可以笼统地理解为 "一批文件",getMoreSplits()就是为这一批文件生成切片的。

    /**
       * Generates the splits for one batch of files and appends them to
       * {@code splits}.
       *
       * <p>The batch is either the set of files accepted by one pool or the
       * files that belong to no pool. For every file a {@code OneFileInfo} is
       * constructed and handed the three lookup tables below (constructing it is
       * expected to register the file's blocks in those tables -- the
       * constructor is not shown here). The tables and the accumulated total
       * length are then passed to {@code createSplits}.
       *
       * @param job         the job context, used for its configuration and for
       *                    the {@code isSplitable} check
       * @param stats       the files to process; an empty list is a no-op
       * @param maxSize     maximum split size, 0 meaning "not set"
       * @param minSizeNode minimum size of a node-local split, 0 meaning "not set"
       * @param minSizeRack minimum size of a rack-local split, 0 meaning "not set"
       * @param splits      output list that receives the generated splits
       * @throws IOException declared by this method's signature; NOTE(review):
       *                     presumably propagated from {@code OneFileInfo}
       *                     construction -- confirm against its constructor
       */
      private void getMoreSplits(JobContext job, List<FileStatus> stats,
                                 long maxSize, long minSizeNode, long minSizeRack,
                                 List<InputSplit> splits)
        throws IOException {
        Configuration conf = job.getConfiguration();

        // Nothing to split for an empty batch.
        if (stats.isEmpty()) {
          return;
        }

        // rackToBlocks: rack -> blocks located on that rack.
        HashMap<String, List<OneBlockInfo>> rackToBlocks =
                                  new HashMap<String, List<OneBlockInfo>>();

        // blockToNodes: block -> nodes holding a replica of that block.
        HashMap<OneBlockInfo, String[]> blockToNodes =
                                  new HashMap<OneBlockInfo, String[]>();

        // nodeToBlocks: node -> blocks located on that node.
        HashMap<String, Set<OneBlockInfo>> nodeToBlocks =
                                  new HashMap<String, Set<OneBlockInfo>>();

        // Build one OneFileInfo per file and sum up the file lengths.
        // The previous version stored each instance in files[i] without ever
        // advancing i, so only slot 0 was (re)written and the array was never
        // read; a loop-local variable expresses the same thing without the
        // dead array.
        long totLength = 0;
        for (FileStatus stat : stats) {
          OneFileInfo fileInfo =
              new OneFileInfo(stat, conf, isSplitable(job, stat.getPath()),
                              rackToBlocks, blockToNodes, nodeToBlocks,
                              rackToNodes, maxSize);
          totLength += fileInfo.getLength();
        }

        // Turn the collected block/node/rack information into splits.
        createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,
                     maxSize, minSizeNode, minSizeRack, splits);
      }
    

    3.源码:createSplits()

      /**
       * Groups blocks into splits in three passes and appends every split to
       * {@code splits} via {@code addCreatedSplit}.
       *
       * <ol>
       *   <li>Node pass: repeatedly walks the nodes, creating at most one
       *       full-size split per node per walk from blocks on that node.
       *       Left-over blocks on a node become a split only if they reach
       *       {@code minSizeNode} and the node has no split yet; otherwise they
       *       are put back into {@code blockToNodes}.</li>
       *   <li>Rack pass: repeatedly walks the racks, creating at most one split
       *       per rack per walk from still-unassigned blocks. Left-overs become
       *       a split if they reach {@code minSizeRack}; otherwise they are
       *       moved to an overflow list.</li>
       *   <li>Overflow pass: concatenates the overflow blocks, cutting a split
       *       whenever {@code maxSize} is reached, and emits the remainder as
       *       one final split.</li>
       * </ol>
       *
       * <p>{@code blockToNodes} doubles as the set of "not yet assigned" blocks:
       * a block is removed from it when it is placed into a split. The maps
       * passed in are mutated.
       *
       * @param nodeToBlocks node -> blocks on that node; entries are pruned as
       *                     blocks are assigned
       * @param blockToNodes block -> hosts; also the pool of unassigned blocks
       * @param rackToBlocks rack -> blocks on that rack
       * @param totLength    total length of all blocks to be assigned
       * @param maxSize      maximum split size; 0 disables the size limit
       * @param minSizeNode  minimum size for a left-over node-local split; 0
       *                     disables left-over node-local splits
       * @param minSizeRack  minimum size for a left-over rack-local split; 0
       *                     sends all rack left-overs to the overflow list
       * @param splits       output list receiving the created splits
       */
      @VisibleForTesting
      void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                         Map<OneBlockInfo, String[]> blockToNodes,
                         Map<String, List<OneBlockInfo>> rackToBlocks,
                         long totLength,
                         long maxSize,
                         long minSizeNode,
                         long minSizeRack,
                         List<InputSplit> splits                     
                        ) {
    
        // Blocks collected so far for the split currently being built.
        ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
    
        // Accumulated size of the split currently being built.
        long curSplitSize = 0;
        
        int totalNodes = nodeToBlocks.size();
        // Length not yet assigned to a node-local split.
        long totalLength = totLength;
    
        // Number of splits created so far on each node.
        Multiset<String> splitsPerNode = HashMultiset.create();
        // Nodes that need no further node-local processing.
        Set<String> completedNodes = new HashSet<String>();
        
        while(true) {
          // it is allowed for maxSize to be 0. Disable smoothing load for such cases
    
          // Pass 1: build splits node by node from the blocks on each node.
          // process all nodes and create splits that are local to a node. Generate
          // one split per node iteration, and walk over nodes multiple times to
          // distribute the splits across nodes. 
          for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
              .entrySet().iterator(); iter.hasNext();) {
            Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
            
            String node = one.getKey();
            
            // Skip the node if it has previously been marked as completed.
            if (completedNodes.contains(node)) {
              continue;
            }
    
            Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
    
            // for each block, copy it into validBlocks. Delete it from
            // blockToNodes so that the same block does not appear in
            // two different splits.
            Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
            while (oneBlockIter.hasNext()) {
              OneBlockInfo oneblock = oneBlockIter.next();
              
              // Remove all blocks which may already have been assigned to other
              // splits.
              if(!blockToNodes.containsKey(oneblock)) {
                oneBlockIter.remove();
                continue;
              }
            
              validBlocks.add(oneblock);
              blockToNodes.remove(oneblock);
              curSplitSize += oneblock.length;
    
              // if the accumulated split size exceeds the maximum, then
              // create this split.
    
              // Once the accumulated block size is greater than or equal to
              // maxSize, emit a split.
              if (maxSize != 0 && curSplitSize >= maxSize) {
                // create an input split and add it to the splits array
                addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                totalLength -= curSplitSize;
                curSplitSize = 0;
    
                splitsPerNode.add(node);
    
                // Remove entries from blocksInNode so that we don't walk these
                // again.
                blocksInCurrentNode.removeAll(validBlocks);
                validBlocks.clear();
    
                // Done creating a single split for this node. Move on to the next
                // node so that splits are distributed across nodes.
                break;
              }
    
            }
            if (validBlocks.size() != 0) {
              // This implies that the last few blocks (or all in case maxSize=0)
              // were not part of a split. The node is complete.
              
              // if there were any blocks left over and their combined size is
              // larger than minSplitNode, then combine them into one split.
              // Otherwise add them back to the unprocessed pool. It is likely
              // that they will be combined with other blocks from the
              // same rack later on.
              // This condition also kicks in when max split size is not set. All
              // blocks on a node will be grouped together into a single split.
    
              // If the left-over size is greater than or equal to minSizeNode
              // (and this node has no split yet), the left-over blocks form a
              // split; otherwise they are returned to blockToNodes so that the
              // later rack pass can handle them.
    
              if (minSizeNode != 0 && curSplitSize >= minSizeNode
                  && splitsPerNode.count(node) == 0) {
                // haven't created any split on this machine. so its ok to add a
                // smaller one for parallelism. Otherwise group it in the rack for
                // balanced size create an input split and add it to the splits
                // array
                addCreatedSplit(splits, Collections.singleton(node), validBlocks);
                totalLength -= curSplitSize;
                splitsPerNode.add(node);
                // Remove entries from blocksInNode so that we don't walk this again.
                blocksInCurrentNode.removeAll(validBlocks);
                // The node is done. This was the last set of blocks for this node.
              } else {
                // Put the unplaced blocks back into the pool for later rack-allocation.
                for (OneBlockInfo oneblock : validBlocks) {
                  blockToNodes.put(oneblock, oneblock.hosts);
                }
              }
              validBlocks.clear();
              curSplitSize = 0;
              completedNodes.add(node);
            } else { // No in-flight blocks.
              if (blocksInCurrentNode.size() == 0) {
                // Node is done. All blocks were fit into node-local splits.
                completedNodes.add(node);
              } // else Run through the node again.
            }
          }
    
          // Check if node-local assignments are complete.
          if (completedNodes.size() == totalNodes || totalLength == 0) {
            // All nodes have been walked over and marked as completed or all blocks
            // have been assigned. The rest should be handled via rackLock assignment.
            // NOTE(review): the message is tagged "DEBUG:" but is logged at
            // INFO level -- confirm which level is intended.
            LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
                + completedNodes.size() + ", size left: " + totalLength);
            break;
          }
        }
        // Pass 2: build splits rack by rack from the blocks on each rack.
        // if blocks in a rack are below the specified minimum size, then keep them
        // in 'overflow'. After the processing of all racks is complete, these 
        // overflow blocks will be combined into splits.
        // overflowBlocks holds the blocks still unassigned after the rack pass.
        ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
        Set<String> racks = new HashSet<String>();
    
        // Process all racks over and over again until there is no more work to do.
        // NOTE(review): this loop only drains blocks that appear in some
        // rackToBlocks list; it assumes every block left in blockToNodes does
        // -- confirm against how OneFileInfo populates the maps.
        while (blockToNodes.size() > 0) {
    
          // Create one split for this rack before moving over to the next rack. 
          // Come back to this rack after creating a single split for each of the 
          // remaining racks.
          // Process one rack location at a time, Combine all possible blocks that
          // reside on this rack as one split. (constrained by minimum and maximum
          // split size).
    
          // Visit each rack in turn.
          for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = 
               rackToBlocks.entrySet().iterator(); iter.hasNext();) {
    
            Map.Entry<String, List<OneBlockInfo>> one = iter.next();
            racks.add(one.getKey());
            List<OneBlockInfo> blocks = one.getValue();
    
            // for each block, copy it into validBlocks. Delete it from 
            // blockToNodes so that the same block does not appear in 
            // two different splits.
            boolean createdSplit = false;
    
            // Visit each block of this rack in turn.
            for (OneBlockInfo oneblock : blocks) {
              if (blockToNodes.containsKey(oneblock)) {
                validBlocks.add(oneblock);
                blockToNodes.remove(oneblock);
                curSplitSize += oneblock.length;
          
                // if the accumulated split size exceeds the maximum, then 
                // create this split. (The check is "greater than or equal to
                // maxSize".)
                if (maxSize != 0 && curSplitSize >= maxSize) {
                  // create an input split and add it to the splits array
                  addCreatedSplit(splits, getHosts(racks), validBlocks);
                  createdSplit = true;
                  break;
                }
              }
            }
    
            // if we created a split, then just go to the next rack
            if (createdSplit) {
              curSplitSize = 0;
              validBlocks.clear();
              racks.clear();
              continue;
            }
    
            if (!validBlocks.isEmpty()) {
    
              // If the left-over size is greater than or equal to minSizeRack,
              // the left-over blocks form a split.
              if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
                // if there is a minimum size specified, then create a single split
                // otherwise, store these blocks into overflow data structure
                addCreatedSplit(splits, getHosts(racks), validBlocks);
              } else {
                // There were a few blocks in this rack that 
                // remained to be processed. Keep them in 'overflow' block list. 
                // These will be combined later.
      
                // Left-overs smaller than minSizeRack (or any left-overs when
                // minSizeRack is 0) go to overflowBlocks.
                overflowBlocks.addAll(validBlocks);
              }
            }
            curSplitSize = 0;
            validBlocks.clear();
            racks.clear();
          }
        }
    
        assert blockToNodes.isEmpty();
        assert curSplitSize == 0;
        assert validBlocks.isEmpty();
        assert racks.isEmpty();
    
        // Pass 3: walk the overflow blocks and accumulate them.
        for (OneBlockInfo oneblock : overflowBlocks) {
          validBlocks.add(oneblock);
          curSplitSize += oneblock.length;
    
          // This might cause an exiting rack location to be re-added,
          // but it should be ok.
          for (int i = 0; i < oneblock.racks.length; i++) {
            racks.add(oneblock.racks[i]);
          }
    
          // if the accumulated split size exceeds the maximum, then 
          // create this split.
          // Once the accumulated size is greater than or equal to maxSize,
          // the collected blocks form a split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, getHosts(racks), validBlocks);
            curSplitSize = 0;
            validBlocks.clear();
            racks.clear();
          }
        }
    
        // Whatever is still collected forms one final split.
        if (!validBlocks.isEmpty()) {
          addCreatedSplit(splits, getHosts(racks), validBlocks);
        }
      }
    

    参考文献:
    解读:CombineFileInputFormat类

    相关文章

      网友评论

          本文标题:切片逻辑之CombineFileInputFormat --

          本文链接:https://www.haomeiwen.com/subject/zptrpqtx.html