Notes on a Simple MapReduce Optimization

Author: 鹅鹅鹅_ | Published 2019-01-01 11:54

I. Background

  1. Functionality
    A simple WordCount program, but implemented in Python and run with Hadoop Streaming.
  2. Input file
  • Content format
noe two three
noe two three
noe two three
...
  • File size
[hadoop@master ~]$ du -mh test.txt 
1.7G    test.txt
  • Mapper implementation (mapper.py)
#!/usr/bin/env python2.6

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words while removing any empty strings
    words = filter(lambda word: word, line.split())
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

  • Reducer implementation (reducer.py)
#!/usr/bin/env python2.6
 
from operator import itemgetter
import sys
 
# maps words to their counts
word2count = {}
 
# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
 
    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass
 
# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
 
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s'% (word, count)
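
Before submitting anything to the cluster, the two scripts can be sanity-checked locally by emulating what Hadoop Streaming does: the mapper's stdout is sorted by key and fed to the reducer's stdin. Below is a minimal sketch; it assumes (for illustration) that a small excerpt of test.txt has been saved as sample.txt next to mapper.py and reducer.py.

#!/usr/bin/env python2.6
# Local sanity check for the streaming pipeline: mapper -> sort -> reducer.

import subprocess

# map step: feed the sample file to mapper.py on stdin
mapper = subprocess.Popen(['python', 'mapper.py'],
                          stdin=open('sample.txt'), stdout=subprocess.PIPE)
map_out, _ = mapper.communicate()

# shuffle step: Hadoop sorts map output by key before the reduce phase
shuffled = ''.join(sorted(map_out.splitlines(True)))

# reduce step: feed the sorted stream to reducer.py and print its output
reducer = subprocess.Popen(['python', 'reducer.py'],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE)
result, _ = reducer.communicate(shuffled)
print result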

II. Before optimization

  1. Run command
[hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output5 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py

Because the input file contains only three kinds of keys (noe, two, and three), each key should ideally get its own reducer, so -D mapred.reduce.tasks=3 sets the number of reducers to 3.

  2. Result analysis
17/03/22 10:27:52 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
17/03/22 10:27:52 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
...
17/03/22 10:47:21 INFO mapreduce.Job:  map 100% reduce 100%
17/03/22 10:47:21 INFO mapreduce.Job: Job job_1490088877897_0004 completed successfully
17/03/22 10:47:21 INFO mapreduce.Job: Counters: 50

The log output shows the job took about 20 minutes in total.
The output files, however, tell a different story:

[hadoop@master ~]$ hdfs dfs -ls output5
Found 4 items
-rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:47 output5/_SUCCESS
-rw-r--r--   2 hadoop supergroup         16 2017-03-22 10:35 output5/part-00000
-rw-r--r--   2 hadoop supergroup          0 2017-03-22 10:31 output5/part-00001
-rw-r--r--   2 hadoop supergroup         28 2017-03-22 10:47 output5/part-00002
[hadoop@master ~]$ hdfs dfs -cat output5/part-00000
three   130808991
[hadoop@master ~]$ hdfs dfs -cat output5/part-00002
noe 130808991
two 130808991

One reducer produced an empty output file, while two keys ended up in the same output file, i.e. they were handled by the same reducer. The monitoring system also showed that one reducer finished almost instantly during the run.
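
Why do noe and two end up together? With the default HashPartitioner, each key goes to reducer (hashCode & Integer.MAX_VALUE) % numReduceTasks. The sketch below reproduces that assignment in Python for the three keys; the hash formula (fold the key's UTF-8 bytes as hash = 31*hash + byte, starting from 1, as Text.hashCode does) is my assumption about Hadoop's internals, not something stated in the job output.

# Illustrative sketch only (ASCII keys; Java's signed bytes and 32-bit
# overflow do not matter for keys this short).

def text_hash(key):
    # assumed emulation of Text.hashCode(): fold UTF-8 bytes with 32-bit wrap
    h = 1
    for b in bytearray(key.encode('utf-8')):
        h = (31 * h + b) & 0xffffffff
    return h

def default_partition(key, num_reducers):
    # HashPartitioner: (hashCode & Integer.MAX_VALUE) % numReduceTasks
    return (text_hash(key) & 0x7fffffff) % num_reducers

for key in ('noe', 'two', 'three'):
    print '%s -> reducer %d' % (key, default_partition(key, 3))

# If the hash assumption holds, this prints:
#   noe -> reducer 2
#   two -> reducer 2
#   three -> reducer 0
# which is consistent with the observed part files: one empty part file,
# and noe and two written by the same reducer.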

III. First optimization attempt

  1. Analysis
    From the results above, the likely cause is that the partition step after the map phase assigned noe and two to the same partition (their hash values are congruent modulo the number of reducers), so both keys went to one reducer and the job ended up with data skew. The fix is therefore to customize the partitioning. But how can the partitioning be customized from the command line? Nothing is better than the official documentation, which gives this example:
hadoop jar hadoop-streaming-2.7.3.jar \
  -D stream.map.output.field.separator=. \
  -D stream.num.map.output.key.fields=4 \
  -D map.output.key.field.separator=. \
  -D mapreduce.partition.keypartitioner.options=-k1,2 \
  -D mapreduce.job.reduces=12 \
  -input myInputDirs \
  -output myOutputDir \
  -mapper /bin/cat \
  -reducer /bin/cat \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
Here, -D stream.map.output.field.separator=. and -D stream.num.map.output.key.fields=4 are as explained in previous example. The two variables are used by streaming to identify the key/value pair of mapper.

The map output keys of the above Map/Reduce job normally have four fields separated by “.”. However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the -D mapred.text.key.partitioner.options=-k1,2 option. Here, -D map.output.key.field.separator=. specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
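
Conceptually, KeyFieldBasedPartitioner hashes only the selected key fields instead of the whole key, so all records that agree on those fields land on the same reducer. A rough Python sketch of that idea follows; the field options mirror the -k1,2 example above, and the hash is a stand-in, not Hadoop's actual function.

def key_field_partition(key, num_reducers, separator='.', num_fields=2):
    # keep only the first num_fields fields of the key, as -k1,2 would
    prefix = separator.join(key.split(separator)[:num_fields])
    # stand-in hash; Hadoop hashes the bytes of the selected fields itself
    return hash(prefix) % num_reducers

# keys sharing their first two fields map to the same reducer
print key_field_partition('10.1.2.3', 12)
print key_field_partition('10.1.9.9', 12)   # same partition as above
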
  2. Running the optimized job
  • Command
    [hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
  • Result
17/03/22 11:06:36 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
17/03/22 11:06:37 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
...
17/03/22 11:18:49 INFO mapreduce.Job:  map 100% reduce 100%
17/03/22 11:18:49 INFO mapreduce.Job: Job job_1490088877897_0005 completed successfully
17/03/22 11:18:50 INFO mapreduce.Job: Counters: 51
[hadoop@master ~]$ hdfs dfs -ls output
Found 4 items
-rw-r--r--   2 hadoop supergroup          0 2017-03-22 11:18 output/_SUCCESS
-rw-r--r--   2 hadoop supergroup          0 2017-03-22 11:11 output/part-00000
-rw-r--r--   2 hadoop supergroup         28 2017-03-22 11:18 output/part-00001
-rw-r--r--   2 hadoop supergroup         16 2017-03-22 11:15 output/part-00002
[hadoop@master ~]$ 
[hadoop@master ~]$ hdfs dfs -cat output/part-00001
noe 130808991
two 130808991
[hadoop@master ~]$ hdfs dfs -cat output/part-00002
three   130808991
[hadoop@master ~]$ 

This run took about 12 minutes in total, 8 minutes faster than before. However, monitoring during the run and inspecting the results show that one output file is still empty: the skew problem persists.

IV. Second optimization attempt

  1. Analysis
    Judging from the above, Hadoop's built-in partitioning still fails to separate noe and two: partitions are assigned as hash(key) % numReducers, and with only three distinct keys it is easy for two of them to land on the same reducer. A custom partitioner is therefore needed; see the follow-up article.
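
The follow-up article is not reproduced here, but the desired behavior is easy to state: with only three known keys, each key should deterministically get its own reducer. The fragment below is purely hypothetical and only illustrates that target mapping; it cannot be plugged into Hadoop Streaming directly, where a custom partitioner has to be supplied as a Java class.

KNOWN_KEYS = ['noe', 'two', 'three']   # the three keys in this data set

def custom_partition(key, num_reducers):
    # give each known key its own reducer; fall back to hashing otherwise
    if key in KNOWN_KEYS:
        return KNOWN_KEYS.index(key) % num_reducers
    return hash(key) % num_reducers

for k in KNOWN_KEYS:
    print '%s -> reducer %d' % (k, custom_partition(k, 3))
# prints: noe -> reducer 0, two -> reducer 1, three -> reducer 2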
