MapReduce(二):分片

作者: b91cbec6a902 | 来源:发表于2019-01-11 10:52 被阅读0次

概述

基于Hadoop 2.x

核心方法:org.apache.hadoop.mapreduce.JobSubmitter#writeSplits

org.apache.hadoop.mapreduce.InputFormat 负责创建分片和分片记录读取器

// 创建分片,这个分片只是逻辑上的分片,不包含数据。每个分片都会启动一个Map进程处理。
public abstract  List<InputSplit> getSplits(JobContext context ) throws IOException, InterruptedException;

public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context ) throws IOException, InterruptedException;

org.apache.hadoop.mapreduce.InputSplit 具体的分片信息

// 当前分片的大小,也就是字节数
public abstract long getLength() throws IOException, InterruptedException;

// 当前分片所属文件的地址,因为文件有多个副本,所以地址有多个
public abstract String[] getLocations() throws IOException, InterruptedException;

// SplitLocationInfo用来描述当前分片所属文件存储状况,是磁盘,还是内存,具体位置在哪。返回null代表文件的所有副本存储在磁盘。
public SplitLocationInfo[] getLocationInfo() throws IOException {
    return null;
}

具体的分片逻辑

org.apache.hadoop.mapreduce.lib.input.FileInputFormat

1、FileInputFormat中如何确定分片的大小?

// 最小分片大小,默认1
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

// 最大分片大小,默认Long.MAX_VALUE
long maxSize = getMaxSplitSize(job);

// 块大小。文件系统是HDFS的话,就是HDFS块大小。
long blockSize = file.getBlockSize();

// 计算分片大小
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}

从上面的逻辑可以看出,分片的大小为:maxSize,blockSize,minSize三者的中间值。

2、怎么分片?
①首先需要知道,被处理的这些文件的状态。

List<FileStatus> files = listStatus(job);
public class FileStatus implements Writable, Comparable<FileStatus> {
    // 地址
    private Path path;
    // 总字节数
    private long length;
    // 是否是目录
    private boolean isdir;
    // 副本数量
    private short block_replication;
    // 文件在文件系统(HDFS)上的块大小
    private long blocksize;
    private long modification_time;
    private long access_time;
    private FsPermission permission;
    private String owner;
    private String group;
    private Path symlink;
}

如果是分布式文件系统(HDFS)还有文件的所有块的信息。

public class LocatedFileStatus extends FileStatus {
    private BlockLocation[] locations;
}
public class BlockLocation {
    // 文件块所在的服务器的hostname,因为块有多个副本,所以有多个
    private String[] hosts; 
    // 文件块的所有缓存的hostname
    private String[] cachedHosts; // Datanode hostnames with a cached replica
    // 访问这个文件块的路径(IP:端口号)
    private String[] names; // Datanode IP:xferPort for accessing the block
    // 访问这个文件块的全路径
    private String[] topologyPaths; // Full path name in network topology
    // 文件块所有副本的storageId
    private String[] storageIds; // Storage ID of each replica
    // 文件块所有副本的存储类型
    private StorageType[] storageTypes; // Storage type of each replica
    // 文件块的起始位置偏移量
    private long offset;  // Offset of the block in the file
    // 文件块的长度(字节大小)
    private long length;
    // 是否是脏的文件块
    private boolean corrupt;
}

②开始分片

List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {...}

这里需要注意:分片的单位为一个具体的文件(这个文件在HDFS有可能有很多Block)。

// 文件的总大小
long bytesRemaining = length;
// 按照分片大小,开始makeSplit
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts(),
              blkLocations[blkIndex].getCachedHosts()));
  bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
  int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
  splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
             blkLocations[blkIndex].getHosts(),
             blkLocations[blkIndex].getCachedHosts()));
}

SPLIT_SLOP :常量,值为1.1
bytesRemaining:还未进行分片的总大小
splitSize:分片的大小
bytesRemaining/splitSize > SPLIT_SLOP:如果未分片的大小 / 分片的大小 <= 1.1,那么就把剩余的划为一个分片。
举个栗子:一个129M的文件,在HDFS上会有两个block,一个128M,一个1M。分片大小为128M,那么最后分片的时候只会产生一个分片。

如果文件的大小为0怎么办?

//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));

仍然会创建一个分片,分片长度为0。

总结

1、分片的大小为maxSize,blockSize,minSize三者的中间值。
2、分片是以一个具体的文件为单位的,也就是说一个逻辑分片不会跨越两个文件的block。
3、同一个文件的分片,有可能跨越这个文件的两个block。
4、为了提高Map任务的数据本地性,应使InputSplit大小与block大小相同。

相关文章

  • MapReduce(二):分片

    概述 基于Hadoop 2.x 核心方法:org.apache.hadoop.mapreduce.JobSubmi...

  • hive map数的控制

    背景:最近执行一个 select count(*),发现mapreduce计算分片数很慢,且分片数的大小对不上 确...

  • MapReduce

    MapReduce模型 MapReduce采用“分而治之”策略,一个大规模数据集进行分片,多个Map任务并行处理。...

  • MapReduce

    MapReduce任务执行过程 MapReduce物理实现 1、一个数据块对应1~N个分片,默认是一个。 2.一...

  • MapReduce个人学习之路——Five

    1.MapReduce 1.1工作流程 按照时间顺序包括:输入分片(input split)、map阶段、comb...

  • Hadoop--MapReduce阶段的Split阶段

    一、介绍 在MapReduce处理过程中主要分为四个阶段:Split(分片)阶段、Map阶段、Shuffle(混排...

  • 四、分布式计算框架MapReduce

    一、MapReduce概述 二、wordCount入门MapReduce 2.1 MapReduce编程模型之Ma...

  • 大数据开发:MapReduce分片详解

    在大数据领域,MapReduce的计算模式,至今来说仍然具有典型的代表意义,以Hadoop、Spark为首的大数据...

  • 第十章 计算层优化之数据倾斜

    在MapReduce执行的过程中,会把任务的原始数据分片到多个Task中执行。想象以下场景,当任务的多数Task都...

  • 『学概念找员外』分片技术(四)

    【本文由“币嗨Bihi内容合伙人计划赞助”】 往期回顾:分片技术(一)分片技术(二)分片技术(三) 这个应该是分片...

网友评论

    本文标题:MapReduce(二):分片

    本文链接:https://www.haomeiwen.com/subject/qlnvrqtx.html