美文网首页
hadoop (六)重新认识hadoop

hadoop (六)重新认识hadoop

作者: cnliu | 来源:发表于2018-07-31 23:24 被阅读0次

    hadoop (六)重新认识hadoop

    MapReudue 思想

    MapReudue采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各分节点共同完成,然后通过整合各分节点中间结果,得到最终的结果。简单地说,MapRedue就是"任务的分解与结果的汇总"。上述处理过程被高度的抽象为两个函数:mapper和reducer,mapper负责把任务分解成多个任务,reducer负责把分解后多任务处理的结果汇总起来。至于在并行编程中的其他种种复杂问题,如分式。。。。 工作调度、负载均衡、容错处理、网络通讯等,均由MapReduce框架负责处理。

    image

    Mapper阶段:

    框架将任务的输入数据分割成固定大小的片段(split)

    随后将每个split进一步分解成一批键值对<K1,V1>。Hadoop为每一个split创建一个Map任务用于执行用户自定义的map函数,并将对应的split中的<K1,V1>对作为输入,得到计算的中间结果<K2,V2>。

    接着将中间结果按照K2进行排序,并将key值相同的value放在一起形成一个新的列表,开成<K2,list(V2)>元组。最后再根据key值的范围将这些元组进行分组,对应不同的Reduce任务。

    Reducre阶段:

    Reducer把从不同Mapper接收来的数据整合在一起并进行排序,然后调用用户自定义的reduce函数,对输入的<K2,list(V2)>对进行相应的处理,得到健值对<K3,V3>并输出到HDFS

    框架为每个split创建一个Mapper,那么谁来确定Reducers的数量呢? 是配置文件,在mapred-site.xml中,有配置Reducers的数量,默认是1。

    Word Count 解读

    1. 将文件拆分成splits,由于测试文件较小,所以一个文件即为一个split,并将文件按行分割成<key,value>对,这一步由MapReduce框架自动完成,其中key值存的是偏移量。


      image
    2. 将分割好的<key,value>交给用户定义的map方法进行处理,生成新的<key,value>对,这段代码,正是我们所编写过的:

    public static class WordCountMapper extends MapReduceBase implements Mapper<Object ,Text ,Text ,IntWritable>{
    
        private final static IntWritable one = new IntWritable(1);
        private Text word =new Text();
    
        public void map(Object o, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
    
            StringTokenizer itr = new StringTokenizer(text.toString());
            while (itr.hasMoreTokens()){
                word.set(itr.nextToken());
                outputCollector.collect(word,one);
            }
        }
    }
    
    image
    1. 得到map方法输入的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果

      因为我们设置的Combine为: conf.setCombinerClass(WordCountReducer.class);

      public static class WordCountReducer extends MapReduceBase implements Reducer<Text , IntWritable ,Text ,IntWritable>{

       private IntWritable result = new IntWritable();
      
       public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
           int sum=0;
           while (iterator.hasNext()){
               sum+=iterator.next().get();
           }
      
           result.set(sum);
           outputCollector.collect(text,result);
       }
      

      }

    image
    1. Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为wordCount的输出结果。

      conf.setReducerClass(WordCountReducer.class); 同上。

    image

    关联Map-Combine-Reduce

    public static void main(String[] args) throws Exception{
        String input = "/user/liuzd/in";
        String output = "/user/liuzd/o_t_account/result";
        
        JobConf conf=new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/core-site.xml");
        conf.addResource("classpath:/hdfs-site.xml");
        conf.addResource("classpath:/mapred-site.xml");
        
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        
        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);
        conf.setReducerClass(WordCountReducer.class);
        
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        
        FileInputFormat.setInputPaths(conf,new Path(input));
        FileOutputFormat.setOutputPath(conf,new Path(output));
        
        JobClient.runJob(conf);
        System.exit(0);
    }

    相关文章

      网友评论

          本文标题:hadoop (六)重新认识hadoop

          本文链接:https://www.haomeiwen.com/subject/tebkvftx.html