一. 代码如下
public class CombinerWordCountApp {
public static final int VALUE = 1;
public static final String WORD_CONNT = "wordConnt";
/**
* 封装了mapper和reduceer的方法
*
* @param agrs
*/
public static void main(String[] agrs) throws IOException, ClassNotFoundException, InterruptedException {
//1.创建配置
Configuration conf = new Configuration();
//如果输出目录已经存在,那么将其删除掉
Path path = new Path(agrs[1]);
FileSystem fileSystem = FileSystem.get(conf);
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
System.out.println("输出目录是存在的,但是已经被删掉了");
}
//2.创建job
Job job = Job.getInstance(conf, WORD_CONNT);
//3.绑定job需要执行的类
job.setJarByClass(CombinerWordCountApp.class);
//4.设置job输入路径
FileInputFormat.setInputPaths(job, new Path(agrs[0]));
//5.设置job输出路径
FileOutputFormat.setOutputPath(job, new Path(agrs[1]));
//增加combine方式,在本地先进行一次reduce操作,减少网络数据传输量
job.setCombinerClass(MyReducer.class);
//6.设置job对应的map类及map的输入和输出参数类型
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//7.设置job对应的reduce类及reduce的输入和输出参数类型
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//8.提交作业
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] lineText = value.toString().split(" ");
for (String text : lineText) {
context.write(new Text(text), new LongWritable(VALUE));
}
}
}
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (LongWritable value : values) {
sum++;
}
context.write(key, new LongWritable(sum));
}
}
}
二.小结
使用MapReduce实现WordCount,代码可分为3个部分
- map阶段
1.1 map功能需要实现Mapper类,一般有setup/map/cleanup三个方法可以重写,setup只在map开始前调用一次,类似于Android中Activity的onCreate方法,可用于做初始化
1.2 cleanup在map结束后调用一次,类似于Android中Activity的onDestory方法,可用于销毁资源
1.3 map阶段具体的逻辑需要在map实现,map方法在每从前一阶段传过来一个键值对时,都会调用一次,这个方法回调了4个参数,参数类型是实现Mapper类时需要确定的泛型,第一个参数时从前面阶段传过来的map中的键类型,一般是LongWrite类型,表示这一行字符串的第一位字符在整个文档中的索引位置,第二个参数是从前面split阶段传过来的map的值的类型,一般是Text类型,表示需要处理的文档中的一行文字,第三个参数指的是输出阶段的键的类型,第四个参数值的是map输出阶段值的类型 - reduce阶段
2.1 跟map类似,不同的是reduce方法里面的第二个参数,据我理解,是map阶段传过来的值有可能是一个数组,如果map阶段采用了combine的方式,那么会将相同的键的对应的值放到一个数组里传过来,可以减少数据传输量 - job阶段
大概分为3个阶段
- 创建job
- 创建job需要Configuration对象,hdfs的很多属性可以在这里配置
- 定义job的属性
- job需要执行的类
- job的文件输入路径
- job的文件结果输出路径
- job执行的mapper类
- job的map输出键类型
- job的map输出值类型
- job的reducer类
- job的reduce输出键类型
- job的recude的输出值类型
- job需要执行的类
- 启动job
网友评论