
6.824:MapReduce

Author: Ahungrynoob | Published 2018-07-05 17:43

Hands-on MapReduce

Part I

common_map.go

package mapreduce

import (
    "encoding/json"
    "hash/fnv"
    "io/ioutil"
    "log"
    "os"
)

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //
    // doMap manages one map task: it should read one of the input files
    // (inFile), call the user-defined map function (mapF) for that file's
    // contents, and partition mapF's output into nReduce intermediate files.
    //
    // There is one intermediate file per reduce task. The file name
    // includes both the map task number and the reduce task number. Use
    // the filename generated by reduceName(jobName, mapTask, r)
    // as the intermediate file for reduce task r. Call ihash() (see
    // below) on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    // Your code here (Part I).
    //

    // Read the whole input file in one call; a bare File.Read is not
    // guaranteed to fill its buffer, so ioutil.ReadFile is safer here.
    contents, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap: read input file: ", inFile, " error: ", err)
    }

    keyValues := mapF(inFile, string(contents))
    for i := 0; i < nReduce; i++ {
        fileName := reduceName(jobName, mapTask, i)
        reduceFile, err := os.Create(fileName)
        if err != nil {
            log.Fatal("doMap: create intermediate file: ", fileName, " error: ", err)
        }
        enc := json.NewEncoder(reduceFile)
        for _, kv := range keyValues {
            // ihash(key) mod nReduce selects the reduce task for this pair.
            if ihash(kv.Key)%nReduce == i {
                if err := enc.Encode(&kv); err != nil {
                    log.Fatal("doMap: json encode error: ", err)
                }
            }
        }
        // Close each intermediate file as soon as it is complete; a defer
        // inside the loop would keep all nReduce files open until doMap returns.
        reduceFile.Close()
    }
}

func ihash(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32() & 0x7fffffff)
}
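Both doMap and doReduce lean on two definitions from the lab's common.go: the KeyValue pair type and the reduceName helper that names the intermediate files. As a reference, here is a minimal sketch of what they look like; the shape matches the 6.824 skeleton, but treat the exact file-name format as an assumption rather than the authoritative lab code.

// KeyValue is the pair type that mapF emits and reduceF consumes
// (defined in the lab's common.go).
type KeyValue struct {
    Key   string
    Value string
}

// reduceName builds the intermediate file name for the output of map task
// mapTask destined for reduce task reduceTask. The "mrtmp." prefix is an
// assumption about the skeleton's format, for illustration only.
func reduceName(jobName string, mapTask int, reduceTask int) string {
    return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}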

common_reduce.go

package mapreduce

import (
    "encoding/json"
    "log"
    "os"
    "sort"
)

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // doReduce manages one reduce task: it should read the intermediate
    // files for the task, sort the intermediate key/value pairs by key,
    // call the user-defined reduce function (reduceF) for each key, and
    // write reduceF's output to disk.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTask) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    // enc := json.NewEncoder(file)
    // for key := ... {
    //  enc.Encode(KeyValue{key, reduceF(...)})
    // }
    // file.Close()
    //
    // Your code here (Part I).
    //
    keyValues := make(map[string][]string)
    // Gather, for this reduce task, the values of every key across the
    // intermediate files written by all nMap map tasks.
    for i := 0; i < nMap; i++ {
        fileName := reduceName(jobName, i, reduceTask)
        file, err := os.Open(fileName)
        if err != nil {
            log.Fatal("doReduce: open intermediate file ", fileName, " error: ", err)
        }

        decoder := json.NewDecoder(file)
        for {
            var kv KeyValue
            if err := decoder.Decode(&kv); err != nil {
                break // io.EOF once the file is exhausted
            }
            // append on a nil slice allocates, so no explicit make is needed.
            keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
        }
        // Close each file as soon as it has been drained instead of
        // deferring all nMap closes to the end of doReduce.
        file.Close()
    }

    var keys []string
    for k := range keyValues {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    resultFile, err := os.Create(outFile)
    if err != nil {
        log.Fatal("doReduce: create reduced file ", outFile, " error: ", err)
    }
    defer resultFile.Close()
    enc := json.NewEncoder(resultFile)
    for _, k := range keys {
        reducedValue := reduceF(k, keyValues[k])
        err := enc.Encode(&KeyValue{k, reducedValue})
        if err != nil {
            log.Fatal("doReduce: encode error: ", err)
        }
    }
}
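The decode-until-error loop above works because json.Decoder reads back exactly one value per Encode call and returns io.EOF once the stream is exhausted. Here is a standalone round trip of the same intermediate-file format, using a bytes.Buffer in place of a real file:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

type KeyValue struct {
    Key   string
    Value string
}

func main() {
    var buf bytes.Buffer

    // Write pairs the way doMap writes an intermediate file.
    enc := json.NewEncoder(&buf)
    for _, kv := range []KeyValue{{"a", "1"}, {"b", "2"}, {"a", "3"}} {
        if err := enc.Encode(&kv); err != nil {
            panic(err)
        }
    }

    // Read them back the way doReduce does: decode until an error.
    dec := json.NewDecoder(&buf)
    for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
            break // io.EOF once every pair has been read
        }
        fmt.Println(kv.Key, kv.Value)
    }
    // Output:
    // a 1
    // b 2
    // a 3
}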

Part II

word-count

func mapF(filename string, contents string) []mapreduce.KeyValue {
    // Your code here (Part II).
    words := strings.FieldsFunc(contents, func(c rune) bool {
        return !unicode.IsLetter(c)
    })
    keyValues := make([]mapreduce.KeyValue, 0)
    for _, word := range words {
        keyValues = append(keyValues, mapreduce.KeyValue{word, "1"})
    }
    return keyValues
}

func reduceF(key string, values []string) string {
    // Your code here (Part II).
    sum := 0
    for _, i := range values {
        count, err := strconv.Atoi(i)
        if err != nil {
            log.Fatal("reduceF: strconv string to int error: ", err)
        }
        sum += count
    }
    return strconv.Itoa(sum)
}
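strings.FieldsFunc does the heavy lifting in mapF: it splits the contents at every rune for which the callback returns true, so any run of non-letters (punctuation, digits, whitespace) acts as a separator. A quick standalone check:

package main

import (
    "fmt"
    "strings"
    "unicode"
)

func main() {
    words := strings.FieldsFunc("Hello, world! 42 hello...", func(c rune) bool {
        return !unicode.IsLetter(c)
    })
    fmt.Println(words) // [Hello world hello]
}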

Part III

package mapreduce

import (
    "context"
    "fmt"
    "sync"
)

//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }
    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < ntasks; i++ {
        wg.Add(1)
        go func(i int, phase jobPhase) {
            defer wg.Done()
            for {
                worker := <-registerChan
                // Only map tasks take an input file; reduce tasks find their
                // inputs through reduceName instead.
                file := ""
                if phase == mapPhase {
                    file = mapFiles[i]
                }
                doTaskArgs := DoTaskArgs{jobName, file, phase, i, n_other}
                ok := call(worker, "Worker.DoTask", &doTaskArgs, nil)
                if ok {
                    // Hand the worker back so later tasks can reuse it. The
                    // final sends may block forever once every task has been
                    // scheduled, so send from a goroutine that the cancelled
                    // context can unblock.
                    go func() {
                        select {
                        case registerChan <- worker:
                            // worker is available for another task
                        case <-ctx.Done():
                            // schedule has finished; abandon the send
                        }
                    }()
                    break
                }
                // The RPC failed (worker crashed or hung): loop and retry the
                // task on the next available worker.
            }
        }(i, phase)
    }
    wg.Wait()
    cancel()
    fmt.Printf("Schedule: %v done\n", phase)
}

Using a context to keep those final re-registration sends from blocking: pretty sweet.
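The pattern in isolation: a goroutine that wants to send on a channel that may never be read again can select on ctx.Done() as an escape hatch. A minimal self-contained sketch of the same idea:

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    ch := make(chan string) // unbuffered, like registerChan
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        select {
        case ch <- "worker-1":
            fmt.Println("handed the worker back")
        case <-ctx.Done():
            // Without this case, the send in the other branch would block
            // forever once nobody is left to receive from ch.
            fmt.Println("cancelled; abandoning the send")
        }
    }()

    cancel() // no receiver will ever arrive; release the sender
    time.Sleep(100 * time.Millisecond)
}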

