运行时系统保存输入数据分区的细节。调度程序在集群的不同机器上执行,对失败任务进行处理,管理集群内机器之间的通信。
关键点在于如何并行计算,分发数据,处理任务失败。
编程模型
map(k1,v1) -> list(k2,v2)
reduce(k2,list(v2)) -> (v2)
例子
-
分布式grep,一行如果匹配格式串,map会输出,reduce是一个横等函数(原样进原样出)
-
URL访问统计。(wordcount)
-
反向链接,找到网页上能跳转到这个目标的所有链接。(source,target) -> (target,list(source))
-
词频向量
实现
3.0 谷歌使用的集群环境
- 双核x86,2-4GB内存
- 商用网络硬件。一般是100m/s或者1g/s
- 集群由数百或者数千的机器组成,很容易出现机器down掉的情况(machine failure)
- 存储由本地节点IDE硬盘提供,需要一个分布式文件系统来统一管理存储在这些磁盘上的数据。文件系统使用副本来提供可用性和可靠性。
- 用户提交任务到一个调度系统上面,每个job由很多阶段的task构成,之后由调度器来分布到集群上的可用节点。
3.1 执行流程
- map调用是在集群中分布式进行的,输入数据自动分片(partition)成M块(split)。每个输入块可以被不同机器并行处理。
- reduce使用一个partition函数将map输出的中间阶段的key分区成R片(用户提供R和partition函数)
流程
-
mapreduce类库首先将输入文件分成M块(一般16M或者64M一块,使用用户参数进行调整)
之后会在集群上启动程序的副本(提交的jar包会被发送到分布式文件系统中,集群中的其他节点会下载代码并在本机运行) -
其中程序副本中的一个作为master,其他作为worker,worker会被master分派工作。
一共有M个map和R个reduce任务来分配。master会根据worker的空闲情况来分配一个map或者reduce任务。 -
分配到map任务的worker读取相应输入分块(input split)的内容,解析成(k1,v1)对,发给用户自定义的map函数中,
map函数产生的中间阶段(k2,v2)会缓存在内存中。 -
缓存在内存中的(k2,v2)会被周期性的写入到本地磁盘中,会被partition函数分区成R个分区,
保存缓存键值对的文件在本地磁盘上的位置会被发送会master节点。
master节点随后会将这些路径发送给reduce worker -
当一个reduce worker被master节点告知缓存键值对的位置时,
worker节点会使用RPC来读取保存在map节点本地磁盘上输出文件的内容。 -
当一个reduce worker读取了所有的中间数据,reduce worker会按照中间阶段的key(k2)进行排序,
这样所有相同的k就会被放在一起(作为reduce的输入(k2,list(v2)),
排序是必须的因为有时候有很多不同的k2分配到了相同的reduce任务中如果中间阶段的数据太多的话以至于不能在内存中排序的话,需要进行外部排序。 -
reduce worker在已经排序了的中间阶段的数据上运行reduce函数,相同key和对应的值会被传递给reduce函数(reduce(k2,list(v2)),
reduce 函数的输出会被追加到一个输出结果文件作为这个分区(reduce partition)的结果 -
当所有的map task和所有的reduce task都完成的时候,master会唤醒用户程序,这时候就会执行接下来用户的其他代码。
-
成功执行最后的输出将是R个文件,(一个reduce对应一个文件,由partition函数分成R个分区)。
一般来说用户不需要将这些文件组合在一起,这些文件同常会被传递给其他的分布式应用作为新的输出。
3.2 master上保存的数据
-
对于每个map和reduce任务 ,master会保存状态(idle,正在运行,完成)。master还会保存worker机器的地址(ip)
-
master是在map和reduce之间的传递中间键值对文件信息的媒介,所以每个完成的map任务,master由map任务产生的R个文件的位置和大小信息。这些信息在map任务完成之后就会被发送给master,而且会被增量的推送给正在运行reduce任务的worker节点。
3.3 容错机制
1.worker挂了
master会定期给worker发心跳信息,一段时间内没有回复的话,master就会标记节点为不可用状态。
-
map worker:worker做完的map task会被重新设置为idle状态,方便调度器调度。
map worker挂掉了的话,保存在这个worker节点上的中间状态文件也不可用,这时候需要调度器重新运行这个map任务得到运行结果 -
reduce worker:已经完成的reduce任务不需要重新执行,因为reduce worker完成任务的输出已经保存在分布式文件系统上了。
如果一个map任务先被A执行(之后挂了)然后又被B执行,所有运行reduce任务的worker都会知道这个任务被重新运行了,之前没有从A读取数据的reduce task会从B读取数据。
2.master挂了
可以让master周期性的记录一些检查点来记录之前所说需要保存的数据,如果master挂了就启动另外一个任务复制这些检查点数据,恢复任务状态。
如果只有一个master,那么这个master是不允许失败的。当前的实现是让用户决定是否重新执行mapreduce任务
3.出现错误(Failure)时的语义保证
当用户提供的map和reduce任务是确定性的函数(纯函数)的时候,分布式实现和正常情况下执行结果是一致的。
我们依赖map和reduce任务的原子提交(atomic commits)来达到这个语义保证,每个正在运行的任务都会将其结果写入临时的私有文件中。
reduce任务会产生一个这样的文件,map任务会产生R个这样的文件。
-
当一个map任务完成,worker发送一条信息给master,里面包含R个临时文件的名字。master会记录这R个文件的名字。
如果master收到一个已经完成的map任务的完成信息。master会忽略这条信息。 -
当一个reduce任务完成,worker原子性的重命名他的临时输出文件为最后的输出结果。如果相同的reduce任务在多个机器上运行,他们会同时调用重命名操作。这时会依赖分布式文件系统上的原子重命名操作来保证保存着最后的结果。
多数写出的map和reduce操作都是确定性的,如果两个操作都是不确定的,假设map任务M和reduce任务 R1 R2。 e(Ri)表示第i个任务的执行完毕并提交。
弱语义产生是因为e(R1)可能读取一个M执行产生的结果。而e(R2)可能读取的是M的另外一次执行产生的结果。
3.4 数据本地性
网络带宽一般非常稀有。我们利用了输入数据实际上是保存在集群节点的本地磁盘的特性。GFS将每个文件分成64M大小的块,而且会存储每个块的副本在不同的机器上。
master会考虑输入文件信息的位置,并尝试调度map任务到包含对应输入文件副本的机器上去。如果不满足条件则尝试调度任务到离数据保存位置相对较近的位置上。
如果在集群中运行一个使用较大比例集群节点的mapreduce任务时,可以发现大部分任务都是从本地读取输入数据的。
3.5 任务粒度
map任务被分成M块,而reduce任务被分成R块。理想化来讲一般M和R都要远大于集群中的worker节点数目。让每个节点执行不同种类的任务可以动态的达到负载均衡,也会减少因为worker down掉导致的错误(failuer)的恢复速度。
参数边界:master需要做O(M+R)次调度决定,并在内存中保存O(R*M)种状态
一般来说我们会调整M来让每个单独的任务的输入文件正好是分布式文件系统的块大小,而且将R设置成想要使用的worker节点的倍数。
3.6 备份任务
一个导致任务执行时间较长的问题,是整个任务要等待那些运行时间较长的任务完成。我们使用了这个方案来缓解这种现象。
当一个mapreduce任务接近完成时,master调度一个备份执行,来执行那些正在进行的任务。无论哪个任务先完成(原来的任务和备份执行的任务)任务都会被标记为完成。虽然这样会增加资源的消耗,但是我们发现这大大减低了完成较大规模mapreduce任务完成的时间。
网友评论