美文网首页
6.824MapReduce

6.824MapReduce

作者: HannahLi_9f1c | 来源:发表于2020-04-30 12:44 被阅读0次

介绍

MapReduce的实验基于论文Mapreduce

以计数为例

  • 输入是M个文件
  • Input1 -> Map -> a,1 b,1
  • Input2 -> Map -> b,1
  • Input3 -> Map -> a,1 c,1
  •                 。
                  |   |   -> Reduce -> c,1
                  |   -----> Reduce -> b,2
                  ---------> Reduce -> a,2
    
  1. Map处理文件输入文件,每个文件处理后输出多个k,v对,k表示每个文件中的单词,v表示单词的数量。每个Map调用是一个task
  2. k,v对分散地保存在中间文件,以json格式保存
  3. Reduce读取json文件,对文件中单词的数量进行统计

MapReduce架构

image
Master
  • 保存worker信息
  • 选择worker调用map和reduce
  • 监控worker工作状态,如果不可用会将其移除。并选择新的worker进行处理
  • map处理完之后才会调度reduce。
  • 配置reduce的数量
Worker
  • 处理map或者reduce程序
  • 询问master所要处理的任务
  • 处理map之后写入gfs,处理reduce的worker读取gfs,结果输出到文件中

QA

限制性能
  1. 04年网络是限制因素,现在网络比CPU/磁盘更快
  2. 所有map完成之后才会进行reduce。reduce从mapworker获取中间结果,并且reduceworker结果写入gfs
均衡
  1. 有一些worker的完成可能会比较慢。所以task数量远大于worker。这样完成任务的worker可以分配新任务,并且不会有worker一直处理大的task而导致其他worker等待。
容错
  1. 失败的map,reduce可以重新运行
  2. 处理map的worker失败了那么master会分配给其他worker;reduce的worker失败了,如果已经写入到gfs就不用重新运行,否则重新运行。
相关框架
  1. Hoodop
  2. Spark

实现代码 link

master.go
  1. 数据结构,记录输入文件,map和reduce的数量,以及channel。用map记录正在运行的task已经运行的时间,如果超时的话master需要重新调度。已完成的map和reduce数量用来master处理完成后返回。
type Master struct {
    sync.Mutex
    cond *sync.Cond

    files          []string
    nMap           int
    nReduce        int
    finishedMap    int
    finishedReduce int

    mapTaskChan    chan int
    reduceTaskChan chan int

    runningMapTaskMap    map[int]int64
    runningReduceTaskMap map[int]int64
}
  1. MakeMaster初始化master结构。并调用server监听worker的请求。tick检测worker任务是否超时。
func MakeMaster(files []string, nReduce int) *Master {
    m := Master{}
    m.cond = sync.NewCond(&m)

    m.files = files
    m.nMap = len(files)
    m.nReduce = nReduce
    m.finishedMap = 0
    m.finishedReduce = 0

    m.mapTaskChan = make(chan int, m.nMap)
    m.reduceTaskChan = make(chan int, m.nReduce)

    m.runningMapTaskMap = make(map[int]int64)
    m.runningReduceTaskMap = make(map[int]int64)

    for i := 0; i < m.nMap; i++ {
        m.mapTaskChan <- i
    }
    for i := 0; i < m.nReduce; i++ {
        m.reduceTaskChan <- i
    }

    m.server()
    go m.tick()
    return &m
}
  1. GetTask远程调用接口,调用该接口时,worker会把已完成的任务放在远程调用的参数中,master进行已完成任务的统计。然后对未完成的任务进行调度,返回给worker。
func (m *Master) GetTask(args *AskForTaskArgs, reply *AskForTaskReply) error {
    m.Lock()
    defer m.Unlock()

    m.finishTask(args.CompleteTask)
    for {
        var result ScheduleResult
        reply.Task, result = m.scheduleTask()
        reply.Done = false
        switch result {
        case Success:
            return nil
        case Done:
            reply.Done = true
            return nil
        default: // NoAvailable
            m.cond.Wait()
        }
    }
}
worker.go
  1. 数据结构
type KeyValue struct {
    Key   string
    Value string
}
  1. Worker函数,mr中的work调用这个函数,先是对master发起rpc请求任务。判断是map任务还是reduce任务。然后调用相应函数。
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
    args := AskForTaskArgs{}
    args.CompleteTask.Phase = UndefinedPhase
    for {
        reply, ok := askForTask(args)
        if !ok || reply.Done {
            break
        }
        task := &reply.Task
        args.CompleteTask = *task

        switch task.Phase {
        case MapPhase:
            mapTask := task.MapTask
            Map(mapTask.FileName, mapTask.MapIndex, mapTask.ReduceNumber, mapf)
        case ReducePhase:
            reduceTask := task.ReduceTask
            Reduce(reduceTask.ReduceIndex, reduceTask.MapNumber, reducef)
        default:
            log.Fatalf("unknown task phase: %v", task.Phase)
        }
    }
}
rpc.go
  1. RPC参数,发送当前worker已经完成的Task
type AskForTaskArgs struct {
    CompleteTask Task
}
  1. Task,Phase描述当前任务类型。MapTask中存储文件名称、Map下标和Reduce数量。
type Task struct {
    Phase      JobPhase
    MapTask    MapTask
    ReduceTask ReduceTask
}
type MapTask struct {
    FileName     string
    MapIndex     int
    ReduceNumber int
}
type ReduceTask struct {
    ReduceIndex int
    MapNumber   int
}
  1. 调用结果
type AskForTaskReply struct {
    Task Task
    Done bool
}

相关文章

  • 6.824MapReduce

    介绍 MapReduce的实验基于论文Mapreduce 以计数为例 输入是M个文件 Input1 -> Map ...

网友评论

      本文标题:6.824MapReduce

      本文链接:https://www.haomeiwen.com/subject/qbjzwhtx.html