MapReduce框架原理

作者: Manfestain | 来源:发表于2020-05-23 15:00 被阅读0次

    最全的MapReduce框架原理,方便以后复习。知识点来自尚硅谷的课程学习。课程链接


    一、InputFormat数据输入

    1. 切片与MapTask并行度决定机制

    数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。实际存储在磁盘上,还是按照HDFS将数据分成一个一个Block进行存储。
    MapTask的并行速度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

    决定机制原理
    • 当切片大小为100M时,不同节点之间需要数据传输,耗费大量IO,不高效;
    • 当切分时,只针对单个文件进行切分,不考虑文件之间的大小;
    • MapTask的数量由切片数量决定,切片的大小默认是块大小。

    2. Job提交流程源码解析

    Job提交流程源码解析
    waitForCompletion();
    
    // 1 建立连接
    connect();
      // 1.1 创建提交Job的代理
      new Cluster(getConfiguration());
      // 1.2 判断是本地yarn还是远程yarn
      initialize(jobTrackAddr, conf);
    
    // 2 提交job
    submitter.submitJobInternal(Job.this, cluster)
      // 2.1 创建给集群提交数据的Stag路径
      Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
      // 2.2 获取jobid,并创建Job路径
      JobID jobId = submitClient.getNewJobID();
      // 2.3 拷贝jar包到集群
      copyAndConfigureFiles(job, submitJobDir);
      rUpLoader.uploadFiles(job, jobSubmitDir);
      // 2.4 计算切片,生成切片规划文件
      writeSplits(job, submitJobDir);
      maps = writeNewSplits(job, jobSubmitDir);
      input.getSplits(job);
      // 2.5 向Stag路径写XML配置文件
      writeConf(conf, submitJobFile);
      conf.writeXml(out);
      // 2.6 提交Job,返回提交状态
      status = submitClint.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
    

    3. FileInputFormat切片机制

    • 根据计算公式,切片大小默认为块大小,本地模式切片大小为32M;
    • 公式中,默认minSize=1maxSize=Long.MAXValue
    • 切片大小设置:maxSize调的小于blockSize,则切片会变小。minSize调的比blockSize大,则可以让切片变大;
    • MrAppMaster根据切片数量计算MapTask个数;
    • 获取切片的文件名:
      String name = inputSplit.getPath().getName();
    • 根据文件类型获取切片信息:
      FileSplit inputSplit = (FileSplit) context.getInputSplit();

    4. CombineTextInputFormat切片机制

    框架默认的TextInputFormat切片机制是对任务按照文件进行切片,当有大量小文件时,就会产生大量的MapTask,处理效率及其低下。

    CombineTextFormat用于小文件过多的场景,从逻辑上将多个小文件规划到一个切片中,交给一个MapTask处理。


    • 虚拟存储切片最大值可以任意设置,但是要根据实际的小文件大小来具体设置:
      combineTextFormat.setMaxInputSplitSize(job, 4194304); // 4M
    • 主要分为两个过程:虚拟存储过程和切片过程。
    • 当存储时当文件大于4M,会判断是否比2*4M大。
    • 在实际使用时,需要处理很多小文件时,可以按如下操作,在Driver类中增加如下信息:
    // 如果不设置InputFormat,默认是TextInputFormat.class
    job.setInputFormatClass(CombineTextInputForamt.class);
    // 设置虚拟存储切片最大值
    CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
    

    5. FileInputFormat实现类

    在运行MapReduce程序时,针对不同格式的输入文件,MapReduce是如何读取这些数据的?

    FileInputFormat常见的接口实现类包括:

    • TextInputFormat
    • KeyValueTextInputFormat
    • NLineInputFormat
    • CombineTextInputFormat
    • 自定义的InputFormat
    5.1 TextInputFormat

    TextinputFormat是默认的FileInputFormat实现类。按行读取每条记录,健是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行,回车等),Text类型。

    5.2 KeyValueTextInputFormat

    每一行均为一条记录,被分割符分割为key和value,默认的分割符是"\t"。可以在驱动类中进行设置:
    conf.set(KeyValueLineRecoredReader,KEY_VALUE_SEPERATOR, "\t");
    此时的key是每行排在分割符之前的Text序列。

    5.3 NLineInputFormat

    如果使用NLineInpurFormat,代表每个MapTask进程处理的InputSplit不再按照Block块去切分,而是按照NLineInputFormat指定的行数N来切分。
    输入文件的总行数 \div N=切片数,如果不整除,切片数=商+1
    此时的key-value与TextInputFormat生成的一样。可以在驱动类中进行设置:
    NLineInputFormat.setNumLinesPerSplit(job, 3);

    5.4 自定义InputFormat

    具体步骤:
    1)自定义一个类继承FileInputFormat;
    2)改写RecordReader,实现自定义读取并封装为key-value;
    3)在输出时使用SequenceFileOutPutFormat输出合并文件。

    SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式,文件路径+文件名为key,文件内容为value。

    自定义InputFormat案例:


    实现步骤

    代码实现:
    WholeFileInputFormat.java

    package Inputformat;
    
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
    
        @Override
        public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    
            WholeRecordReader recordReader = new WholeRecordReader();
            recordReader.initialize(split, context);
    
            return recordReader;
        }
    }
    

    WholeRecordReader.java

    package Inputformat;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    import java.io.IOException;
    
    public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
    
        FileSplit split;
        Configuration configuration;
        Text k = new Text();
        BytesWritable v = new BytesWritable();
        boolean isProgress = true;
    
        // 初始化
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            this.split = (FileSplit) split;
            configuration = context.getConfiguration();
        }
    
        // 核心的业务逻辑
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
    
            if (isProgress) {
                byte[] buf = new byte[(int) split.getLength()];
    
                // 1 获取Fs对象
                Path path = split.getPath();
                FileSystem fs = path.getFileSystem(configuration);
    
                // 2 获取输入流
                FSDataInputStream fis = fs.open(path);
    
                // 3 拷贝
                IOUtils.readFully(fis, buf, 0, buf.length);
    
                // 4 封装v
                v.set(buf, 0, buf.length);
    
                // 5 封装k
                k.set(path.toString());
    
                // 6 关闭资源
                IOUtils.closeStream(fis);
    
                isProgress = false;
    
                return true;
            }
    
            return false;
        }
    
        public Text getCurrentKey() throws IOException, InterruptedException {
            return k;
        }
    
        public BytesWritable getCurrentValue() throws IOException, InterruptedException {
            return v;
        }
    
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
    
        public void close() throws IOException {
    
        }
    }
    

    完成相应的Mapper和Reducer类,并在Driver中增加两句:

    // 4 设置输入的InputFormat
    job.setInputFormatClass(WholeFileInputFormat.class);
    
    // 5 设置输出的OutputFormat
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    

    二、Shuffle机制

    Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

    1. Partition分区

    分区就是将计算结果按照条件输出到不同文件中。比如按照手机号归属地将不同省份输出到不同文件中。

    1.1系统默认的分区是Hash分区:
    public class Hashpartitioner<K, V> extends Partitioner<K, V> {
      public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hasCode() & Integer.MAX_VALUE) % numReduceTasks;
      }
    }
    
    • 默认分区是根据key的HashCode对ReduceTasks个数取模得到的;
    • 用户没法控制那个Key存储到那个分区。
    1.2 自定义Partition分区

    自定义步骤:



    案例实现:

    按照手机号前三位将统计结果写入到不同的文件。

    自定义Partition类ProvincePartitioner.java

    package Flowsum;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
    
            // 1 获取手机号前三位
            String prePhoneNum = key.toString().substring(0, 3);
    
            int partition = 3;
            if ("136".equals(prePhoneNum)) {
                partition = 0;
            }else if ("137".equals(prePhoneNum)) {
                partition = 1;
            }else if ("138".equals(prePhoneNum)) {
                partition = 2;
            }
            
            return partition;
        }
    }
    

    编写Mapper和Reducer,在Driver类中增加如下代码:

    // 5 设置Partition分区
     job.setPartitionerClass(ProvincePartitioner.class);
     job.setNumReduceTasks(4);
    
    • 在定义分区时,分区号一定要严格按照顺序从0开始;
    • 在设置分区数时:
      当忘记设置或者设置为1,则最终只会产生一个文件,程序会将所有的分区数据统统写入一个文件;
      当设置的分区数在1和实际分区数之间会报错(IO异常);
      当设置的分区数大于实际分区数,会生成期待的结果,但是会多出超出分区个数个空文件,因为实际运行中,没有对应的数据传给多余的ReduceTask。
    1.3 WritableComparable排序

    排序是MapReduce中的重要操作。

    MapTask和ReduceTask均会对数据按照key进行排序 ,属于Hadoop的默认行为。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序

    MapReduce排序分类:


    自定义排序:
    bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

    示例编写:

    按照电话所属区将统计数据存储到不同的文件中,并在每个文件中实现按总流量的逆序。

    重写编写FlowBean.java

    package Sort;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class FlowBean implements WritableComparable<FlowBean> {
    
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        public FlowBean() {
            super();
        }
    
        public FlowBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            sumFlow = upFlow + downFlow;
        }
    
        // 核心的比较
        public int compareTo(FlowBean bean) {
    
            int result;
    
            if (sumFlow > bean.getSumFlow()) {
                result = -1;
            }else if(sumFlow < bean.getSumFlow()) {
                result = 1;
            }else{
                result = 0;
            }
    
            return result;
        }
    
        // 序列化方法
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        // 反序列化方法
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }
    
    • 此时,要继承WritableComparable类;
    • 重写compareTo方法,这是排序比较的核心;

    编写Mapper类,FlowCountSortMapper.java

    package Sort;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
    
        FlowBean k = new FlowBean();
        Text v = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 1 获取一行
            String line = value.toString();
    
            // 2 拆分
            String[] fields = line.split("\t");
    
            // 3 封装
            k.setUpFlow(Long.parseLong(fields[1]));   // 流量作为key
            k.setDownFlow(Long.parseLong(fields[2]));
            k.setSumFlow(Long.parseLong(fields[3]));
    
            v.set(fields[0]);  // 电话号码作为Value
    
            // 3 写出
            context.write(k, v);
        }
    }
    
    • 需要对数据按照总流量排序,所以Map阶段结束后的key应该是Flowbean对象,value为电话号码.

    编写Reducer类,FlowCountSortReducer.java

    package Sort;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
    
        @Override
        protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    
            for (Text value : values) {
                context.write(value, key);
            }
        }
    }
    
    • 排序结束后,生成的数据还是按照电话,流量展示,所以此时输出的key为电话号码,value为FlowBean。

    编写Partition类,ProvincePartitioner.java

    package Sort;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ProvincePartitioner extends Partitioner<FlowBean, Text> {
    
        public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
    
            String prePhoneNum = text.toString().substring(0, 3);
    
            int partition = 3;
    
            if ("136".equals(prePhoneNum)) {
                partition= 0;
            }else if ("137".equals(prePhoneNum)) {
                partition = 1;
            }else if ("138".equals(prePhoneNum)) {
                partition = 2;
            }
    
            return partition;
        }
    }
    
    • 在Reducer阶段,需要将不同所属区的数据写入不同文件,此时需要对数据进行Partition分区。

    编写驱动类,FlowCountSortDriver.java

    package Sort;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowCountSortDriver {
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            job.setJarByClass(FlowCountSortDriver.class);
            job.setMapperClass(FlowCountSortMapper.class);
            job.setReducerClass(FlowCountSortReducer.class);
    
            job.setPartitionerClass(ProvincePartitioner.class);
            job.setNumReduceTasks(4);
    
            job.setMapOutputKeyClass(FlowBean.class);
            job.setMapOutputValueClass(Text.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.waitForCompletion(true);
        }
    }
    

    相关文章

      网友评论

        本文标题:MapReduce框架原理

        本文链接:https://www.haomeiwen.com/subject/ivdcahtx.html