MIT6.824 Lab1 MapReduce

Author: 马天猫Masn | Published 2018-04-12 20:48

    Lab 1 implements a MapReduce library on a single machine. Since there is no real distributed environment, the lab only implements sequential execution, plus a parallel mode that stands in for true distribution.

    First, the overall flow. The main function lives in src/main/wc.go, and we supply our own map and reduce functions. The task here is word count, so mapF and reduceF look like this:

    func mapF(filename string, contents string) []mapreduce.KeyValue {
        // Your code here (Part II).
        // Split the contents on any non-letter character and emit a
        // ("word", "1") pair for every word found.
        f := func(c rune) bool {
            return !unicode.IsLetter(c)
        }
        s := strings.FieldsFunc(contents, f)
        kv := make([]mapreduce.KeyValue, 0)
        for _, k := range s {
            kv = append(kv, mapreduce.KeyValue{k, "1"})
        }
        return kv
    }

    func reduceF(key string, values []string) string {
        // Your code here (Part II).
        // values holds all the "1"s emitted for this key; summing them
        // gives the word's total count.
        count := 0
        for _, v := range values {
            vv, _ := strconv.Atoi(v)
            count = count + vv
        }
        return strconv.Itoa(count)
    }
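    As a quick sanity check, the two functions behave like this on a tiny input. The test below is purely illustrative and not part of the lab; it assumes mapF and reduceF are visible to a test file in the same package (in the real lab layout, src/main holds several programs, so you may need to adapt where you put it):

    // wc_example_test.go: a hypothetical sanity check, not part of the lab skeleton.
    package main

    import "testing"

    func TestWordCountFuncs(t *testing.T) {
        // mapF splits on non-letters and emits one ("word", "1") pair per word:
        // [{the 1} {quick 1} {fox 1} {the 1}]
        kvs := mapF("tiny.txt", "the quick fox the")
        if len(kvs) != 4 || kvs[0].Key != "the" {
            t.Fatalf("unexpected mapF output: %v", kvs)
        }

        // The framework groups values by key before calling reduceF:
        if got := reduceF("the", []string{"1", "1"}); got != "2" {
            t.Fatalf(`reduceF("the") = %q, want "2"`, got)
        }
    }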
    

    There are two modes to run and test against: Sequential and Distributed. Start with Sequential; it lets you check whether your building-block functions (doMap and doReduce) are implemented correctly.


    Sequential

    func Sequential(jobName string, files []string, nreduce int,
        mapF func(string, string) []KeyValue,
        reduceF func(string, []string) string,
    ) (mr *Master) {
        mr = newMaster("master")
        go mr.run(jobName, files, nreduce, func(phase jobPhase) {
            switch phase {
            case mapPhase:
                for i, f := range mr.files {
                    doMap(mr.jobName, i, f, mr.nReduce, mapF)
                }
            case reducePhase:
                for i := 0; i < mr.nReduce; i++ {
                    doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
                }
            }
        }, func() {
            mr.stats = []int{len(files) + nreduce}
        })
        return
    }
    

    main calls Sequential, which is defined around src/mapreduce/master.go:60. The arguments are jobName, files (the input files), nreduce (the number of reduce tasks, and hence of output partitions), and the map and reduce functions.
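    For context, the relevant part of main in wc.go looks roughly like this (quoted from memory of the lab skeleton, so treat the exact literals as approximate):

    func main() {
        if len(os.Args) < 4 {
            fmt.Printf("%s: see usage comments in file\n", os.Args[0])
        } else if os.Args[1] == "master" {
            var mr *mapreduce.Master
            if os.Args[2] == "sequential" {
                // e.g. go run wc.go master sequential pg-*.txt
                mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
            } else {
                mr = mapreduce.Distributed("wcseq", os.Args[3:], 3, os.Args[2])
            }
            mr.Wait()
        } else {
            // run as a worker process that registers with the master
            mapreduce.RunWorker(os.Args[2], os.Args[3], mapF, reduceF, 100, nil)
        }
    }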

    Sequential first creates a master and then starts a goroutine running run. The third argument to run is a schedule function that calls doMap for every input file during mapPhase and doReduce for every reduce task during reducePhase; the fourth argument is the finish function.

    func (mr *Master) run(jobName string, files []string, nreduce int,
        schedule func(phase jobPhase),
        finish func(),
    ) {
        mr.jobName = jobName
        mr.files = files
        mr.nReduce = nreduce
    
        fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)
    
        schedule(mapPhase)
        schedule(reducePhase)
        finish()
        mr.merge()
    
        fmt.Printf("%s: Map/Reduce task completed\n", mr.address)
    
        mr.doneChannel <- true
    }
    

    As you can see, run executes mapPhase first and then reducePhase, where the schedule function is the serial one defined inside Sequential. It then merges the output files of the reduce tasks and finally sends true on mr.doneChannel (the mr.Wait() called from main blocks until a value arrives on mr.doneChannel).
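    For reference, Wait is just a receive on that channel (roughly as in the framework code):

    func (mr *Master) Wait() {
        // blocks until run() reports completion on doneChannel
        <-mr.doneChannel
    }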

    Next, the implementations of doMap and doReduce.

    func doMap(
        jobName string, // the name of the MapReduce job
        mapTask int, // which map task this is
        inFile string,
        nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
        mapF func(filename string, contents string) []KeyValue,
    ) {
        // Read the whole input file and run the user's map function on it.
        data, _ := ioutil.ReadFile(inFile)
        mapkv := mapF(inFile, string(data))

        // Create one intermediate file, and one JSON encoder, per reduce task.
        f := make([]*os.File, nReduce)
        enc := make([]*json.Encoder, nReduce)
        for i := 0; i < nReduce; i++ {
            filename := reduceName(jobName, mapTask, i)
            f[i], _ = os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666)
            defer f[i].Close()
            enc[i] = json.NewEncoder(f[i])
        }

        // Partition each key/value pair by hashing its key, and append it
        // (JSON-encoded) to the intermediate file of the chosen reduce task.
        for _, kv := range mapkv {
            r := ihash(kv.Key) % nReduce
            enc[r].Encode(&kv)
        }
    }
    

    doMap takes one input file and must produce nReduce intermediate files. It first runs mapF to get a slice of key/value pairs. The rest is file I/O: create the nReduce intermediate files, keep their handles open (closed at the end via defer), then hash each pair's key to pick a reduce partition and JSON-encode the pair into the corresponding file.
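    The intermediate file names come from the framework's reduceName helper, and the per-reduce output name from mergeName; paraphrased from the framework's common.go, they look roughly like this:

    // The intermediate file that map task <mapTask> writes for reduce task <reduceTask>.
    func reduceName(jobName string, mapTask int, reduceTask int) string {
        return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
    }

    // The per-reduce-task output file that doReduce writes.
    func mergeName(jobName string, reduceTask int) string {
        return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
    }

    This is why doReduce below loops i from 0 to nMap-1 and opens reduceName(jobName, i, reduceTask).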

    // ByKey implements sort.Interface so intermediate pairs can be sorted by key.
    type ByKey []KeyValue

    func (a ByKey) Len() int           { return len(a) }
    func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
    func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

    func doReduce(
        jobName string, // the name of the whole MapReduce job
        reduceTask int, // which reduce task this is
        outFile string, // write the output here
        nMap int, // the number of map tasks that were run ("M" in the paper)
        reduceF func(key string, values []string) string,
    ) {
        // Collect this reduce task's key/value pairs from all nMap intermediate files.
        kvslice := make([]KeyValue, 0)
        for i := 0; i < nMap; i++ {
            filename := reduceName(jobName, i, reduceTask)
            f, _ := os.OpenFile(filename, os.O_RDONLY, 0666)
            defer f.Close()
            dec := json.NewDecoder(f)
            for {
                var kv KeyValue
                if err := dec.Decode(&kv); err != nil {
                    break
                }
                kvslice = append(kvslice, kv)
            }
        }

        // Sort by key so that equal keys become adjacent.
        sort.Sort(ByKey(kvslice))

        lenkv := len(kvslice)
        value := make([]string, 0)
        ff, _ := os.OpenFile(outFile, os.O_CREATE|os.O_WRONLY, 0666)
        defer ff.Close()
        enc := json.NewEncoder(ff)

        // Walk the sorted pairs; whenever the key changes, call reduceF on the
        // accumulated values of the previous key and write the result.
        for i := 0; i < lenkv; i++ {
            if i != 0 && kvslice[i].Key != kvslice[i-1].Key {
                s := reduceF(kvslice[i-1].Key, value)
                enc.Encode(&KeyValue{kvslice[i-1].Key, s})
                value = make([]string, 0)
            }
            value = append(value, kvslice[i].Value)
        }
        // Don't forget the last key.
        if len(value) != 0 {
            s := reduceF(kvslice[lenkv-1].Key, value)
            enc.Encode(&KeyValue{kvslice[lenkv-1].Key, s})
        }
    }
    

    doReduce handles one reduce task: it reads that task's intermediate file from each of the nMap map tasks and applies reduceF. Concretely, for the given reduce task it computes the names of the nMap intermediate files, reads each one, JSON-decodes every key/value pair into kvslice, and then sorts the slice by key. Sorting a struct slice with sort.Sort only requires implementing the three methods Len, Swap and Less, as shown above. Finally it walks the sorted kvslice, passes the list of values for each key to reduceF, and writes the result to the final output file.
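    Incidentally, if you don't want to define the ByKey type, the standard library's sort.Slice (Go 1.8+) does the same thing with a closure:

    // Equivalent to sort.Sort(ByKey(kvslice)), without a named helper type.
    sort.Slice(kvslice, func(i, j int) bool {
        return kvslice[i].Key < kvslice[j].Key
    })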


    Distributed

    Sequential execution is straightforward. Next up is Distributed, which is a bit more involved; the main job is implementing the schedule function. Let's walk through the flow.

    func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
        mr = newMaster(master)
        mr.startRPCServer()
        go mr.run(jobName, files, nreduce,
            func(phase jobPhase) {
                ch := make(chan string)
                go mr.forwardRegistrations(ch)
                schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
            },
            func() {
                mr.stats = mr.killWorkers()
                mr.stopRPCServer()
            })
        return
    }
    
    

    Distributed first calls startRPCServer() to start the master's RPC server:

    func (mr *Master) startRPCServer() {
        rpcs := rpc.NewServer()
        rpcs.Register(mr)
        os.Remove(mr.address) // only needed for "unix"
        l, e := net.Listen("unix", mr.address)
        if e != nil {
            log.Fatal("RegstrationServer", mr.address, " error: ", e)
        }
        mr.l = l
    
        // now that we are listening on the master address, can fork off
        // accepting connections to another thread.
        go func() {
        loop:
            for {
                select {
                case <-mr.shutdown:
                    break loop
                default:
                }
                conn, err := mr.l.Accept()
                if err == nil {
                    go func() {
                        rpcs.ServeConn(conn)
                        conn.Close()
                    }()
                } else {
                    debug("RegistrationServer: accept error", err)
                    break
                }
            }
            debug("RegistrationServer: done\n")
        }()
    }
    

    It registers an RPC server and listens on an address. Because the lab runs its parallelism on a single machine, the network type is "unix" and the address is a local Unix-domain socket. A goroutine then loops accepting connections, serving each one with ServeConn(conn) in yet another goroutine, so there is always a local thread listening for requests.

    Distributed then starts a goroutine running run, just as Sequential did. This time the third argument (the schedule closure) creates a channel and launches a goroutine running forwardRegistrations:

    func (mr *Master) forwardRegistrations(ch chan string) {
        i := 0
        for {
            mr.Lock()
            if len(mr.workers) > i {
                // there's a worker that we haven't told schedule() about.
                w := mr.workers[i]
                go func() { ch <- w }() // send without holding the lock.
                i = i + 1
            } else {
                // wait for Register() to add an entry to workers[]
                // in response to an RPC from a new worker.
                mr.newCond.Wait()
            }
            mr.Unlock()
        }
    }
    

    The job of forwardRegistrations is to push every worker that joins mr.workers onto the channel created above, so that channel acts as a queue of available workers.
    Note the line go func() { ch <- w }(): ch is an unbuffered channel, so a send only completes once someone receives the value. Doing the send inline (while holding the lock) could block the loop indefinitely, so it is done in its own goroutine to keep the parent from blocking.
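    The other half of this handshake is the Register RPC that each worker calls when it starts up; roughly (from the framework's master.go), it appends the worker to mr.workers and wakes forwardRegistrations through the condition variable:

    // Register is called by workers (via RPC) once they are ready for tasks.
    func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
        mr.Lock()
        defer mr.Unlock()
        debug("Register: worker %s\n", args.Worker)
        mr.workers = append(mr.workers, args.Worker)

        // Tell forwardRegistrations() that there is a new workers[] entry.
        mr.newCond.Broadcast()
        return nil
    }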

    The schedule closure then calls our own schedule function, once for mapPhase and once for reducePhase.

    func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
        var ntasks int
        var n_other int // number of inputs (for reduce) or outputs (for map)
        switch phase {
        case mapPhase:
            ntasks = len(mapFiles)
            n_other = nReduce
        case reducePhase:
            ntasks = nReduce
            n_other = len(mapFiles)
        }
    
        fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
    
        // All ntasks tasks have to be scheduled on workers. Once all tasks
        // have completed successfully, schedule() should return.
        //
        // Your code here (Part III, Part IV).
        //
        // var done sync.WaitGroup
        // for count := 0; count < ntasks; count++{
        //  var w, filename string
        //  w = <- registerChan
        //  if phase == mapPhase{
        //      filename = mapFiles[count]
        //  }else{
        //      filename = ""
        //  }
        //  done.Add(1)
        //  go func(count int){
        //      defer done.Done()
        //      call(w, "Worker.DoTask", &DoTaskArgs{jobName, filename, phase, count, n_other}, new(struct{}))
        //      println("Start ...")
        //      registerChan <- w
        //      println("Stop ...")
        //  }(count)
        // }    
        // done.Wait()
        
        var done sync.WaitGroup
        done.Add(ntasks)

        // A buffered channel acts as the task queue; preload every task index.
        ch_task := make(chan int, ntasks)
        for i := 0; i < ntasks; i++ {
            ch_task <- i
        }

        go func() {
            for {
                // Every worker that registers gets its own goroutine, which
                // keeps pulling tasks off the queue.
                w := <-registerChan
                go func(w string) {
                    for {
                        task_id := <-ch_task
                        var filename string
                        if phase == mapPhase {
                            filename = mapFiles[task_id]
                        } else {
                            filename = ""
                        }
                        ok := call(w, "Worker.DoTask", &DoTaskArgs{jobName, filename, phase, task_id, n_other}, new(struct{}))
                        if ok == false {
                            // The RPC failed: put the task back so another worker retries it.
                            ch_task <- task_id
                        } else {
                            done.Done()
                        }
                    }
                }(w)
            }
        }()

        // Return only once every task has completed successfully.
        done.Wait()
        fmt.Printf("Schedule: %v done\n", phase)
    }
    

    There are two implementations above: the second (uncommented) one is the better one; the commented-out one is buggy.
    The chosen implementation uses a WaitGroup. First the WaitGroup counter is set to ntasks, then a buffered task channel is created and every task index is pushed into it. A goroutine then loops forever: whenever an idle worker appears on the worker channel, it spawns a goroutine for that worker, which in turn loops pulling unfinished tasks from the task channel and executing them. The master invokes worker w's DoTask over RPC; the call goes to the worker's own small RPC server, which the framework's RunWorker starts when the worker launches, so our schedule code has nothing extra to set up. If a task fails, its index is pushed back into the task channel so another worker will retry it, which handles worker failures.

    The commented-out version is worse because it loops over tasks and pulls an available worker out for each one; the registerChan <- w that hands the worker back blocks when nobody is receiving (registerChan is unbuffered), so the last workers to finish can block forever and Wait() never returns. You could wrap it as go func(){ registerChan <- w }(), but it still feels off: in the MapReduce model each worker should have its own long-running loop that grabs work as it arrives, which is exactly what the version above does and matches the distributed algorithm better.
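    For reference, the positional literal &DoTaskArgs{jobName, filename, phase, task_id, n_other} fills in the fields of the framework's DoTaskArgs struct, which looks roughly like this (from common_rpc.go):

    type DoTaskArgs struct {
        JobName    string
        File       string   // the input file; only meaningful for the map phase
        Phase      jobPhase // mapPhase or reducePhase
        TaskNumber int      // this task's index within the current phase

        // NumOtherPhase is the total number of tasks in the other phase:
        // mappers need it to know how many output files to write, and
        // reducers need it to know how many input files to read.
        NumOtherPhase int
    }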

    Then, just as in Sequential, once the map and reduce tasks have finished, run calls finish; in the distributed case finish first has to killWorkers:

    func (mr *Master) killWorkers() []int {
        mr.Lock()
        defer mr.Unlock()
        ntasks := make([]int, 0, len(mr.workers))
        for _, w := range mr.workers {
            debug("Master: shutdown worker %s\n", w)
            var reply ShutdownReply
            ok := call(w, "Worker.Shutdown", new(struct{}), &reply)
            if ok == false {
                fmt.Printf("Master: RPC %s shutdown error\n", w)
            } else {
                ntasks = append(ntasks, reply.Ntasks)
            }
        }
        return ntasks
    }
    

    killWorkers shuts each worker down via the Worker.Shutdown RPC (collecting how many tasks each one ran), and then the master stops its own RPC server.
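    Stopping the master's RPC server is itself done through an RPC so that it doesn't race with the accept loop in startRPCServer; roughly, from the framework's master_rpc.go:

    // Shutdown is an RPC method that shuts down the Master's RPC server.
    func (mr *Master) Shutdown(_, _ *struct{}) error {
        debug("Shutdown: registration server\n")
        close(mr.shutdown) // the accept loop's select sees this and exits
        mr.l.Close()       // causes any pending Accept to fail
        return nil
    }

    func (mr *Master) stopRPCServer() {
        var reply ShutdownReply
        ok := call(mr.address, "Master.Shutdown", new(struct{}), &reply)
        if ok == false {
            fmt.Printf("Cleanup: RPC %s error\n", mr.address)
        }
        debug("cleanupRegistration: done\n")
    }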


    This was my first time writing Go, so the code may be rough and some terminology may be off. Still working on it.
