今天我们开始对 mapreduce 进行操作,实现一个功能:编写 mapreduce 实现单词的统计功能,并使用 jar 在 yarn 上统计单词数。闲话少讲,接下来我们开始对 mapreduce 的编写。
为方便,我们先创建一个类:MyWordCountUp并使其继承自 Configured 以及实现 Tool 接口,结构如下:
public class MyWordCountUp extends Configured implements Tool{
public int run(String[] args) throws Exception{
}
}
一、在类中创建一个继承自 WordCountMapper 的类,用于切割数据并实现相关的逻辑(这里为方便是以内部类的形式创建的,看个人的需求):
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text mapKey = new Text();
private IntWritable mapValue = new IntWritable(1); //初始值赋值为1
//对我们独立元素中的每一个元素进行并行计算操作的函数
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
//以空格切割成字符串数组
String[] lines = line.split(" ");
for(String str : lines){
mapKey.set(str); //设值
context.write(mapKey, mapValue);
}
}
}
二、同理,创建一个继承自 Reducer 的内部类,用于统计单词的个数即聚合:
public static class WrodCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
private IntWritable reduceValue = new IntWritable();
//对我们独立元素中的数据进行合并
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
List<IntWritable> list = Lists.newArrayList(values);
int sum = 0;
for(IntWritable reduceValue : list){
sum += reduceValue.get(); //累加
}
reduceValue.set(sum);//设值
context.write(key, reduceValue);
}
}
三、在run 方法中操作 driver (组装所有的过程到job):
//run
public int run(String[] args) throws Exception {
//driver
//1) 获取 conf
Configuration configuration = this.getConf(); //由于类已继承 Configured ,因此直接调用其方法
//2) 创建 job
Job job = Job.getInstance(configuration,this.getClass().getSimpleName());
job.setJarByClass(this.getClass());
//3.1) 输入
Path path = new Path(args[0]);
FileInputFormat.addInputPath(job, path);
//3.2 设置 map
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 -> 在这里可以设置:分区、排序、分组、设置 reduce 的个数等
//3.3 设置 reduce
job.setReducerClass(WrodCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//3.4 输出
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
//4. 提交
boolean sucess = job.waitForCompletion(true);
return sucess ? 0 : 1;
}
四、主方法的实现:
public static void main(String[] args) {
//当打包成 jar 之前 记得注释掉
// args = new String[]{
// "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/wordcount.txt",
// "hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output"
// };
Configuration configuration = new Configuration();
try {
//先判断文件夹是否存在
Path fileOutPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if(fileSystem.exists(fileOutPath)){
fileSystem.delete(fileOutPath, true); //删除
}
int status = ToolRunner.run(configuration, new MyWordCountUp(), args);
System.exit(status);
} catch (Exception e) {
e.printStackTrace();
}
}
五、到这一步我们的程序已经弄好了,接下来我们可以先在本地运行一下,注意main方法中路径要去掉注释,运行没有问题我们再打包成 jar ,(温馨提示:要测试必须要先启动集群服务:1. sbin/start-dfs.sh 2.sbin/start-yarn.sh):
六、我们将打包好的 jar 放在系统的 jars 文件夹下,然后进入 hadoop 目录下执行如下命令进行统计:
bin/hadoop jar /opt/jars/workcount.jar com.lcy.mr.MyWordCountUp hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/wordcount.txt hdfs://bigdata-pro01.lcy.com:9000/user/hdfs/output6
效果如下:
打开网页浏览效果:http://bigdata-pro01.lcy.com:50070
image.png
接下来我们再使用命令进行查询:
bin/hdfs dfs -text /user/hdfs/output6/part*
效果如下:
image.png
最后注意的是,此项目的配置是建立在第一篇项目之上的,即 pom.xml 已经配置好,还有,统计的数据文件已经在 hdfs 上。。。
网友评论