Notes on a Simple MapReduce Optimization

Author: 鹅鹅鹅_ | Published 2019-01-01 11:54

    I. Background

    1. Goal
      A simple WordCount job, but implemented in Python via Hadoop Streaming.
    2. Input file
    • Content format
    noe two three
    noe two three
    noe two three
    ...
    
    • File size
    [hadoop@master ~]$ du -mh test.txt 
    1.7G    test.txt
    
    • Mapper implementation (mapper.py)
    #!/usr/bin/env python2.6
    
    import sys
    
    # maps words to their counts
    word2count = {}
    
    # input comes from STDIN (standard input)
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
        # split the line into words while removing any empty strings
        words = filter(lambda word: word, line.split())
        # increase counters
        for word in words:
            # write the results to STDOUT (standard output);
            # what we output here will be the input for the
            # Reduce step, i.e. the input for reducer.py
            #
            # tab-delimited; the trivial word count is 1
            print '%s\t%s' % (word, 1)
    
    
    • Reducer implementation (reducer.py)
    #!/usr/bin/env python2.6
     
    from operator import itemgetter
    import sys
     
    # maps words to their counts
    word2count = {}
     
    # input comes from STDIN
    for line in sys.stdin:
        # remove leading and trailing whitespace
        line = line.strip()
     
        # parse the input we got from mapper.py
        # the mapper output is tab-delimited: <word> \t <count>
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        try:
            count = int(count)
            word2count[word] = word2count.get(word, 0) + count
        except ValueError:
            # count was not a number, so silently
            # ignore/discard this line
            pass
     
    # sort the words lexicographically;
    #
    # this step is NOT required, we just do it so that our
    # final output will look more like the official Hadoop
    # word count examples
    sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
     
    # write the results to STDOUT (standard output)
    for word, count in sorted_word2count:
        print '%s\t%s'% (word, count)
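
    For a single input line "noe two three", the mapper emits one tab-separated (word, 1) pair per word:

    noe 1
    two 1
    three   1

    Before submitting anything to the cluster, the two scripts can be sanity-checked locally with an ordinary shell pipeline, where sort stands in for the shuffle phase. This is only a quick check, not part of the job itself, and assumes python2.6 is on the PATH:

    [hadoop@master ~]$ head -n 1000 test.txt | python2.6 mapper.py | sort -k1,1 | python2.6 reducer.py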
    
    

    II. Before Optimization

    1. Command
    [hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output5 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
    

    Since the input contains exactly three distinct keys (noe, two, three), ideally each key should go to its own reducer, so -D mapred.reduce.tasks=3 sets the number of reducers to 3.

    2. Result analysis
    17/03/22 10:27:52 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
    17/03/22 10:27:52 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
    ...
    17/03/22 10:47:21 INFO mapreduce.Job:  map 100% reduce 100%
    17/03/22 10:47:21 INFO mapreduce.Job: Job job_1490088877897_0004 completed successfully
    17/03/22 10:47:21 INFO mapreduce.Job: Counters: 50
    
    

    The log timestamps show that the job took about 20 minutes (10:27 to 10:47).
    The output files, however, tell a different story:

    [hadoop@master ~]$ hdfs dfs -ls output5
    Found 4 items
    -rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:47 output5/_SUCCESS
    -rw-r--r--   2 hadoop supergroup         16 2017-03-22 10:35 output5/part-00000
    -rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:31 output5/part-00001
    -rw-r--r--   2 hadoop supergroup         28 2017-03-22 10:47 output5/part-00002
    [hadoop@master ~]$ hdfs dfs -cat output5/part-00000
    three   130808991
    [hadoop@master ~]$ hdfs dfs -cat output5/part-00002
    noe 130808991
    two 130808991
    
    

    One of the reducers produced an empty output file, while two keys landed in the same output file, i.e. they were processed by the same reducer. The monitoring UI also showed that one reducer finished almost instantly during the run.
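
    To see why two of the three keys collide, here is a small Python sketch that mimics what Hadoop's default HashPartitioner does with a streaming job's Text keys, as far as I understand it: a 31-based hash over the key's bytes (seeded with 1), masked to a non-negative Java int and taken modulo the number of reducers. This is an illustration of the idea, not the actual Java code:

    #!/usr/bin/env python2.6
    # Rough imitation of the default partitioning of Text keys:
    #   partition = (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
    # Assumes ASCII keys and emulates Java's 32-bit signed int arithmetic.

    def text_hash(key):
        h = 1                                   # Text.hashCode() seed
        for b in bytearray(key):
            h = (31 * h + b) & 0xFFFFFFFF       # wrap like a Java int
        if h >= 0x80000000:                     # fold into signed range
            h -= 0x100000000
        return h

    def default_partition(key, num_reducers):
        return (text_hash(key) & 0x7FFFFFFF) % num_reducers

    for key in ('noe', 'two', 'three'):
        print '%-6s -> reducer %d' % (key, default_partition(key, 3))

    With three reducers this sketch sends noe and two to the same partition and three to a different one, which lines up with the observed output: noe and two in part-00002, three in part-00000, and part-00001 empty.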

    III. First Optimization Attempt

    1. Analysis
      Judging from the run above, the problem is in the Partition step after the map phase: the hash values collide, so noe and two are treated as one class of key and sent to the same reducer, which ultimately produces the "data skew". What we need, then, is to customize the partitioning. But how do we do that from the command line? Nothing is better than the official document, which gives this example:
    hadoop jar hadoop-streaming-2.7.3.jar \
      -D stream.map.output.field.separator=. \
      -D stream.num.map.output.key.fields=4 \
      -D map.output.key.field.separator=. \
      -D mapreduce.partition.keypartitioner.options=-k1,2 \
      -D mapreduce.job.reduces=12 \
      -input myInputDirs \
      -output myOutputDir \
      -mapper /bin/cat \
      -reducer /bin/cat \
      -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
    Here, -D stream.map.output.field.separator=. and -D stream.num.map.output.key.fields=4 are as explained in previous example. The two variables are used by streaming to identify the key/value pair of mapper.
    
    The map output keys of the above Map/Reduce job normally have four fields separated by “.”. However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the -D mapred.text.key.partitioner.options=-k1,2 option. Here, -D map.output.key.field.separator=. specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
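
    Read against this quote, the mechanism is: pick a slice of the key (here fields 1-2, split on "."), and let only that slice decide which reducer the record goes to. A toy sketch of the idea, for illustration only, not the real Java implementation:

    # Toy illustration of field-based partitioning: only the first two
    # '.'-separated fields of the key feed the partitioner, so keys that
    # share that prefix are guaranteed to meet in the same reducer.
    def key_prefix(key, sep='.', num_fields=2):
        return sep.join(key.split(sep)[:num_fields])

    print key_prefix('11.12.1.2')   # -> 11.12
    print key_prefix('11.14.2.3')   # -> 11.14

    In this WordCount job, however, the map output key is a single word with no "." fields, so the attempt below only swaps in the KeyFieldBasedPartitioner class and leaves the field options at their defaults.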
    
    2. Optimization run
    • Command
        [hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
    
    • Result
    17/03/22 11:06:36 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
    17/03/22 11:06:37 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
    ...
    17/03/22 11:18:49 INFO mapreduce.Job:  map 100% reduce 100%
    17/03/22 11:18:49 INFO mapreduce.Job: Job job_1490088877897_0005 completed successfully
    17/03/22 11:18:50 INFO mapreduce.Job: Counters: 51
    [hadoop@master ~]$ hdfs dfs -ls output
    Found 4 items
    -rw-r--r--   2 hadoop supergroup          0 2017-03-22 11:18 output/_SUCCESS
    -rw-r--r--   2 hadoop supergroup          0 2017-03-22 11:11 output/part-00000
    -rw-r--r--   2 hadoop supergroup         28 2017-03-22 11:18 output/part-00001
    -rw-r--r--   2 hadoop supergroup         16 2017-03-22 11:15 output/part-00002
    [hadoop@master ~]$ 
    [hadoop@master ~]$ hdfs dfs -cat output/part-00001
    noe 130808991
    two 130808991
    [hadoop@master ~]$ hdfs dfs -cat output/part-00002
    three   130808991
    [hadoop@master ~]$ 
    
    

    This run finished in about 12 minutes (11:06 to 11:18), 8 minutes faster than before. But both the monitoring during the run and the output listing above show that one output file is still empty: the skew problem remains.

    IV. Second Optimization Attempt

    1. Analysis
      Judging from the results, Hadoop's built-in partitioning still does not separate noe and two: both keys are hashed modulo the number of reducers, and with only three reducers they easily land on the same one (a rough sketch of that collision follows below). What is really needed is a custom partitioner; that is covered in a follow-up article.
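
    For what it's worth, KeyFieldBasedPartitioner without any key-field options appears to hash the whole key with a very similar 31-based byte hash (seeded with 0 rather than 1, as far as I can tell from the Hadoop source), again taken modulo the number of reducers, so noe and two still collide. A hedged sketch of that arithmetic:

    # Sketch of what KeyFieldBasedPartitioner appears to do when no
    # key-field options are given (assumption: 31-based byte hash seeded
    # with 0, then (hash & Integer.MAX_VALUE) % numReduceTasks).
    def field_hash(key):
        h = 0
        for b in bytearray(key):
            h = (31 * h + b) & 0xFFFFFFFF       # wrap like a Java int
        if h >= 0x80000000:                     # fold into signed range
            h -= 0x100000000
        return h

    for key in ('noe', 'two', 'three'):
        print '%-6s -> reducer %d' % (key, (field_hash(key) & 0x7FFFFFFF) % 3)

    Under that assumption noe and two again share a reducer while three gets its own, which matches the part-00001 and part-00002 files from the first attempt. A custom partitioner that deliberately spreads these specific keys is the remaining fix.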
