本文参考自网上的各种MapReduce讲解的博客文章,其中很大一部分来自【hadoop 学习笔记:mapreduce框架详解】,里边包括词频统计的JAVA源码和较详细的解释。代码部分主要来自另一篇博客,但是忘了记录,现在很难找到了。本文侧重理解MapReduce的框架原理和流程。其中很多东西仍然没有弄明白,暂且放着,后期深入了解之后再更新。
MapReduce是什么
MapReduce是什么,知乎上已经有很多大神给出了各种适合小白们阅读的解释:关于MapReduce的理解?
简单来说,MapReduce是一种适用于大数据处理的编程模型,可以有效利用分布式环境中的系统资源,高效的并行处理基于大量数据的复杂任务。
MapReduce的过程
MapReduce采用的是Master-Worker的设计模式。这个模式在我们生活中和工作中非常常见。例如工地的包工头就是Master,负责工作的分配和人员调度、进度跟踪等;工人们就是Worker,负责执行具体的工作任务,并上报工作进度和状态给包工头。在分布式系统架构中,这种模式也很常见,例如HANA的Scale out架构中,有一个Master Node,还有很多个Slave Node(Worker Node),一般还会有一到多个Stand-by Node,在其他Slave Node宕机的时候,接替他们继续工作,类似于球场上的预备球员。
MapReduce在整个计算过程中,采用的也是这种工作模式。在MapReduce运行体制中,有4个参与主体:
- 客户端Client:编写MapReduce程序,配置任务,提交任务。这些都是由程序员完成的;
- JobTracker:充当Master的角色,负责接收任务,初始化任务,分配任务,和TaskTracker通信,协调整个任务的执行;
- TaskTracker:充当Worker的角色,和JobTracker保持通信,执行被分配到的子任务(Map或者Reduce);
- HDFS:分布式文件系统,负责保存任务的数据、配置信息、Map产生的中间结果和Reduce最后产生的结果等。
一个完整的MapReduce,需要经过哪些步骤呢?
- 程序员在客户端写好MapReduce的程序,配置好MapReduce的作业,然后提交给JobTracker;
- JobTracker分配ID给新提交的作业。然后:
2.1 检查配置的作业中,指定的最终结果输出目录是否存在。如果存在,则抛出错误给客户端。这一步检查是因为,在大数据环境下,一般作业运行所需要的数据量都非常大,一个作业执行完成的时间也很久,耗费资源也很多,时间成本和资源成本都很高,不允许作业执行过程中出任何差错;而如果指定的输出目录已经存在,它无法判断是覆盖目录中的文件内容,还是在原有文件内容基础上追加。不管追加还是覆盖,都有可能导致最终结果存在问题。因此它不允许一个作业在执行之前,指定的输出目录就已经存在,它对任何可能导致作业出差错的情况是零容忍的。
2.2 检查输入目录是否存在,如果不存在,同样抛出错误给客户端。 - 对输入数据进行数据分片(Split),然后将分片数据拷贝到集群的其他节点上;
整个过程需要经过以下六个步骤:
步骤 | 描述 |
---|---|
1. 输入(Input) | 将输入数据分片,分成一个个Split,并将Split进一步拆解为<Key, Value> 键值对 |
2. 映射(Map) | 根据输入的<Key, Value>键值对,进行函数映射 |
3. 合并(Combiner) | 将Key值相同的键值对合并(但并不是所有计算都可以做Combine操作) |
4. 分区(Partition) | 将键值对分成N份,送到下一环节 |
5. 化简(Reduce) | 将中间结果合并,得到最终结果 |
6. 输出(Output) | 负责输出最终结果 |
Hadoop是用JAVA写的,但是MapReduce却不一定需要用JAVA写,Python,C++等语言都可以。不过下面仍然以JAVA为例,简要说明六个步骤的用法。
输入(Input)
在Map计算之前,会根据输入文件计算输入分片(input split),并将文件按行分割成<Key, Value>键值对。每个分片将对应一个Map任务。
分片和HDFS的块(Block)之间的关系:
假定块大小为64MB,如果输入文件大小分别为:10MB,65MB,那么MapReduce会把第一个文件分为一个输入分片,第二个文件分为两个输入分片。
MapReduce会对输入的文件进行格式规范检查,如输入路径,后缀等的检查;然后再对输入的数据文件进行分块(Split);最后转换成map所需要的<Key, Value>键值对。
系统中提供了丰富的预置输入格式,主要有两种:
-
TextInputFormat:系统默认的数据输入格式。把文件分块(Split),并逐行读入,每一行记录形成一对<Key, Value>;其中,Key值是当前行在整个文件中的偏移量,Value值是这一行的文本内容。
-
KeyValueTextInputFormat:另一种常用的数据输入格式。要求读入的数据文件内容格式为<Key, Value>形式,读出的结果直接形成<Key, Value>送入map函数。
选择输入格式的方式
job.setInputFormatClass(<input_format>.class);
如果没有设定,系统默认选择TextInputFormat。输入格式也可以自定义,通过继承FileInputFormat或者InputFormat并改写其中函数的方式实现。
在格式检查之后,输入文件会被分成M份(M由用户定义),每一份通常为16MB到64MB。然后这些被拆分的文件会被复制到集群中的多个Worker节点上。
以词频统计为例,输入为:
Hello World
Hello Hadoop
Hello MapReduce
则进行Split分割后的键值对为:
<0, Hello World>
<12, Hello Hadoop>
<25, Hello MapReduce>
映射(Map)
Map函数一般由程序员开发定义,根据输入分片,将<Key, Value>键值对解析,然后用Map函数处理。处理后的<Key, Value>结果会缓存在内存中。
job.setMapperClass(<self-design>.class)
如果不指定map,即没有setMapperClass,则系统自动指定一个null,即把输入的<Key, Value>不做任何处理,直接送到下一环节。
合并(Combiner)
一般MapReduce处理的都是海量数据,因此Map的结果不可能全部放到内存,要写入磁盘。Map输出的时候,会在内存中开启一个环形缓冲区,专门用来输出,这个缓冲区的默认大小是100MB,并且在配置文件里,给这个缓冲区设置了一个默认阈值0.8(缓冲区大小和阈值在配置文件里都可以更改)。同时,Map会给输出操作启动一个守护线程,如果缓冲区的数据量达到了阈值80%,守护线程就会把缓冲区内容写到磁盘上,这个过程叫做Spill。另外20%的内存可以继续写入,写入内存和写到磁盘的操作是互不干扰的。如果缓冲区被写满,Map会阻塞新的数据写入,让写入磁盘的操作先完成之后,再继续写入。
写入磁盘前,会有排序的操作,如果定义了合并(Combiner),那么Combiner的操作,就是在排序之前进行的。
Combiner操作是可选的,它是对Map输出的结果中,具有相同Key值的记录做合并。合并的主要目的,就是减少Map阶段输出的中间结果的数量,降低数据的网络传输开销。
Combine的过程,和Reduce的过程很相似,只是Combine是在Map之后执行,是一种本地化的Reduce操作。所以对Combiner的自定义,可以继承Reducer(extends Reducer)。需要注意的是,并不是所有的计算都可以做Combine操作,例如,如果计算是求最大值、最小值,或者计算总数,则可以使用Combine操作,但如果是计算平均值,使用Combine操作,最终结果就会出错。
job.setCombinerClass(<combine-function>.class);
分区(Partition)
上面讲到了写入磁盘的过程叫Spill,即溢出操作,所以每个Spill操作都会相应的写一个溢出文件。Map的所有输出做完之后,会对这些溢出文件做合并,这个过程中就涉及到了Partition操作。Partition对Reduce而言,类似于Split对Map,一个Partition对应一个Reduce作业。Partition决定了Map节点的输出将被分区到哪个Reduce节点。
默认的Partitioner是HashPartitioner,它根据键值对的Key值进行Hash操作,获得一个非负整数的Hash码,然后对当前作业的Reduce节点数取模,有N个节点的话,就会被依次平均分配到N个节点上。我们也可以自定义取模函数,把数据按照自己的需求分配到不同的Reduce节点上。
化简(Reducer)
同合并Combiner的操作方法类似,把Map送来的结果根据键值进行化简,把Value相加,输出结果。
job.setReducerClass(<self-design-reducer>.class);
也可以不设置Reducer,系统将把Map送来的结果直接输出。
输出(Output)
Hadoop同样提供了丰富的数据输出格式规范,常用的是TextOutputFormat,也是系统默认的数据输出格式,将结果以“Key + value”的形式输出到文本文件中。
Job任务的创建
任务创建就是新建一个实例。
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setInputFormatClass(MyInputFormat.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setPartitionerClass(MyPartitioner.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
Job任务的执行
单个任务的执行很简单,但如果有多个任务,并且任务之间存在依赖关系,如下图所示,则需要定义这些任务间的关系。
任务依赖Configuration conf = new Configuration();
Job job1 = new Job(conf, "job1");
//.. config Job1
Job job2 = new Job(conf, "job2");
//.. config Job2
Job job3 = new Job(conf, "job3");
//.. config Job3
Job job4 = new Job(conf, "job4");
//.. config Job4
//添加依赖关系
job2.addDependingJob(job1);
job3.addDependingJob(job1);
job4.addDependingJob(job2);
job4.addDependingJob(job3);
JobControl jc = new JobControl("jbo name");
jc.addJob(job1);
jc.addJob(job2);
jc.addJob(job3);
jc.addJob(job4);
jc.run();
网友评论