MapReduce数据处理模型:
map和reduce函数的输入和输出时键值对。
MapReduce的类型
Hadoop的 MapReduce中,map函数和reduce函数遵循如下格式:
- map:(k1,v1) --->list(k2,v2)
- combiner:(k2,list(v2))---> lsit(k2,v2)
- reduce:(k2,list(v2)) --->list(k3,v3)
默认的MapReduce作业
1、默认mapper是Mapper类,将输入的键和值原封不动地写到输出中:
public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
protected void map(KEYIN key,VALUEIN value,
Context context) throws IOException,InterruptedException{
context.write((KEYOUT)key,(VALUE)value);
}
}
Mapper是一个泛型类型(generic type),可以接受任何键或值的类型。map任务的数量等于输入文件被划分成的分块数,取决于输入文件的大小及文件块的大小。
2、默认的partitioner(分区索引)是HashPartitioner,它对每条记录的键进行哈希操作,以绝对该记录应该属于哪个分区:
public class HashPartitioner<K,V>extends Partitioner<K,V>{
public int getPartitioner(k key,V value,int numPartitions){
return (key.hashCode()& Integer.MAX_VALUE) % numPartitions;
}
}
键的哈希码被转换成一个非负整数,它由哈希值与最大的整型值做一次按位与操作而获得,然后用分区数,进行取模操作,以决定该记录属于哪个分区索引。
每个分区由一个reduce任务处理,所以分区数等于作业的reduce任务个数,默认情况下,只有一个reducer,即只有一个分区,该情况下想,所有数据都放在同一分区,partitioner操作变得无关紧要,如果有多个reduce任务,这是HashPartitioner的作用便体现出来。
3、默认reducer是Reducer类型,也是一个泛型类型,把所有输入写到输出中:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>{
protected void reduce(KEYIN key,Iterable<VALUEIN> values,
Context context)throws IOException,InterruptedException{
for(VALUEIN value:values){
context.write((KEYOUT)key,(VALUEOUT)value);
}
}
}
输入格式
输入分片与记录
一个输入分片(split)是一个由单个map操作来处理的输入块,每个map操作只处理一个输入分片。每个分片被划分为若干条记录,每条记录就是一个键值对,map一个接一个地处理记录。
输入分片在包org.apache.hadoop.mapreduce下,用InputSplit接口表示
public abstract class InputSplit{
public abstract long getLength() throws IOException,InterruptedException;
public abstract String[] getLocations()throws IOException,InterruptedException;
}
InputSplit包含一个以字节为单位的长度和一组存储位置(一组主机名),分片并不包含数据本身,而是指向数据的引用,存储位置用于供MapReduce系统使用以便将map任务尽量放在分片数据附近,而分片大小用来排序分片,以便优先处理最大的分片,从而最小优化作业运行时间(贪心算法)。
MapReduce开发不用直接处理InputSplit,它由InputFormat创建的(在MapReduce驱动程序中,InputFormat负责创建输入分片并将它们分割成记录),InputFormat类定义如下:
public abstract class InputFormat<K,V>{
public abstract List<InputSplit> getSplits(JobContext context)
throws IOException,InterruptedException;
public abstract RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context) throws IOException,InterruptedException;
}
- 运行作业的客户端(驱动程序)通过getSplits()计算分片,然后将它们发送到application master
- application master使用其存储位置信息(InputSplit中的字段)来调度map任务,从而在集群上处理这些分片数据。
- map任务把输入分片传给InputFormat的createRecordReader()方法来获取这个分片的RecordReader。RecordReader就像记录上的迭代器,map任务用一个RecordReader来生成记录的键值对,然后再传递给map函数。
查看Mapper的run方法,如下:
public void run(Context context)throws IOException,InterruptedException{
setup(context);
while(context.nextKeyalue()){
map(context.getCurrentKey(),context.getCurrentValue(),context);
}
cleanup(context);
}
- 首先运行setup(),重复调用Context的nextKeyValue()为mapper产生键值对象。
- 通过Context,键值从RecordReader中被检索出并传递给map()方法
- 当reader读到stream的结尾时,nextKeyValue()方法返回false,map任务运行其cleanup()方法,然后结束。
Mapper的run方法是公共的 ,可以由用户定制。MultithreadedMapRunner是另一个MapRunnable接口的实现,可配置指定个数的线程来并发运行多个mapper。
FileInputFormat类
FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类,它提供两个功能:
- 用于指出作业的输入文件位置
- 为输入文件生成分片的代码实现。
FileInputFormat类的输入路径:
作业的输入被设定成一组路径,FileInputFormat提供4种静态方法来设定Job
public static void addInputPath(Job job,Path path);
public static void addInputPaths(Job job,String commaSeperatedPaths);
public static void setInputPaths(Job job,Path inputPaths);
public static void setInputPaths(Job job,String commaSeperatedPaths);
FileInputFormat类的输入分片:
FileInputFormat负责把大文件(超过HDFS块的大小)分割成小文件(HDFS块大小),分片大小在默认情况下:
max(minimumSize, min(maximumSize,blockSize))
避免切分:
两种实现方法:
- 增加最小分片大小,设置为最大文件大小,即long.MAX_VALUE
- 使用FileInputFormat具体子类,重写isSplitable()方法,把返回值设置为false,如下
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class NonSplittableTextInputFormat extends TextInputFormat{
@Override
protected boolean isSplitable(JobContext context,Path file){
return false;
}
}
文本输入
TextInputFormat是默认的InputFormat,每天记录时一行输入,键是LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行内容,不包括任何终止符,被打包成一个Text对象。如下:
//如下文本
On the top of the Crumpetty Tre
The Quangle Wangle sat,
But his face you could not see,
On accout of his Beaver Hat.
//每条记录值,键值对
(0,On the top of the Crumpetty Tre )
(33,The Quangle Wangle sat,)
(57,But his face you could not see,)
(89,On accout of his Beaver Hat.)
其他类:KeyValueTextInputFormat、NLineInputFormat
二进制输入
SequenceFileInputFormat类、SequenceFileAsTextInputFormat类、SequenceFileAsBinaryInputFormat类、FileLengthInputFormat类。
数据库输入/输出
在关系数据库与HDFS之间移动数据的方法是:使用Sqoop
其他方式:Hbase的TableInputFormat
输出格式
类图:
OutputFormat类图.png
参考资料:《Hadoop权威指南》
网友评论