介绍
MapReduce的实验基于论文Mapreduce
以计数为例
- 输入是M个文件
- Input1 -> Map -> a,1 b,1
- Input2 -> Map -> b,1
- Input3 -> Map -> a,1 c,1
。 | | -> Reduce -> c,1 | -----> Reduce -> b,2 ---------> Reduce -> a,2
- Map处理文件输入文件,每个文件处理后输出多个k,v对,k表示每个文件中的单词,v表示单词的数量。每个Map调用是一个task
- k,v对分散地保存在中间文件,以json格式保存
- Reduce读取json文件,对文件中单词的数量进行统计
MapReduce架构
imageMaster
- 保存worker信息
- 选择worker调用map和reduce
- 监控worker工作状态,如果不可用会将其移除。并选择新的worker进行处理
- map处理完之后才会调度reduce。
- 配置reduce的数量
Worker
- 处理map或者reduce程序
- 询问master所要处理的任务
- 处理map之后写入gfs,处理reduce的worker读取gfs,结果输出到文件中
QA
限制性能
- 04年网络是限制因素,现在网络比CPU/磁盘更快
- 所有map完成之后才会进行reduce。reduce从mapworker获取中间结果,并且reduceworker结果写入gfs
均衡
- 有一些worker的完成可能会比较慢。所以task数量远大于worker。这样完成任务的worker可以分配新任务,并且不会有worker一直处理大的task而导致其他worker等待。
容错
- 失败的map,reduce可以重新运行
- 处理map的worker失败了那么master会分配给其他worker;reduce的worker失败了,如果已经写入到gfs就不用重新运行,否则重新运行。
相关框架
- Hoodop
- Spark
实现代码 link
master.go
- 数据结构,记录输入文件,map和reduce的数量,以及channel。用map记录正在运行的task已经运行的时间,如果超时的话master需要重新调度。已完成的map和reduce数量用来master处理完成后返回。
type Master struct {
sync.Mutex
cond *sync.Cond
files []string
nMap int
nReduce int
finishedMap int
finishedReduce int
mapTaskChan chan int
reduceTaskChan chan int
runningMapTaskMap map[int]int64
runningReduceTaskMap map[int]int64
}
- MakeMaster初始化master结构。并调用server监听worker的请求。tick检测worker任务是否超时。
func MakeMaster(files []string, nReduce int) *Master {
m := Master{}
m.cond = sync.NewCond(&m)
m.files = files
m.nMap = len(files)
m.nReduce = nReduce
m.finishedMap = 0
m.finishedReduce = 0
m.mapTaskChan = make(chan int, m.nMap)
m.reduceTaskChan = make(chan int, m.nReduce)
m.runningMapTaskMap = make(map[int]int64)
m.runningReduceTaskMap = make(map[int]int64)
for i := 0; i < m.nMap; i++ {
m.mapTaskChan <- i
}
for i := 0; i < m.nReduce; i++ {
m.reduceTaskChan <- i
}
m.server()
go m.tick()
return &m
}
- GetTask远程调用接口,调用该接口时,worker会把已完成的任务放在远程调用的参数中,master进行已完成任务的统计。然后对未完成的任务进行调度,返回给worker。
func (m *Master) GetTask(args *AskForTaskArgs, reply *AskForTaskReply) error {
m.Lock()
defer m.Unlock()
m.finishTask(args.CompleteTask)
for {
var result ScheduleResult
reply.Task, result = m.scheduleTask()
reply.Done = false
switch result {
case Success:
return nil
case Done:
reply.Done = true
return nil
default: // NoAvailable
m.cond.Wait()
}
}
}
worker.go
- 数据结构
type KeyValue struct {
Key string
Value string
}
- Worker函数,mr中的work调用这个函数,先是对master发起rpc请求任务。判断是map任务还是reduce任务。然后调用相应函数。
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
args := AskForTaskArgs{}
args.CompleteTask.Phase = UndefinedPhase
for {
reply, ok := askForTask(args)
if !ok || reply.Done {
break
}
task := &reply.Task
args.CompleteTask = *task
switch task.Phase {
case MapPhase:
mapTask := task.MapTask
Map(mapTask.FileName, mapTask.MapIndex, mapTask.ReduceNumber, mapf)
case ReducePhase:
reduceTask := task.ReduceTask
Reduce(reduceTask.ReduceIndex, reduceTask.MapNumber, reducef)
default:
log.Fatalf("unknown task phase: %v", task.Phase)
}
}
}
rpc.go
- RPC参数,发送当前worker已经完成的Task
type AskForTaskArgs struct {
CompleteTask Task
}
- Task,Phase描述当前任务类型。MapTask中存储文件名称、Map下标和Reduce数量。
type Task struct {
Phase JobPhase
MapTask MapTask
ReduceTask ReduceTask
}
type MapTask struct {
FileName string
MapIndex int
ReduceNumber int
}
type ReduceTask struct {
ReduceIndex int
MapNumber int
}
- 调用结果
type AskForTaskReply struct {
Task Task
Done bool
}
网友评论