hadoop (六)重新认识hadoop
MapReudue 思想
MapReudue采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各分节点共同完成,然后通过整合各分节点中间结果,得到最终的结果。简单地说,MapRedue就是"任务的分解与结果的汇总"。上述处理过程被高度的抽象为两个函数:mapper和reducer,mapper负责把任务分解成多个任务,reducer负责把分解后多任务处理的结果汇总起来。至于在并行编程中的其他种种复杂问题,如分式。。。。 工作调度、负载均衡、容错处理、网络通讯等,均由MapReduce框架负责处理。
imageMapper阶段:
框架将任务的输入数据分割成固定大小的片段(split)
随后将每个split进一步分解成一批键值对<K1,V1>。Hadoop为每一个split创建一个Map任务用于执行用户自定义的map函数,并将对应的split中的<K1,V1>对作为输入,得到计算的中间结果<K2,V2>。
接着将中间结果按照K2进行排序,并将key值相同的value放在一起形成一个新的列表,开成<K2,list(V2)>元组。最后再根据key值的范围将这些元组进行分组,对应不同的Reduce任务。
Reducre阶段:
Reducer把从不同Mapper接收来的数据整合在一起并进行排序,然后调用用户自定义的reduce函数,对输入的<K2,list(V2)>对进行相应的处理,得到健值对<K3,V3>并输出到HDFS
框架为每个split创建一个Mapper,那么谁来确定Reducers的数量呢? 是配置文件,在mapred-site.xml中,有配置Reducers的数量,默认是1。
Word Count 解读
-
将文件拆分成splits,由于测试文件较小,所以一个文件即为一个split,并将文件按行分割成<key,value>对,这一步由MapReduce框架自动完成,其中key值存的是偏移量。
image -
将分割好的<key,value>交给用户定义的map方法进行处理,生成新的<key,value>对,这段代码,正是我们所编写过的:
public static class WordCountMapper extends MapReduceBase implements Mapper<Object ,Text ,Text ,IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word =new Text();
public void map(Object o, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException {
StringTokenizer itr = new StringTokenizer(text.toString());
while (itr.hasMoreTokens()){
word.set(itr.nextToken());
outputCollector.collect(word,one);
}
}
}
image
-
得到map方法输入的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key值相同的value值累加,得到Mapper的最终输出结果
因为我们设置的Combine为: conf.setCombinerClass(WordCountReducer.class);
public static class WordCountReducer extends MapReduceBase implements Reducer<Text , IntWritable ,Text ,IntWritable>{
private IntWritable result = new IntWritable(); public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { int sum=0; while (iterator.hasNext()){ sum+=iterator.next().get(); } result.set(sum); outputCollector.collect(text,result); }
}
-
Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为wordCount的输出结果。
conf.setReducerClass(WordCountReducer.class); 同上。
关联Map-Combine-Reduce
public static void main(String[] args) throws Exception{
String input = "/user/liuzd/in";
String output = "/user/liuzd/o_t_account/result";
JobConf conf=new JobConf(WordCount.class);
conf.setJobName("WordCount");
conf.addResource("classpath:/core-site.xml");
conf.addResource("classpath:/hdfs-site.xml");
conf.addResource("classpath:/mapred-site.xml");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(WordCountMapper.class);
conf.setCombinerClass(WordCountReducer.class);
conf.setReducerClass(WordCountReducer.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf,new Path(input));
FileOutputFormat.setOutputPath(conf,new Path(output));
JobClient.runJob(conf);
System.exit(0);
}
网友评论