WordCountMapper.java
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* LongWritable Text --input value/key
* Text IntWritable --output value/key
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/**
* @throws InterruptedException
* @throws IOException
*
*/
public void map(LongWritable key, Text lineText, Context context) throws IOException, InterruptedException {
StringTokenizer iter = new StringTokenizer(lineText.toString());
while(iter.hasMoreTokens()) {
word.set(iter.nextToken());
context.write(word, one);
}
}
}
WordCountReducer.java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
//
private IntWritable res = new IntWritable();
/**
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable value : values) {
sum += value.get();
}
res.set(sum);
context.write(key, res);
}
}
WordCountRunner.java
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Driver for the word-count job. Extending {@link Configured} and implementing
 * {@link Tool} lets {@link ToolRunner} parse generic options (e.g.
 * {@code -D mapreduce.job.reduces=3}) before the positional arguments reach
 * {@link #run(String[])}.
 *
 * Usage: hadoop jar WordCountMR.jar [genericOptions] &lt;input path&gt; &lt;output path&gt;
 */
public class WordCountRunner extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            // ToolRunner strips generic options and forwards the rest to run().
            System.exit(ToolRunner.run(new WordCountRunner(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Configures and submits the word-count job, blocking until it finishes.
     *
     * @param args args[0] = HDFS input path, args[1] = HDFS output path
     * @return 0 on success, 1 on job failure, 2 on bad usage
     * @throws Exception if job setup or submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length != 2) {
            System.err.println("Usage: WordCountRunner <input path> <output path>");
            return 2;
        }
        Job job = Job.getInstance(getConf(), "WordCountMR");
        job.setJarByClass(getClass());
        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // input format class
        job.setInputFormatClass(TextInputFormat.class);
        // output path (must not already exist in HDFS)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // output format class
        job.setOutputFormatClass(TextOutputFormat.class);
        // set mapper class
        job.setMapperClass(WordCountMapper.class);
        // set mapper output key class
        job.setMapOutputKeyClass(Text.class);
        // set mapper output value class
        job.setMapOutputValueClass(IntWritable.class);
        // combiner reuses the reducer: valid because summation is associative and commutative
        job.setCombinerClass(WordCountReducer.class);
        // set reducer class
        job.setReducerClass(WordCountReducer.class);
        // set reducer output key class
        job.setOutputKeyClass(Text.class);
        // set reducer output value class
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
写完上面几个类之后,需要把他们打成jar包(我的是WordCountMR.jar),上传到hadoop服务器。打包时不要忘了添加Main class (就是 WordCountRunner),否则无法执行job。提交作业的命令为(其中的路径都为HDFS中的):
hadoop jar WordCountMR.jar /temp/usr/install.log /temp/out/res1
设置多个reducer
hadoop jar WordCountMR.jar -D mapreduce.job.reduces=3 /temp/usr/install.log /temp/out/res1
（注意：-D 等通用选项必须放在输入/输出路径之前才能被 GenericOptionsParser 解析；另外在新版 MapReduce API 中，旧属性名 mapred.reduce.tasks 已被 mapreduce.job.reduces 取代。）
总结
在我第一次写完这些文件,打包到hadoop中运行的时候,出现了一个警告,但并没有终止计算程序,最后查看结果时发现,统计结果并没有合并。出现了下面的情况:
*** 1
*** 1
/usr/share/info/gnupg.info 1
/usr/share/info/grep.info.gz 1
/usr/share/info/groff.gz 1
/usr/share/info/grub.info.gz 1
/usr/share/info/ipc.info 1
/usr/share/info/libidn.info.gz 1
/usr/share/info/multiboot.info.gz 1
/usr/share/info/sed.info.gz 1
/usr/share/info/which.info.gz 1
FINISHED 1
Header 1
ID 1
INSTALLING 1
Installing 1
Installing 1
Installing 1
...
很明显这是一个bug,不过是什么原因造成的呢?我第一反应就是reduce阶段并没有发生,最后发现在
WordCountReducer.java
文件中我把reduce方法写成了reducer导致reduce并没有执行。修改之后的最终结果为:
*** 2
/usr/share/info/gnupg.info 1
/usr/share/info/grep.info.gz 1
/usr/share/info/groff.gz 1
/usr/share/info/grub.info.gz 1
/usr/share/info/ipc.info 1
/usr/share/info/libidn.info.gz 1
/usr/share/info/multiboot.info.gz 1
/usr/share/info/sed.info.gz 1
/usr/share/info/which.info.gz 1
FINISHED 1
Header 1
ID 1
INSTALLING 1
Installing 155
网友评论