美文网首页
MIT-6.824 Lab1: MapReduce-2018

MIT-6.824 Lab1: MapReduce-2018

作者: yddeng | 来源:发表于2019-10-31 17:55 被阅读0次

    MapReduce是由JeffreyDean提出的一种处理大数据的编程模型,作为在Go中编程和构建容错分布式系统的入门。
    集群中有一个master,其它的都是worker,总共有M个map任务和R个reduce任务(M和R由用户指定)。master负责将map和reduce任务分配给空闲的worker并处理worker的故障。

    Part I: Map/Reduce input and output

    分别实现 common_map.go、common_reduce.go 中的 doMap()、doReduce() 方法。

    1.读取一个输入文件inFile。调用用户定义函数mapF,将内容转换为键值对。
    2.新建nReduce工作数目相等的中间文件。使用reduceName(jobName, mapTask, r)生成的中间文件名。
    3.根据key-value的分配规则(ihash(key) % nReduce),将键值对存入新建的中间文件内。

    func doMap(
        jobName string, // the name of the MapReduce job
        mapTask int, // which map task this is
        inFile string,
        nReduce int, // the number of reduce task that will be run ("R" in the paper)
        mapF func(filename string, contents string) []KeyValue,
    ) {
        data, err := ioutil.ReadFile(inFile)
        if err != nil {
            panic(err)
        }
        //新建中间文件
        outputFiles := make([] *os.File, nReduce)
        for i := 0; i < nReduce; i++ {
            fileName := reduceName(jobName, mapTask, i)
            outputFiles[i], err = os.Create(fileName)
            if err != nil {
                panic(err)
            }
        }
        //将输入文件内容转为键值对
        keyValues := mapF(inFile, string(data))
        //根据hash规则将键值对存入中间文件
        for _, kv := range keyValues {
            index := ihash(kv.Key) % nReduce
            enc := json.NewEncoder(outputFiles[index])
            enc.Encode(kv)
        }
        for _, file := range outputFiles {
            file.Close()
        }
    }
    

    1.读取map工作结果的中间文件的键值对,并合并相同的key。
    2.对key排序。
    3.将reduceF的结果保存到mergeName()返回的文件中。

    func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTask int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
    ) {
        inputFiles := make([] *os.File, nMap)
        for i := 0; i < nMap; i++ {
            fileName := reduceName(jobName, i, reduceTask)//注意与中间文件名的创建保持一致
            inputFiles[i], _ = os.Open(fileName)
        }
        //读取中间文件内容
        keyValues := make(map[string][]string)
        for _, inputFile := range inputFiles {
            defer inputFile.Close()
            dec := json.NewDecoder(inputFile)
            for {
                var kv KeyValue
                err := dec.Decode(&kv)
                if err != nil {
                    break
                }
                keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
            }
        }
        //排序
        keys := make([]string, 0, len(keyValues))
        for k := range keyValues {
            keys = append(keys, k)
        }
        sort.Strings(keys)
    
        //新建结果文件,将key的统计结果存入。
        out, err := os.Create(outFile)
        if err != nil {
            log.Fatal("Error in creating file", outFile)
        }
        defer out.Close()
    
        enc := json.NewEncoder(out)
        for _, key := range keys {
            kv := KeyValue{key, reduceF(key, keyValues[key])}
            enc.Encode(kv)
        }
    }
    

    Part II: Single-worker word count

    编写main/wc.go 中的 mapF()、reduceF()方法。
    mapF() 返回一个键/值对的切片;
    reduceF() 返回这个key出现了多少次,即values的长度。

    func mapF(filename string, contents string) []mapreduce.KeyValue {
        words := strings.FieldsFunc(contents, func(r rune) bool {
            return !unicode.IsLetter(r)
        })
        res := make([]mapreduce.KeyValue, 0)
        for _, word := range words {
            res = append(res, mapreduce.KeyValue{word, ""})
        }
        return res
    }
    
    func reduceF(key string, values []string) string {
        return strconv.Itoa(len(values))
    }
    

    Part III: Distributing MapReduce tasks

    编写schedule.go中的 schedule()方法。
    1.等待所有任务完成。
    2.从registerChan中取出worker的地址,将任务分配给它。
    注:该通道为每个工作者生成一个字符串,其中包含工作者的RPC地址。有些worker可能在调用schedule()之前存在,有些可能在schedule()运行时启动;所有这些都将出现在registerChan上。schedule()应该使用所有的worker,包括在它启动后出现的那些。

    func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
        var ntasks int
        var n_other int // number of inputs (for reduce) or outputs (for map)
        switch phase {
        case mapPhase:
            ntasks = len(mapFiles)
            n_other = nReduce
        case reducePhase:
            ntasks = nReduce
            n_other = len(mapFiles)
        }
    
        fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
        wg := sync.WaitGroup{}
        wg.Add(ntasks)
    
        //将任务放入队列
        taskCh := make(chan int,ntasks)
        for idx:=0;idx<ntasks;idx++ {
            taskCh <- idx
        }
    
        //所有任务完成,关闭任务channel,退出
        go func() {
            wg.Wait()
            close(taskCh)
        }()
    
    
        for idx := range taskCh{
            arg := DoTaskArgs{
                JobName:       jobName,
                File:          mapFiles[idx],
                Phase:         phase,
                TaskNumber:    idx,
                NumOtherPhase: n_other,
            }
    
            worker := <- registerChan
            go func(worker string,arg DoTaskArgs,idx int) {
                if call(worker,"Worker.DoTask",arg,nil){
                    wg.Done()
                }else { //call失败,将任务重新放回队列
                    taskCh <- idx
                }
                //任务结束,归还工作线程
                registerChan <- worker
            }(worker,arg,idx)
    
        }
        fmt.Printf("Schedule: %v done\n", phase)
    }
    

    Part IV: Handling worker failures

    worker 工作失败的例子
    RPC失败的原因:1.请求没有达到,工作进程没有执行任务;2.工作进程可能已经执行了它,但是应答丢失;3.工作进程可能仍然在执行,但是主进程的RPC超时了。
    代码实现参考part 3;

    Part V: Inverted index generation (optional, does not count in grade)

    生成倒排索引

    func mapF(document string, value string) (res []mapreduce.KeyValue) {
        words := strings.FieldsFunc(value, func(r rune) bool {
            return !unicode.IsLetter(r)
        })
        s := make(map[string]bool)
        for _, word := range words {
            lower := strings.ToLower(word)
            upper := strings.ToUpper(word)
            _, hasUpper := s[lower]
            _, hasLower := s[upper]
            if !hasLower && !hasUpper {
                if lower == word {
                    s[lower] = true
                } else if upper == word {
                    s[upper] = true
                }
            }
        }
    
        for k, _ := range s {
            res = append(res, mapreduce.KeyValue{k, document})
        }
        return
    }
    
    func reduceF(key string, values []string) string {
        sort.Strings(values)
        return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
    }
    

    相关文章

      网友评论

          本文标题:MIT-6.824 Lab1: MapReduce-2018

          本文链接:https://www.haomeiwen.com/subject/hpozvctx.html