This chapter only requires Hadoop running in local (standalone) mode. We use the new MapReduce API to write our first program, WordCount, and then briefly explain it.
1. Create a new Maven project and add the Hadoop dependency
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.8.4</version>
</dependency>
2. Write the WordCount code
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class Main {

    // Mapper: splits each input line into words and emits (word, 1) for every word
    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Tokenize on whitespace so we count individual words rather than whole lines
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reducer: sums all of the counts emitted for each word
    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(Main.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Block until the job finishes; exit code 0 on success, 1 on failure
        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }
}
- If you need to include third-party libraries
Move the Job setup from the code above into a run() method and launch it with the ToolRunner class, which parses generic options such as -libjars. Alternatively, to avoid extra arguments at execution time, package the program as a *-with-dependencies.jar so the third-party libraries are bundled into the job jar (see the build-plugin sketch after the code below).
// For this variant the class declaration must become:
//     public class Main extends Configured implements Tool { ... }
// and the following additional imports are needed:
//     import org.apache.hadoop.conf.Configured;
//     import org.apache.hadoop.util.GenericOptionsParser;
//     import org.apache.hadoop.util.Tool;
//     import org.apache.hadoop.util.ToolRunner;

@Override
public int run(String[] allArgs) throws Exception {
    // Use getConf() so that the options parsed by ToolRunner (e.g. -libjars) take effect
    Job job = Job.getInstance(getConf());
    job.setJarByClass(Main.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    // Strip any remaining generic options; what is left are the input and output paths
    String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Wait for the job instead of job.submit() so the exit code reflects the result
    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    // ToolRunner parses the generic options (-conf, -D, -libjars, ...) and then calls run()
    int exitCode = ToolRunner.run(new Configuration(), new Main(), args);
    System.exit(exitCode);
}
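For the *-with-dependencies.jar mentioned above, the standard maven-assembly-plugin can be used. The pom.xml fragment below is a minimal sketch under that assumption (it goes inside <build><plugins>; these details are not from the original post):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <!-- "mvn package" then produces <artifactId>-<version>-jar-with-dependencies.jar -->
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>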
3. Code analysis
The two classes that extend Mapper and Reducer follow a fixed pattern: their generic type parameters are chosen from the input and output key-value types, and the map and reduce methods are overridden. map processes input records that arrive as key-value pairs and also emits key-value pairs. reduce receives each key produced by map together with the collection of values belonging to that key, and after processing emits zero or more key-value pairs as output.
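To make this key-value flow concrete, the following purely illustrative plain-Java sketch (no Hadoop involved; the two input lines are made up) mimics what map, the shuffle, and reduce do in this WordCount:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountTrace {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello world", "hello hadoop");

        // "map" + "shuffle": emit (word, 1) for every word, then group the 1s by word
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }
        // grouped now holds hadoop -> [1], hello -> [1, 1], world -> [1]

        // "reduce": sum the values collected for each key
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            System.out.println(e.getKey() + "\t" + sum);   // hadoop 1, hello 2, world 1
        }
    }
}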
The new API uses the Job class to configure, submit, control, and monitor a job. When third-party libraries are involved, the ToolRunner.run() method is responsible for parsing the -libjars argument; it delegates this work to the GenericOptionsParser class.
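For example, with the ToolRunner version of the driver, a third-party jar could be supplied at launch time roughly like this (the path to the extra jar is hypothetical):

./hadoop jar MapReduceTest.jar Main -libjars /path/to/third-party.jar /home/haduser/file/input/me /home/haduser/file/output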
4. Package the jar and run it in the local Hadoop environment
./hadoop jar MapReduceTest.jar Main /home/haduser/file/input/me /home/haduser/file/output
5. Results