美文网首页我爱编程
四、分布式计算框架MapReduce

四、分布式计算框架MapReduce

作者: 薛定谔的猫_1406 | 来源:发表于2018-04-03 09:19 被阅读0次

    一、MapReduce概述

    MapReduce概述

    二、wordCount入门MapReduce

    wordcount入门MapReduce

    2.1 MapReduce编程模型之Map与Reduce阶段

    Map与Reduce阶段
    MapReduce执行步骤

    2.2 官网关于MapReduce执行步骤的描述

    key:指的是起始位置的偏移量,value指的是对应行的值。key和value必须实现以下两个接口

    inputs and OutPuts
    执行过程

    三、MapReduce核心概念

    3.1、Split:交由MapReduce作业来处理的数据块,是MapReduce中最小的计算单元
    HDFS:blocksize 是HDFS中最小的存储单元 128M
    默认情况下:他们两是一一对应的,当然我们也可以手工设置他们之间的关系(不建议)

    3.2、InputFormat:
    将我们的输入数据进行分片(split): InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    TextInputFormat: 处理文本格式的数据


    几个reduceTask就有几个输出文件

    四、MapReduce架构之1/2.x

    1.x
    总结1
    总结2
    2.x

    五、Java版本的wordCount功能实现

    package com.imooc.hadoop.mapreduce;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    /**
     * Created by zhouzhouseela on 2018/4/2.
     */
    //这里的Map/Reduce的每一个键值对都要定义好
    public class WordCountApp {
    
        // map是读取输入的文件,投入的的是每一行文本的偏移量:每一行文本的一个键值对,产出一个String:long的键值对,比如
        // hadoop:1,linux:1这样的键值对。
        public class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    
            //这里的key表示偏移量,value表示每一行字符串
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                    String line = value.toString();
                    String[] words = line.split(" ");
                    for(String word:words){
                        context.write(new Text(word),new LongWritable(1));
                    }
    
            }
        }
        //reduce投入的是map的产出,然后产出的是hadoop:2这样的汇总结果(归并操作)
        public class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
    
            // 因为输入的都是hadoop:1,hadoop1,linux:1这样的,把单词对应的values加起来就得到其频次。
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
                Iterator<LongWritable> valueIt = values.iterator();
                long sum =0;
                while(valueIt.hasNext()){
                    sum+=valueIt.next().get();
                }
                context.write(key,new LongWritable(sum));
    
            }
        }
    
        public static void main(String[] args) throws Exception {
           Configuration configuration =new Configuration();
            Job job =Job.getInstance(configuration,"wordCount");
            job.setJarByClass(WordCountApp.class);
            //设置作业处理的输入路径
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //设置map相关的参数
            job.setMapOutputValueClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 设置reduce相关的参数
            job.setReducerClass(MyReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            //设置作业处理的输出路径
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
    
    
    
        }
    }
    
    
    maven实现wc 解决MR过程中的一个bug
        FileSystem fileSystem = FileSystem.get(configuration);
        if(fileSystem.exists(new Path(args[1]))){
            fileSystem.delete(new Path(args[1]),true);
        }
    

    六、MapReduce编程之Combiner:相当于在本地进行reduce。

    Combiner

    6.1 使用场景

    • 适合求和、次数等
    • 对求平均的场景不适合

    七、MapReduce编程之Partitioner

    Partitioner编程
    分发的数据按照不同的分类来处理并且输出到不同的文件

    一个栗子

    • 假设有如下的文件,要求将相同类型的收入放在一个reduce上处理
      [图片上传失败...(image-eca267-1522660460404)]
    • 代码如下
      ···
      package com.imooc.hadoop.mapreduce;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Partitioner;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;
    import java.util.Iterator;

    //这里的Map/Reduce的每一个键值对都要定义好
    public class PartitionerDemo {

    // map是读取输入的文件,投入的的是每一行文本的偏移量:每一行文本的一个键值对,产出一个String:long的键值对,比如
    // hadoop:1,linux:1这样的键值对。
    public class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    
        //这里的key表示偏移量,value表示每一行字符串
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(" ");
            // 对于如上的格式,每一行中的第一个是名称,第二个是价格
            for(String word:words){
                context.write(new Text(words[0]),new LongWritable(Long.parseLong(words[1])));
            }
    
        }
    }
    //reduce投入的是map的产出,然后产出的是hadoop:2这样的汇总结果(归并操作)
    public class MyReduce extends Reducer<Text,LongWritable,Text,LongWritable>{
    
        // 因为输入的都是hadoop:1,hadoop1,linux:1这样的,把单词对应的values加起来就得到其频次。
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            Iterator<LongWritable> valueIt = values.iterator();
            long sum =0;
            while(valueIt.hasNext()){
                sum+=valueIt.next().get();
            }
            context.write(key,new LongWritable(sum));
    
        }
    }
    // 定义Partition的Class
    public class MyPartitioner extends Partitioner<Text,LongWritable>{
    
        public int getPartition(Text key, LongWritable value, int i) {
            if(key.equals("xiaomi")){
                return 0;
            }
            if(key.equals("hauwei")){
                return 1;
            }
            if(key.equals("iphone7")){
                return 2;
            }
            return 3;
        }
    }
    
    public static void main(String[] args) throws Exception {
    
    
        Configuration configuration =new Configuration();
    
    
        FileSystem fileSystem = FileSystem.get(configuration);
        if(fileSystem.exists(new Path(args[1]))){
            fileSystem.delete(new Path(args[1]),true);
        }
    
        Job job =Job.getInstance(configuration,"wordCount");
        job.setJarByClass(WordCountApp.class);
        //设置作业处理的输入路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        //设置map相关的参数
        job.setMapOutputValueClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 设置reduce相关的参数
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        //设置job的partition
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);
    
        //设置作业处理的输出路径
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    
    
    
    }
    

    }

    ···

    七、Hadoop编程之JobHistory

    JobHistory

    相关文章

      网友评论

        本文标题:四、分布式计算框架MapReduce

        本文链接:https://www.haomeiwen.com/subject/oqrgcftx.html