In the previous post, "Big Data (4): An Introduction to MapReduce", I briefly covered the components of MapReduce and how it works. This post looks at how to use it in practice from a programming perspective.
Part 1: Word Count
Word count is the "Hello World" of MapReduce. The goal is to count how many times each word appears in a file. For example, given a file with the following contents:
hello world tom
tom hello world
how are you tom
tom how old are you
Count how many times each word appears in this file.
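Counting by hand, the expected result for this sample file is:
are 2
hello 2
how 2
old 1
tom 4
world 2
you 2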
1. Add the Maven dependency.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.2.1</version>
</dependency>
2. The Mapper
Split each line on spaces and emit each resulting word to the reduce side as a (word, 1) pair.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String str : value.toString().split(" ")) {
            context.write(new Text(str), new IntWritable(1));
        }
    }
}
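As a side note, WCMapper allocates a new Text and a new IntWritable for every word. A common optimization, sketched below, is to reuse the output objects across calls; the class name WCMapperReuse is just a placeholder for this variant.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WCMapperReuse extends Mapper<LongWritable, Text, Text, IntWritable> {
    // Reused output objects: avoids allocating two objects per word.
    private final Text word = new Text();
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        for (String str : value.toString().split(" ")) {
            word.set(str);
            // Safe to reuse: the framework serializes key and value during write().
            context.write(word, one);
        }
    }
}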
3. The Reducer
By the time reduce() is called, records have already been grouped by key (the shuffle phase groups them by key), so iterating over the values and summing them gives the count for each word.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable i : values) {
            count += i.get();
        }
        context.write(key, new IntWritable(count));
    }
}
4. The driver class
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("WCApp");
        job.setJarByClass(WordCount.class);
        job.setInputFormatClass(TextInputFormat.class);
        // input/output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // combiner: the reducer can be reused here because summing counts is associative and commutative
        job.setCombinerClass(WCReducer.class);
        // map settings
        job.setMapperClass(WCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // reduce settings
        job.setNumReduceTasks(1);
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.waitForCompletion(true);
    }
}
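Packaged into a jar, the job can be submitted in the usual way; the jar name and HDFS paths below are placeholders:

hadoop jar wordcount.jar WordCount /input/words.txt /output/wc

Note that the output directory must not already exist, otherwise the job fails at submission time.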
Part 2: Maximum Temperature by Year
Given a set of monthly temperature readings for several years, find the maximum temperature of each year. For example:
1900 30°C
1900 34°C
1900 35°C
1900 35°C
...
1901 32°C
1901 34°C
1901 35°C
1901 36°C
1901 28°C
Expected output: (1900, 35), (1901, 36)
We solve this with a secondary sort. The key becomes a composite of year and temperature, and keys are sorted by year in ascending order and, within the same year, by temperature in descending order:
1900 35°C
1900 35°C
1900 34°C
...
1901 36°C
1901 35°C
Then the first record of each year is that year's maximum temperature.
To build such a key, we define a custom ComboKey class that combines the year and the temperature, implements the WritableComparable interface, and overrides compareTo.
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComboKey implements WritableComparable<ComboKey> {
    private int year;
    private int temp;

    // Hadoop instantiates keys via reflection, so a no-arg constructor is required.
    public ComboKey() { }

    public ComboKey(int year, int temp) {
        this.year = year;
        this.temp = temp;
    }

    public int getYear() {
        return year;
    }

    public int getTemp() {
        return temp;
    }

    public int compareTo(ComboKey o) {
        int y0 = o.getYear();
        int t0 = o.getTemp();
        if (this.year == y0) {
            // same year: temperature in descending order
            return -(temp - t0);
        } else {
            // year in ascending order
            return year - y0;
        }
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(temp);
    }

    public void readFields(DataInput dataInput) throws IOException {
        year = dataInput.readInt();
        temp = dataInput.readInt();
    }
}
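To see the ordering concretely: comparing (1900, 35) with (1900, 34), the years are equal, so compareTo returns -(35 - 34) = -1 and (1900, 35) sorts first; comparing (1900, 35) with (1901, 36), it returns 1900 - 1901 = -1, so every 1900 key comes before every 1901 key.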
The ComboKey alone is not enough, though: records from the same year have different keys, so in general they will not go to the same reducer. For example, key1 = (1900, 35°C) and key2 = (1900, 34°C) may be sent to different reducers.
We therefore need a custom partitioner that partitions by the year part of the key, which guarantees that all records of a given year reach the same reducer.
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class YearPartitioner extends Partitioner<ComboKey, NullWritable> {
    public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
        return key.getYear() % numPartitions;
    }
}
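For example, with the three reduce tasks configured in the driver below, year 1900 maps to partition 1900 % 3 = 1 and year 1901 to partition 1901 % 3 = 2, so each year's records all land on a single reducer.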
However, this is still not enough. The partitioner only ensures that a single reducer receives all of a year's records; within that partition, reduce still groups records by the full key.
The fix is a custom grouping comparator. If the values arriving at a reducer are grouped by the year part of the key, all records of the same year fall into one reduce group, and because they are sorted by temperature in descending order, the first one holds the maximum temperature.
The grouping comparator looks like this:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class YearGroupComparator extends WritableComparator {
    protected YearGroupComparator() {
        // 'true' makes WritableComparator create ComboKey instances so the object-based compare() below is used
        super(ComboKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        return k1.getYear() - k2.getYear();
    }
}
We also define a sort comparator for ComboKey, i.e. year ascending and temperature descending, by delegating to its compareTo method:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class ComboKeyComparator extends WritableComparator {
    protected ComboKeyComparator() {
        super(ComboKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        return k1.compareTo(k2);
    }
}
On the mapper side, each input line is split on spaces: field 0 becomes the year and field 1 the temperature of the composite key, and the value is empty (NullWritable).
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class MaxTempMapper extends Mapper<LongWritable, Text, ComboKey, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] arr = value.toString().split(" ");
        int year = Integer.parseInt(arr[0]);
        // strip the unit suffix so lines like "1900 30°C" parse correctly
        int temp = Integer.parseInt(arr[1].replace("°C", ""));
        context.write(new ComboKey(year, temp), NullWritable.get());
    }
}
In the reducer we simply take the first record of each group.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class MaxTempReducer extends Reducer<ComboKey, NullWritable, IntWritable, IntWritable> {
    @Override
    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // within a group the keys are sorted by temperature descending,
        // so the group's key already carries the year's maximum temperature
        context.write(new IntWritable(key.getYear()), new IntWritable(key.getTemp()));
    }
}
In the main method we set the input/output formats and wire up the mapper, reducer, partitioner, grouping comparator, and sort comparator.
package com.hezongjiang.maxtemp;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTempApp {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("MaxTempApp");
        job.setJarByClass(MaxTempApp.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTempMapper.class);
        job.setMapOutputKeyClass(ComboKey.class);
        job.setMapOutputValueClass(NullWritable.class);
        // partitioner
        job.setPartitionerClass(YearPartitioner.class);
        // grouping comparator
        job.setGroupingComparatorClass(YearGroupComparator.class);
        // sort comparator
        job.setSortComparatorClass(ComboKeyComparator.class);
        job.setReducerClass(MaxTempReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(3);
        job.waitForCompletion(true);
    }
}
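As with word count, the job can be submitted with hadoop jar; the jar name and paths below are placeholders:

hadoop jar maxtemp.jar com.hezongjiang.maxtemp.MaxTempApp /input/temps.txt /output/maxtemp

With three reduce tasks the output directory contains three part-r-* files (some possibly empty). For the sample data above, their combined contents are:

1900	35
1901	36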