MapReduce简介
MapReduce是一种分布式计算模型,是Google提出的,主要用于搜索领域,解决海量数据的计算问题。
MR有两个阶段组成:Map和Reduce,用户只需实现map()和reduce()两个函数,即可实现分布式计算。
MapReduce执行流程
MapReduce原理
MapReduce的执行步骤:
1、Map任务处理
1.1 读取HDFS中的文件。每一行解析成一个。每一个键值对调用一次map函数。<0,hello you> <10,hello me>
1.2 覆盖map(),接收1.1产生的,进行处理,转换为新的输出。
1.3 对1.2输出的进行分区。默认分为一个区。详见《Partitioner》
1.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同key的value放到一个集合中。 排序后: 分组后:
1.5 (可选)对分组后的数据进行归约。详见《Combiner》
2、Reduce任务处理
2.1 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。(shuffle)详见《shuffle过程分析》
2.2 对多个map的输出进行合并、排序。覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑,
处理后,产生新的输出。
2.3 对reduce输出的写到HDFS中。
Java代码实现
注:要导入org.apache.hadoop.fs.FileUtil.java。
1、先创建一个hello文件,上传到HDFS中
2、然后再编写代码,实现文件中的单词个数统计(代码中被注释掉的代码,是可以省略的,不省略也行)
1package mapreduce; 2 3import java.net.URI; 4import org.apache.hadoop.conf.Configuration; 5import org.apache.hadoop.fs.FileSystem; 6import org.apache.hadoop.fs.Path; 7import org.apache.hadoop.io.LongWritable; 8import org.apache.hadoop.io.Text; 9import org.apache.hadoop.mapreduce.Job; 10import org.apache.hadoop.mapreduce.Mapper; 11import org.apache.hadoop.mapreduce.Reducer; 12import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 13import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 14import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 15import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 16 17publicclass WordCountApp { 18staticfinalString INPUT_PATH = "hdfs://chaoren:9000/hello"; 19staticfinalString OUT_PATH = "hdfs://chaoren:9000/out"; 20 21publicstaticvoidmain(String[] args)throws Exception { 22Configuration conf =new Configuration(); 23FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf); 24Path outPath =new Path(OUT_PATH); 25if (fileSystem.exists(outPath)) { 26fileSystem.delete(outPath,true); 27 } 28 29Job job =newJob(conf, WordCountApp.class.getSimpleName()); 30 31// 1.1指定读取的文件位于哪里 32 FileInputFormat.setInputPaths(job, INPUT_PATH); 33// 指定如何对输入的文件进行格式化,把输入文件每一行解析成键值对 34//job.setInputFormatClass(TextInputFormat.class); 35 36// 1.2指定自定义的map类 37job.setMapperClass(MyMapper.class); 38// map输出的类型。如果的类型与类型一致,则可以省略 39//job.setOutputKeyClass(Text.class); 40//job.setOutputValueClass(LongWritable.class); 41 42// 1.3分区 43//job.setPartitionerClass(org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class); 44// 有一个reduce任务运行 45//job.setNumReduceTasks(1); 46 47// 1.4排序、分组 48 49// 1.5归约 50 51// 2.2指定自定义reduce类 52job.setReducerClass(MyReducer.class); 53// 指定reduce的输出类型 54job.setOutputKeyClass(Text.class); 55job.setOutputValueClass(LongWritable.class); 56 57// 2.3指定写出到哪里 58 FileOutputFormat.setOutputPath(job, outPath); 59// 指定输出文件的格式化类 60//job.setOutputFormatClass(TextOutputFormat.class); 61 62// 把job提交给jobtracker运行 63job.waitForCompletion(true); 64 } 65 66/** 67 * 68 * KEYIN 即K1 表示行的偏移量 69 * VALUEIN 即V1 表示行文本内容 70 * KEYOUT 即K2 表示行中出现的单词 71 * VALUEOUT 即V2 表示行中出现的单词的次数,固定值1 72 * 73*/ 74staticclassMyMapperextends 75Mapper { 76protectedvoid map(LongWritable k1, Text v1, Context context) 77throws java.io.IOException, InterruptedException { 78String[] splited = v1.toString().split("\t"); 79for (String word : splited) { 80context.write(newText(word),newLongWritable(1)); 81 } 82 }; 83 } 84 85/** 86 * KEYIN 即K2 表示行中出现的单词 87 * VALUEIN 即V2 表示出现的单词的次数 88 * KEYOUT 即K3 表示行中出现的不同单词 89 * VALUEOUT 即V3 表示行中出现的不同单词的总次数 90*/ 91staticclassMyReducerextends 92Reducer { 93protectedvoidreduce(Text k2, java.lang.Iterable v2s, 94Context ctx)throws java.io.IOException, 95 InterruptedException { 96longtimes = 0L; 97for (LongWritable count : v2s) { 98times += count.get(); 99 }100ctx.write(k2,new LongWritable(times));101 };102 }103}
3、运行成功后,可以在Linux中查看操作的结果
网友评论