1. 背景知识
-
起源: Google最早用MapReduce做搜索引擎中的倒排索引,简单来说是: 统计每个词条与文档对应的映射关系
-
数据块与数据分片: block是HDFS上的文件块,split是文件的分片(逻辑划分,不包含具体数据,只包含这些数据的位置信息)
- a. 一个split包含一个或多个block,默认是一对一的关系
- b. 一个split不包含两个文件的block, 不会不会跨文件进行划分
- c. 一个Map处理一个Split, Map数量由Split决定(HDFS数据源)
- d. 数据分片尽量保证Map端的负载均衡
2. MapReduce数据处理流程(以文本格式文件为例)
1) Map阶段
- a. 首先将split中的数据以K/V(key/value)的形式读取出来
- b. 然后将这些数据交给用户自定义的Map函数进行处理
- c. Map处理完数据后,同样将结果以K/V的形式交给MR的计算框架
- d. MR计算框架会将不同的数据划分成不同的partition,不同Map计算的相同partition数据会分到同一个reduce节点上面进行处理,也就是说一类partition对应一个reduce
2) Map阶段特点:
- a. 一个Map处理一个split
- b. Map默认使用Hash算法对输出的key值进行Hash计算,这样保证了相同key值的数据能够划分到相同的partition中,同时也保证了不同的partition之间的数据量时大致相当
- c. Map的数量是由split的数量决定的
3) Reduce阶段:
- a. Map处理完后,reduce处理程序在各个Map节点将属于自己的数据拷贝到自己的内存缓冲区中,也就是说: 相同partition的数据被拷贝至同一个reduce中
- b. 然后将这些数据合并成一个大的数据集,并且按照key值进行聚合,把聚合后的value值以迭代器的方式作为reduce的输入value
- c. 用户使用自定义的reduce函数处理完迭代器中的数据后,一般把结果以K/V的格式存储到HDFS上的文件中
4) Reduce阶段特点:
- a. Reduce的数量是由partition的数量决定的
- b. 一个Reduce处理相同Key的所有value值(Iterator迭代器形式)
3. Shuffle过程
在上面介绍的MR基本过程中,还存在一个shuffle过程,发生于Map和Reduce之间
1) Map中的shuffle
- a. Collec阶段将数据放在环形缓冲区,唤醒缓冲区分为数据区和索引区
- b. sort阶段对在统一partition内的索引按照key值排序
- c. spill(溢写)阶段根据排好序的索引将数据按顺序写到小文件中
- d. Merge阶段将Spill生成的小文件分批合并排序成一个大文件
2) Reduce中的shuffle
- a. Copy阶段将Map段的数据分批拷贝到Reduce的缓冲区
- b. Spill阶段将内存缓冲区的数据按照顺序写到小文件中
- c. Merge阶段将溢出文件合并成一个排好序的数据集
4. 案例代码****(文本单词统计)
1) Map与Reduce
// 输入k1,输入v1,输出k2,输出v2
static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] data = value.toString().split(" ");
for (int i = 0; i < data.length; i++) {
context.write(new Text(data[i]), new IntWritable(1));
// 输出k2:单词,输出v2:计数1次
}
}
}
// 输入k3,输入v3,输出k4,输出v4
static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
context.write(key, new IntWritable(sum));
// 输出k4:单词,输出v4:单词总数量
}
}
2) 主程序
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.*getInstance*(new Configuration());
// 指定主程序类
job.setJarByClass(WordCountMain.class);
// 指定Map类及输出key/value类型
job.setMapperClass(WordCountMap.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定Reduce类及输出key/value类型
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定程序输入输出路径(注意输出路径不能已存在)
FileInputFormat.*addInputPath*(job, new Path(args[0]));
FileOutputFormat.*setOutputPath*(job, new Path(args[1]));
// 程序执行并打印Log
System.*exit*(job.waitForCompletion(true) ? 0 : 1);
}
3) 自定义数据类型
- a. 实现Writable接口
- b. 对字段进行序列化与反序列化
public class Employee implements Writable {
private String name;
private int age;
private int sal;
// 序列化
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.name);
dataOutput.writeInt(this.age);
dataOutput.writeInt(this.sal);
}
// 反序列化
public void readFields(DataInput dataInput) throws IOException {
this.name = dataInput.readUTF();
this.age = dataInput.readInt();
this.sal = dataInput.readInt();
}
4) 自定义Partitioner类
- a. 继承Partitioner抽象类, 重写getPartition方法
- b. 可以保证Reduce端的负载均衡
public class MyPartitioner extends Partitioner<IntWritable, Employee> {
public int getPartition(IntWritable sal, Employee emp, int numReduceTasks) {
if (sal.get() <= 3333) {
return 1 % numReduceTasks;
} else if (sal.get() <= 6666) {
return 2 % numReduceTasks;
} else {
return 3 % numReduceTasks;
}
// 默认的HashPartitioner实现:
// return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(3);
5) 自定义Combiner类
- a. 不是所有情况都可以使用Combiner
- b. 在Map端进行的特殊ReduceTask, 一般情况下, 与Reduce端代码一样
- c. Map输出与Combiner输入保持一致,Combiner的输出与Reduce的输入保持一致(执行顺序:MapàCombineràReduce)
- d. 减少Map与Reduce之间的磁盘IO
// 与WordCountReducer一样的代码
public class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable v : values) {
sum += v.get();
}
context.write(key, new IntWritable(sum));
}
}
job.setCombinerClass(MyCombiner.class);
6) 读写ORC格式文件
// 特定的Orc数据类型
// 读ORC文件
public static class ORCMapper extends Mapper<NullWritable, OrcStruct, Text, Text> {
public void map(NullWritable key, OrcStruct value, Context output) throws IOException, InterruptedException {
output.write((Text) value.getFieldValue(0),
(Text) value.getFieldValue(1));
// 获取OrcStruct中的第0和1个元素
}
// 写ORC文件
public static class ORCReducer extends Reducer<Text, Text, NullWritable, OrcStruct> {
// 定义OrcStruct的类型描述
private TypeDescription **schema** = TypeDescription.fromString("struct<name:string,mobile:string>");
// 创建OrcStruct对象
private OrcStruct **pair** = (OrcStruct) OrcStruct.createValue(**schema**);
public void reduce(Text key, Iterable<Text> values, Context output) throws IOException, InterruptedException {
for (Text val : values) {
pair.setFieldValue(0, key);// 将数值塞到OrcStruct中
pair.setFieldValue(1, val);
output.write(NullWritable.get(), pair);
}
}
}
// Map类型设置
job.setInputFormatClass(OrcInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
// Reduce类型设置
job.setOutputFormatClass(OrcOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcStruct.class);
推荐博客:
大数据新手博客
网友评论