1.mapreduce编程指导思想
本文的核心是带领大家去了解的MapReduce的核心设计思想,以及最基础的编程方法,虽然该组件在实际的开发中不常用了,但是其分而治之的设计思想是十分值的学习和借鉴的。
- mapReduce编程模型的总结:
- MapReduce的开发一共有八个步骤其中map阶段分为2个步骤,shuffle阶段4个步骤,reduce阶段分为2个步骤
- 第一步:读取文件,解析成key,value对。
- 第二步:自定义Map逻辑,接收k1 v1转换行成新的k2
- 第三步:分区。相同key的数据发送到同一个reduce里边去
- 第四步:排序:对key2进行排序,默认是字典排序
- 第五步:规约,combiner过程,调优步骤,可选
- 第六步:分组
- 第七步:自定义reduce逻辑,接收k2 v2 转换成新的k3 v3 的输出
- 第八步:输出 k3 v3 进行保存
(1). Map阶段2个步骤
- 第一步:设置inputFormat类,将我们的数据切分成key,value对,输入到第二步
- 第二步:自定义map逻辑,处理我们第一步的输入数据,然后转换成新的key,value对进行输出
(2). shuffle阶段4个步骤
- 第三步:对输出的key,value对进行分区。(相同key的数据属于同一分区)
- 第四步:对不同分区的数据按照相同的key进行排序
- 第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
- 第六步:对排序后的数据进行分组,分组的过程中,将相同key的value放到一个集合当中(每组数据调用一次reduce方法)
(3). reduce阶段2个步骤
- 第七步:对多个map的任务进行合并,排序,写reduce函数自己的逻辑,对输入的key,value对进行处理,转换成新的key,value对进行输出
- 第八步:设置outputformat将输出的key,value对数据进行保存到文件中
2.MapReduce 的输入和输出
MapReduce框架运转在**<key,value>** **键值对**上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对可能是不同的。
一个MapReduce作业的输入和输出类型如下图所示:可以看出在整个标准的流程中,会有三组<key,value>键值对类型的存在。
MapReduce的输入输出.png
(1).hadoop当中常用的数据类型
- hadoop没有沿用java当中基本的数据类型,而是自己进行封装了一套数据类型,其自己封装的类型与java的类型对应如下
- 下表常用的数据类型对应的Hadoop数据序列化类型
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Int | IntWritable |
Float | FloatWritable |
Long | LongWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | ArrayWritable |
byte[] | BytesWritable |
3.MapReduce 的序列化
(1).概述
序列化(Serialization)是指把结构化对象转化为字节流。
反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。
当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。
Java的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop自己开发了一套序列化机制(Writable),精简,高效。不用像java对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。
Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。
一个类要支持可序列化只需实现这个接口即可。
public interface Writable {
void write(DataOutput out) throws IOException;
void readFields(DataInput in) throws IOException;
}
(2).Writable 序列化接口
如需要将自定义的bean放在key中传输,则还需要实现WritableComparable接口,因为mapreduce框中的shuffle过程一定会对key进行排序,此时,自定义的bean实现的接口应该是:
public class FlowBean implements WritableComparable<FlowBean>
需要自己实现的方法是:
/**
* 序列化的方法
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upflow);
out.writeLong(dflow);
out.writeLong(sumflow);
}
/**
* 反序列化的方法,反序列化时,从流中读取到的各个字段的顺序应该与序列化时写出去的顺序保持一致
*/
@Override
public void readFields(DataInput in)throws IOException {
upflow = in.readLong();
dflow = in.readLong();
sumflow = in.readLong();
}
@Override
public int compareTo(FlowBean o) {
//实现按照sumflow的大小倒序排序
return sumflow>o.getSumflow()?-1:1;
}
compareTo方法用于将当前对象与方法的参数进行比较。
如果指定的数与参数相等返回0。
如果指定的数小于参数返回 -1。
如果指定的数大于参数返回 1。
例如: o1.compareTo(o2);
返回正数的话,当前对象(调用compareTo方法的对象o1)要排在比较对象(compareTo传参对象o2)后面,返回负数的话,放在前面。
// 代码示例
@Override
public int compareTo(Pojo o) {
// 先按年龄升序排序,再按身高降序排序
int i = this.age.compareTo(o.age);
if (i == 0){
// 当前判断中指定的就是当age相同的时候,再去比较high,然后对升序取反就是降序
i = -(this.high.compareTo(o.high));
}
return i;
}
4.Map task 数量以及切片机制
(1).MapTask个数与切片原理
- MapTask 切片 与 HDFS 分块 对比
- 在MapReduce当中,每个mapTask处理一个切片split的数据量,注意切片与block块的概念很像,但是block块是HDFS当中存储数据的单位,切片split是MapReduce当中每个MapTask处理数据量的单位。
- 数据块:Block是HDFS物理上把数据分成一块一块。
- 数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
- 查看FileInputFormat的源码,里面getSplits的方法便是获取所有的切片,其中有个方法便是获取切片大小:
- TextInputFormat : 其中的 isSplitable() 函数表示是否对数据进行切片,其返回值是布尔类型的值。
- TextInputFormat 继承自 FileInputFormat ,其中有一个函数 getSplits() ,表示获取当前的分片返回值,其函数的具体实现函数是 computeSplitSize() 。
-
切片大小的计算公式详解 -- 由以上计算公式可以推算出split切片的大小刚好与block块相等:
Math.max(minSize, Math.min(maxSize, blockSize));
- mapreduce.input.fileinputformat.split.minsize=1 默认值为1
- mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
- blockSize为128M
(2).MapTask个数判断示例
-
那么hdfs上面如果有以下两个文件,文件大小分别为300M和10M,那么会启动多少个MapTask???
1、输入文件两个
file1.txt 300M
file2.txt 10M
2、经过FileInputFormat的切片机制运算后,形成的切片信息如下:
file1.txt.split1-- 0~128
file1.txt.split2-- 128~256
file1.txt.split3-- 256~300
file2.txt.split1-- 0~10M
一共就会有四个切片,与我们block块的个数刚好相等
(3).如何控制mapTask的个数
如果有1000个小文件,每个小文件是1kb-100MB之间,那么我们启动1000个MapTask是否合适,该如何合理的控制MapTask的个数???
-
如果需要控制maptask的个数,我们只需要调整maxSize和minsize这两个值,那么切片的大小就会改变,切片大小改变之后,mapTask的个数就会改变
-
maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数的值。
FileInputFormat.setMaxInputSplitSize(Job job, Integer size)
-
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。
FileInputFormat.setMinInputSplitSize(Job job, Integer size)
-
-
在实际情况中:
- 如果 MapTask 任务执行很慢,则可以适当的调小一点 splitSize 的大小。
- 调整切片的最大值或者最小值哈可以
网友评论