MapReduce定义
MapReduce是一个分布式计算的框架,是用户开发机遇hadoop的数据分析应用的核心框架。
MapReduce的优缺点
- 优点
- 易于编程 只要实现一些简单的接口即可实现功能,且编写程序类似串行
- 良好的扩展性 支持扩展计算服务器的数量
- 高容错性 可以在价格低廉的机器上运行,即便集群中某些节点宕机,也可以正常使用
- 适合PB级离线计算
- 缺点
不擅长实时计算、流式计算、DAT计算
MapReduce的编程思想
MapReduce的编程思想MapReduce主要包括两个部分 Map阶段 + Reduce阶段,每个阶段中的输入输出都是key-value的形式存在
已文本词数统计为例,两个阶段的流程如下:
- Map阶段读取Hadoop分片的数据,按行读取自动进行一次map操作,得到输入key-value对应为 “偏移量-本行数据”。
偏移量实际是该行起始的数据长度索引,可以理解为行号,例如第一行偏移量为0,数据10byte,则第二行偏移量为11。 - Map阶段第二步执行我们实现的接口算法,并将结果的key-value(单词-每行的词频 如 java - 1 2 1 4 1)输出都磁盘上。整个Map阶段都是完全并行执行的。
- Reduce阶段读取Map的结果,执行实现的接口,对每个分片的结果进行初次的汇总
- Reduce阶段对每个分片的结果再次进行汇总成为一个最终结果
注:通常一个分片对应hadoop中存储的一个块,即128M,这也可以避免载入内存文件过大,撑爆内存
实现wordcount编码
Map类,继承Mapper,重写其map方法实现对每个单词的统计,范型是根据自己业务需要定义的类型
package com.irving.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* map执行类
* 四个范型是map输入和输出的类型
* @LongWritable 字符偏移量
* @Author yuanyc
* @Date 15:17 2019-07-11
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/**
* 重写map方法
* @Author yuanyc
* @Date 15:19 2019-07-11
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 每行起始的偏移量
System.out.println("-------");
System.out.println("偏移量" + key.get());
// 按行读取的数据
String line = value.toString();
// 根据空格切分str
String[] arr = line.split(" ");
// 对字符传标记1
for (String str : arr) {
context.write(new Text(str), new IntWritable(1));
}
}
}
Reduce类,继承Reducer类,重写reduce方法,实现对map结果的汇总
package com.irving.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* reduce类
* @Author yuanyc
* @Date 11:17 2019-07-14
*/
public class WordCountRecuder extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// values 形如 1, 2,2,1,1
// 对词频进行统计
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
编写启动类,执行mapreduce算法
注:输入输出路径应当为hdfs的目录,但是本地调试阶段可以使用linux文件系统目录
package com.irving.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
* 启动类
* @Author yuanyc
* @Date 15:39 2019-07-11
*/
public class WordCountMain {
public static void main(String[] args) {
Configuration configuration = new Configuration();
// args = new String[]{"/Users/yuanyc/Documents/workspace/hdfs/test.txt", "/Users/yuanyc/Documents/workspace/hdfs/out"};
try {
// 创建job
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountMain.class);
// 指定map类
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定reduce
job.setReducerClass(WordCountRecuder.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交任务
job.waitForCompletion(true);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
}
测试数据
测试数据
本地执行结果
分片为1
执行结果
注:输出目录不能重复存在,要重新执行时需要删除现有目录
Hadoop的序列化
通过上面代码可以看出,MR在编码过程中使用的输入输出对象类型都是不是自定义的类型。
使用的这些类型是Hadoop定义的基础类型,由于mapreduce过程中伴随大量的IO操作,因此需要针对序列化进行性能优化。
Java常用类型与Hadoop序列化类型的对照表
JDK的类型 | Hadoop序列化类型 |
---|---|
int | IntWritable |
long | LongWritable |
float | FloatWritable |
double | DoubleWritable |
byte | ByteWritable |
boolean | BooleanWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
自定义Java对象的序列化
自定义的Bean需要实现writable接口,重写序列化和反序列化的方法
package com.irving.wordcount;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定义bean的hadoop序列化
* @Author yuanyc
* @Date 12:37 2019-07-14
*/
public class BeanWritable implements Writable, Comparable {
private String name;
private int age;
/**
* 序列化方法
* @Author yuanyc
* @Date 12:53 2019-07-14
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeChars(name);
dataOutput.writeInt(age);
}
/**
* 反序列化,顺序要与序列化一致
* @Author yuanyc
* @Date 12:53 2019-07-14
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
name = dataInput.readUTF();
age = dataInput.readInt();
}
/**
* 自定义对象用key时,需要重写compareto方法,用于shuffle阶段的排序
* @Author yuanyc
* @Date 12:52 2019-07-14
*/
@Override
public int compareTo(Object o) {
return 0;
}
}
网友评论