美文网首页
MapReduce翻译及笔记

MapReduce翻译及笔记

作者: hugo54 | 来源:发表于2019-11-04 08:35 被阅读0次

    Execution Overview

    1. 把input files分割成M个部分。在机器集群上启动若干个程序的拷贝,其中有个特殊的拷贝叫master,其余的是worker,它们在master上注册。一共有M个map task和R个reduce task等待被分配给worker。

    2. 被分配到map task的worker,把M个input file pieces转换成Key-Value,存储在内存中。(在实际应用中,内存中的数据会被周期性地写入到硬盘中,根据partitioning function,被分配到R个区域。这些被写入磁盘的数据的位置,会被传给master。)

    3. 被分配到reduce task的worker,它们知道map task的output位置,使用RPC来获取这些output。当reduce worker获取到所有的intermediate data,就根据Key来排序,以确保相同Key的Key-Value聚集在一起。(如果内存装不下,需要采用外部排序)

    4. reduce worker遍历已排序的数据,对于每一个Key,把Key和它的Value集合作为参数传给Reduce(key string, values []string) string,Reduce函数的output会被添加到这个reduce partition最终的output里。

    Types

    // map    (k1, v1)       → list(k2, v2)
    // reduce (k2, list(v2)) → list(v2)
    ​
    type KeyValue struct {
        Key   string
        Value string
    }
    ​
    mapFuc(key string, value string) []KeyValue
    ​
    // Typically just zero or one output value is produced per Reduce invocation
    reduceFuc(key string, values []string) string
    

    Example

    1. Count of Word Frequency of a Large Collections of Documents

    func mapFuc(filename string, contents string) []KeyValue {
        var output []KeyValue
        keys := strings.FieldsFunc(contents, func(r rune) bool {
            return !unicode.IsLetter(r) && !unicode.IsNumber(r)
        })
        for _, k := range keys {
            output = append(output, KeyValue{Key: k, Value: "1"})
        }
        return output
    }
    
    func reduceFuc(key string, values []string) string {
        num := 0
        for _, value := range values {
            i, _ := strconv.Atoi(value)
            num += i
        }
        return strconv.Itoa(num)
    }
    

    2. Distributed Grep

    const pattern = "your pattern here"
    
    // The map function emits a line if it matches a supplied pattern
    func mapFuc(filename string, contents string) []KeyValue {
        var output []KeyValue
        
        // seperates file contents by line
        keys := strings.FieldsFunc(contents, func(r rune) bool {
            return r != '\n'
        })
        for _, k := range keys {
            matched, _ := regexp.Match(pattern, []byte(k))
            if (matched) {
                output = append(output, KeyValue{Key: k, Value: ""})
            }
        }
        return output
    }
    
    // The reduce function is an identity function that 
    // just copies the supplied intermediate data to the output
    func reduceFuc(key string, values[] string) string {
        return key
    }
    

    3. Count of URL Access Frequency

    // The map function processes logs of web page requests and outputs <URL, 1>
    func mapFuc(filename string, contents string) []KeyValue {
    }
    
    // The reduce function adds together all values for the same URL 
    // and emits a <URL, total count> pairs
    func reduceFuc(key string, values[] string) string {
    }
    

    4. Reverse Web-Link Graph

    // The map function outputs <target, source> pairs for each link 
    // to target URL found in a page named source
    func mapFuc(filename string, contents string) []KeyValue {
    }
    
    // The reduce function concatenates the list of all source URLs 
    // associated with a given target URL and emits the pair: <target, list(source)>
    func reduceFuc(key string, values[] string) string {
    }
    

    5. Term-Vector per Host

    /*
        A term summarizes the most important words that occur in a document 
        or a set of documents as a list of <work, frequency> pairs.
        
        What is a term vector? 
        http://www.inf.ed.ac.uk/teaching/courses/tts/pdf/vspace-2x2.pdf
    */
    
    // The map function emits a <hostname, term vector> pair for each input document.
    // (where the hostname is extracted from the URL of the document)
    func mapFuc(filename string, contents string) []KeyValue {
    }
    
    // The reduce function is passed all per-document term vectors for a given host.
    // It adds these term vectors together, throwing away infrequent terms, 
    // and then emits a final <hostname, term vector> pair.
    func reduceFuc(key string, values[] string) string {
    }
    

    6. Inverted Index

    // The map function parses each document, 
    // and emits a sequence of <word, document ID> pairs.
    func mapFuc(filename string, contents string) []KeyValue {
        var output []KeyValue;
        words := strings.FieldsFunc(contents, func(r rune) bool {
            return !unicode.IsLetter(r) && !unicode.IsNumber(r)
        })
        for _, word := range words {
            output = append(output, KeyValue{Key: word, Value: filename})
        }
        return output
    }
    
    // The reduce function accepts all pairs for a given word, 
    // sorts the corresponding document IDs and emits a <word, list(documentID)> pair.
    // The set of all output pairs forms a simple inverted index.
    // It is easy to augment this computation to keep track of word positions.
    func reduceFuc(key string, values[] string) string {
        
    }
    

    7. Distributed Sort

    // The map function extracts the key from each record, and emits a <key, record> pair.
    func mapFuc(filename string, contents string) []KeyValue {
    }
    
    // The reduce function emits all pairs unchanged.
    // (This computation depends on the partitioning facilties described in
    // Section 4.1 and the ordering properties described in Section 4.2)
    func reduceFuc(key string, values[] string) string {
    }
    

    How To Handle Worker Failures?

    Master周期性ping已注册的worker,如果没有响应(不管是延迟还是真的挂了),master就认为这个worker挂了。分配给挂掉的worker的task会被重新分配给空闲的worker:

    • 已完成的map task在worker挂掉的情况下被重新执行,因为存储在local machine的output已经不可获取。

    • 已完成的reduce task在worker挂掉的情况下不会被重新执行,因为它的output已经存储在global file system中。

    如果一个map task先后被worker A和worker B执行(因为worker A挂了),所有在执行reduce task的worker都会被告知这次re-execution。然后,所有还没读worker A产生的数据的reduce task都会从worker B读。

    How To Handle Master Failure?

    周期性地把master的数据写入硬盘(checkpoints),如果master挂了,就从最新的checkpoint恢复。

    master挂了以后,停止MapReduce的计算,以确保一致性。

    Reference

    相关文章

      网友评论

          本文标题:MapReduce翻译及笔记

          本文链接:https://www.haomeiwen.com/subject/fdjrbctx.html