WordCount的业务逻辑:
1、mapTask 阶段处理每个数据分块的单词统计分析,思路是每遇到一个单词则把其转换成一个 key,value 对,比如单词 hello,就转换成 <’hello’,1> 发送给 reduceTask 去汇总
2、reduceTask 阶段将接受 mapTask 的结果,来做汇总计数
结果:
hdfs 1
hive 1
flink 1
hadoop 1
hello 5
spark 1
首先看Mapper:WordMapper.java
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN:是指框架读取到的数据的key的类型,在默认的InputFormat下,读取的key就是一行文本的起始的偏移量。所以key的类型Long
* VALUEIN:是指框架读取到的数据的value的类型,在默认的InputFormat下,读取的value就是一行文本。所以value的类型String
* KEYOUT:是指用户自定义的逻辑方法返回的数据中key的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的key是单词,所以类型是String
* VALUEOUT:是指用户自定义的逻辑方法返回的数据中value的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的value是次数,所以类型是Long
*
* 但是,String、Long 是jdk中自带的数据类型,在序列化的时候,效率低。
* hadoop为了提高效率,自定义了一套序列化的框架
* 在hadoop程序当中,如果要进行序列化(写磁盘、网络传输数据),一定要用hadoop实现的序列化的数据类型
*
* Long ====》 LongWritable
* String ====》 Text
* Integer ====》 IntWritable
* Null ====》 NullWritable
*
*/
public class WordMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
/**
*
* @param key 就是偏移量
* @param value 一行文本
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、单词的切分
String[] words = value.toString().split(" ");
//2、计数一次,帮单词转换成<hello,1>这样的key value类型对外输出
for (String word : words) {
//3、写入到上下文中
context.write(new Text(word),new LongWritable(1));
}
}
}
其次看Reducer:WordReducer.java
/**
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN:map阶段的一个输出的key
* VALUEIN:LongWritable类型的数字
* KEYOUT:最终的结果单词,Text类型
* VALUEOUT:最终的单词的次数,LongWritable类型
*/
public class WordReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
*
* @param key map阶段的一个输出的key
* @param values hello <1,1>
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1、定义一个统计的变量
long count = 0;
//2、迭代
for (LongWritable value : values) {
count += value.get();
}
//3、写入到上下文
context.write(key,new LongWritable(count));
}
}
再看Driver:JobMain.java
/**
* 主类:将map和reduce串接起来,并提供了运行入口
*/
public class JobMain {
/**
* 这个main方法是WordCount程序运行的入口,其中用一个Job类对象来管理程序运行的很多参数:
* 指定用哪个类作为Mapper阶段的业务逻辑类,指定哪个类作为Reducer阶段的业务逻辑类
* ......其他的各种需要的参数
* @param args
*/
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
//一、初始化一个Job对象
Job job = Job.getInstance(configuration, "wordcount");
//二、设置Job对象的相关信息 里面包含了8个小步骤
//1、设置输入的路径,让程序找到源文件的位置
job.setInputFormatClass(TextInputFormat.class);
//运行本地文件
//TextInputFormat.addInputPath(job,new Path("D://input/test2.txt"));
//在服务器运行运行
TextInputFormat.addInputPath(job,new Path("hdfs://bigdata02:9000/wordcount.txt"));
//2、设置Mapper类,并设置k2 v2
job.setMapperClass(WordMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 3、分区 4、排序 5、局部合并 6、分组 四个步骤,都是Shuffle阶段,使用默认的配置。
//7、设置Reducer类,并设置k3 v3的类型
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//8、设置输出的路径
job.setOutputValueClass(TextOutputFormat.class);
//运行本地文件
//TextOutputFormat.setOutputPath(job,new Path("D://word_out"));
//在服务器运行运行
TextOutputFormat.setOutputPath(job,new Path("hdfs://bigdata02:9000/word_out"));
//三、等待完成
boolean b = job.waitForCompletion(true);
System.out.println(b);
System.exit(b ? 0 : 1);
}
}
网友评论