美文网首页
Note-Hadoop: Reduce side data jo

Note-Hadoop: Reduce side data jo

作者: rua_rua_rua | 来源:发表于2017-04-19 12:50 被阅读0次

    Goal:

    实现两组数据的合并
    input data 1:

    place_id \t woeid \t latitude \t longitude \t place_name \t place_type_id \t place_url

    Input data 2:

    photo_id \t owner \t tags \t date_taken \t place_id \t accuracy


    For data join, usually there are two different ways:

    1. map side join
    2. reduce side join


      map-side-join ![Reduce-side-Join](https://img.haomeiwen.com/i5688533/6052e3bfe7ac2fff.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    map side join通常要求从 dataset->Map 的数据有序,否则,其时间复杂度将是n的x次方,x为dataset的个数。
    因为我们的Input data 2 含有很多不同的dataset, 可能会消耗很长的时间,所以选择使用reduce side join。


    Expect mapper output

    output data 1:

    place_id#0 \t place_type_id \t place_url

    output data 2:

    place_id#1 \t photo_id \t tags

    我们将place_id 作为key,对来源不同的数据组在‘#’后加上数字予以辨别,然后再选取需要的数据从mapper当中进行输出。


    Shuffle

    我们在shuffle当中进行一个partion:根据place_id作为key当成第一主键,‘#’后的数字标记label为第二主键进行排序和分区。
    排序和分区后的数据进入reducer

    可以通过

    -D mapreduce.partition.keypartitioner.options=-k1,1 \

    进行实现


    Reducer

    此时进入reducer的data应该是排序分区好了的sorted data
    Input data 1:

    place_id#0 \t place_type_id \t place_url

    Input data 2:

    place_id#1 \t photo_id \t tags

    此时我们期望的output应该是通过place_id 作为key将两组数据连接在一起
    Output data:

    photo_id \t tags \t place_type_id \t place_url


    Code

    具体的代码实现如下
    Mapper:

    #!/usr/bin/python3
    
    import sys
    
    
    def multi_mapper():
        """ This mapper will output different format dependind on input type
        If input is place file:
        Input format: place_id \t woeid \t latitude \t longitude \t place_name \t place_type_id \t place_url
        Output format: place_id#0 \t place_type_id \t place_url
                    
        If input is photo file:
        Input format: photo_id \t owner \t tags \t date_taken \t place_id \t accuracy
        Output format: place_id#1 \t photo_id \t tags
        """
        for line in sys.stdin:
            parts = line.strip().split("\t")
            
            if len(parts) == 7:
                place_id, place_type_id, place_url = parts[0].strip(), parts[5].strip(), parts[6].strip()
                if place_type_id == '7' or place_type_id == '22':
                    print(place_id + "#0\t" + place_type_id + "\t" + place_url)
            
            elif len(parts) == 6:
                photo_id, place_id, tags = parts[0].strip(), parts[4].strip(), parts[2].strip()
                print(place_id + "#1\t" + photo_id + "\t" + tags)
    
    
    if __name__ == "__main__":
        multi_mapper()
    

    Reducer:

    #!/usr/bin/python3
    
    import sys
    
    def read_map_output(file):
        """ Return an key-value pair extracted from file (sys.stdin).
        Input format: key \t value
        Output format: (key, value)
        """
        for line in file:
            yield line.strip().split('\t', 1)
    
    
    def combine_place():
        """ This reducer run reduce side join
        Input format: place_id#0 \t place_type_id \t place_url
                      place_id#1 \t photo_id \t tags
        Ourput format: photo_id  \t tags \t place_type_id \t place_url
        """
    
        data = read_map_output(sys.stdin)
    
        current_place_id = ''
        current_place_url_and_type_id= 'NULL'
        for key, value in data:
            # check input is valid
            if key == '':
                continue
    
            # split key by '#' , get the place_id and a number
            key = key.split('#')
    
            # check the key-value pair come from place or come from photo
            if key[0] != current_place_id:
                if key[1] == '0':
                    current_place_id = key[0]
                    current_place_url_and_type_id = value
                else:
                    current_place_id = key[0]
                    current_place_url_and_type_id = 'NULL'
    
                    print(value + '\t' + current_place_url_and_type_id)
            else:
                print(value + '\t' + current_place_url_and_type_id)
    
        
    
    if __name__ == '__main__':
        combine_place()
    

    相关文章

      网友评论

          本文标题:Note-Hadoop: Reduce side data jo

          本文链接:https://www.haomeiwen.com/subject/mmwczttx.html