I. The Code
1. WCMapper.java
package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * By default the framework passes each input record to the mapper as:
 * key -- the byte offset at which the line starts; value -- the text of the line.
 * The mapper's output: key -- a word (Text); value -- the count for that occurrence (LongWritable, here always 1).
 */
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws IOException, InterruptedException {
        // The business logic goes here; the data to process has already been handed in
        // by the framework through the key-value method parameters.
        //super.map(key, value, context);
        String line = value.toString();
        String[] words = StringUtils.split(line, " ");
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
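To make the mapper's input/output contract above concrete, here is a minimal driver-style test sketch. It assumes the MRUnit test library (org.apache.hadoop.mrunit) has been added to the project; the class name WCMapperTest and the sample line are made up for illustration and are not part of the original project.
package cn.itcast.hadoop.mr.wordcount;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
public class WCMapperTest {
    public static void main(String[] args) throws Exception {
        // Feed one line (starting at offset 0) into WCMapper and check the emitted <word, 1> pairs, in order.
        MapDriver.newMapDriver(new WCMapper())
                .withInput(new LongWritable(0), new Text("hello hello world"))
                .withOutput(new Text("hello"), new LongWritable(1))
                .withOutput(new Text("hello"), new LongWritable(1))
                .withOutput(new Text("world"), new LongWritable(1))
                .runTest();
    }
}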
2. WCReducer.java
package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    // After the mappers finish, the framework caches all the k-v pairs, groups them by key,
    // and then calls reduce() once per group, passing in <key, values{}>,
    // e.g. <hello, {1,1,1,1...}>
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        //super.reduce(arg0, arg1, arg2);
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(new Text(key), new LongWritable(count));
    }
}
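The grouping described in the comment (<hello,{1,1,1,1...}>) can likewise be exercised in isolation. The sketch below makes the same assumption that MRUnit is on the classpath; the class name WCReducerTest and the value list are illustrative only.
package cn.itcast.hadoop.mr.wordcount;
import java.util.Arrays;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
public class WCReducerTest {
    public static void main(String[] args) throws Exception {
        // One group: the key "hello" with the value list {1,1,1,1} should reduce to <hello, 4>.
        ReduceDriver.newReduceDriver(new WCReducer())
                .withInput(new Text("hello"), Arrays.asList(
                        new LongWritable(1), new LongWritable(1),
                        new LongWritable(1), new LongWritable(1)))
                .withOutput(new Text("hello"), new LongWritable(4))
                .runTest();
    }
}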
3. WCRunner.java
package cn.itcast.hadoop.mr.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*
 * Describes one specific job:
 * which classes the job uses as its map and reduce logic,
 * which path the input data is read from, and which path the results are written to.
 */
public class WCRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // The old "new Job()" constructor is deprecated; use the static factory method instead.
        //Job myjob = new Job();
        Configuration conf = new Configuration();
        Job myjob = Job.getInstance(conf);
        // Tell the framework which jar contains the classes this job uses (needed at runtime).
        myjob.setJarByClass(WCRunner.class);
        // The mapper and reducer classes this job uses.
        myjob.setMapperClass(WCMapper.class);
        myjob.setReducerClass(WCReducer.class);
        // The map output k-v types.
        myjob.setMapOutputKeyClass(Text.class);
        myjob.setMapOutputValueClass(LongWritable.class);
        // The reduce (final) output k-v types. These two setters actually apply to both map and
        // reduce output when the map output types are not set separately; the explicit form above
        // is the one to use when the map and reduce output types differ.
        myjob.setOutputKeyClass(Text.class);
        myjob.setOutputValueClass(LongWritable.class);
        // Where the input data lives.
        //FileInputFormat.setInputPaths(myjob, new Path("/myJavaAPItest/WordCount/SrcData"));  // server
        FileInputFormat.setInputPaths(myjob, new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\SrcData"));  // local mode
        // Where the output goes.
        //FileOutputFormat.setOutputPath(myjob, new Path("/myJavaAPItest/WordCount/Output"));  // server
        FileOutputFormat.setOutputPath(myjob, new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\Output"));  // local mode
        // Submit the job and wait for it to finish (true = print progress to the console).
        myjob.waitForCompletion(true);
    }
}
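One practical caveat before running: FileOutputFormat requires that the output directory does not already exist, so re-running the job against the same Output path fails with an "output directory already exists" style error. A minimal sketch for clearing it first, to be placed in main before the setOutputPath call (it additionally needs import org.apache.hadoop.fs.FileSystem; the variable names are illustrative):
        // Remove a stale output directory before submitting the job (true = recursive delete).
        Path outPath = new Path("C:\\Users\\13533\\Documents\\Java\\hadoop-local-test\\myJavaAPItest\\WordCount\\Output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }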
II. Running
1. On the server (Linux)
Export the project/package that contains the word-count classes as myWordCount.jar.
Run it on the server with: hadoop jar myWordCount.jar cn.itcast.hadoop.mr.wordcount.WCRunner
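Once the job completes, the result can be viewed with, for example, hadoop fs -cat /myJavaAPItest/WordCount/Output/part-r-00000; each output line is a word and its total count separated by a tab.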
2. Locally (Windows)
Run the main method of WCRunner.java directly.
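Note: depending on the Hadoop version, running in local mode on Windows may additionally require winutils.exe and a HADOOP_HOME environment variable to be set up; this is an environment detail, not part of the code above.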