MapReduce

作者: Aluha_f289 | 来源:发表于2020-05-19 20:38 被阅读0次

    MapReduce模型

    MapReduce采用“分而治之”策略,一个大规模数据集进行分片,多个Map任务并行处理。实现“计算向数据靠拢”理念,而不比大量移动数据造成网络开销。
    MapReduce采用Master/Slave架构,一个Master,若干Slave。Master运行JobTracker负责作业调度,Slave运行TaskTracker负责具体作业处理。

    JobTracker

    1、负责任务调度与资源监控。

    2、监控Job和TaskTracker的健康状态,一旦失败,相应任务就要发生转移。

    3、跟踪任务进度,汇报给调度器,调度器根据在资源空闲时,分配合适的任务。

    TaskTracker

    1、定期使用“心跳”向JobTracker报告任务进度,同时接受新任务。

    2、使用“slot”等量划分资源,调度的基本单位,一个Task只有拥有一个“slot”才能执行,调度器就是把空闲的“slot”分配给Task,分为Map slot和Reduce slot。

    Task

    分为Map Task和Reduce Task,都由TaskTracker启动。

    image

    MapReduce执行过程

    image

    InputFormat对HDFS中的数据进行加载,进行split(逻辑分片,HDFS中的Block是物理分片),RR(RecordReader)将各个分片的数据从HDFS中读取出来以键值对输出作为Map函数(用户程序自己编写的逻辑)进行输入,输出中间结果进行Shufflc,传给Reduce函数输出最终结果。

    Split

    逻辑上进行分片,分片的依据用户可以自定义,但分片的数量决定了Map任务的数量,理想分片是HDFS的块。Reduce任务的数量通常是比集群中Reduce slot槽的总量略小一点。

    Shufflc

    分为Map端Shufflc和Reduce端Shufflc

    image

    Map端Shufflc

    image

    每个任务配一个缓存,溢写比例0.8

    1、分区默认采用哈希函数

    2、排序是默认操作

    3、合并不能改变最终结果,不一定发生

    4、Map任务全部结束前对溢写的文件(大于预定值可以再次合并)进行归并,得到一个大的本地文件

    5、JobTracker会检测Map任务进度,通知Reduce任务来处理数据

    Reduce端Shufflc

    image

    来自不同Map机器的数据先写入缓存,归并数据,对溢写文件进行归并,输入给Reduce任务。数据小的话不发生溢写直接给Reduce。

    MapReduce编程(重写map和reduce任务,实现词频统计。)

    package org.apache.hadoop.examples;
     
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
     
    public class WordCount {
        public WordCount() {
        }
       
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
            if(otherArgs.length < 2) {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
     
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordCount.TokenizerMapper.class);
            job.setCombinerClass(WordCount.IntSumReducer.class);
            job.setReducerClass(WordCount.IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
     
            for(int i = 0; i < otherArgs.length - 1; ++i) {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
            }
     
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
            System.exit(job.waitForCompletion(true)?0:1);
        }
       
      /**
        *Reduce类
        **/
        public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();
     
            public IntSumReducer() {
            }
     
            public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                int sum = 0;
     
                IntWritable val;
                for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                    val = (IntWritable)i$.next();
                }
     
                this.result.set(sum);
                context.write(key, this.result);
            }
        }
     
      /**
        *Map类
        **/
        public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
            private static final IntWritable one = new IntWritable(1);
            private Text word = new Text();
     
            public TokenizerMapper() {
            }
       
            public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
     
                while(itr.hasMoreTokens()) {
                    this.word.set(itr.nextToken());
                    context.write(this.word, one);
                }
     
            }
        }
    
    }
    
    

    版权声明:本文为CSDN博主「隔壁阿源」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/weixin_41768073/java/article/details/82830833

    相关文章

      网友评论

          本文标题:MapReduce

          本文链接:https://www.haomeiwen.com/subject/zqeiohtx.html