1. Background
- Function description
A simple WordCount program, implemented in Python
- Input file
- Content format
noe two three
noe two three
noe two three
...
- File size
[hadoop@master ~]$ du -mh test.txt
1.7G test.txt
- Mapper implementation (mapper.py)
#!/usr/bin/env python2.6
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words (split() already drops empty strings)
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
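Because a streaming mapper is just a program that reads STDIN and writes STDOUT, it can be sanity-checked locally without a cluster. A quick check (assuming mapper.py has been made executable, e.g. with chmod +x mapper.py) should emit one tab-separated (word, 1) pair per input word:
[hadoop@master ~]$ echo "noe two three" | ./mapper.py
noe	1
two	1
three	1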
- Reducer implementation (reducer.py)
#!/usr/bin/env python2.6
from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the tab-delimited input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexicographically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print '%s\t%s' % (word, count)
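Likewise, the whole map -> shuffle -> reduce flow can be smoke-tested locally before submitting the job; in the pipeline below, sort stands in for Hadoop's shuffle phase (again assuming both scripts are executable):
[hadoop@master ~]$ echo -e "noe two three\nnoe two three" | ./mapper.py | sort -k1,1 | ./reducer.py
noe	2
three	2
two	2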
2. Before Optimization
- Run command
[hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output5 -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py
Since the input file contains exactly three distinct keys (noe, two and three), each key should ideally be handled by its own reducer, so -D mapred.reduce.tasks=3 sets the number of reducers to 3.
- Result analysis
17/03/22 10:27:52 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
17/03/22 10:27:52 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
...
17/03/22 10:47:21 INFO mapreduce.Job: map 100% reduce 100%
17/03/22 10:47:21 INFO mapreduce.Job: Job job_1490088877897_0004 completed successfully
17/03/22 10:47:21 INFO mapreduce.Job: Counters: 50
The log output shows that the job took about 20 minutes in total (10:27 to 10:47).
However, the output files tell a different story:
[hadoop@master ~]$ hdfs dfs -ls output5
Found 4 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 10:47 output5/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 16 2017-03-22 10:35 output5/part-00000
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 10:31 output5/part-00001
-rw-r--r-- 2 hadoop supergroup 28 2017-03-22 10:47 output5/part-00002
[hadoop@master ~]$ hdfs dfs -cat output5/part-00000
three 130808991
[hadoop@master ~]$ hdfs dfs -cat output5/part-00002
noe 130808991
two 130808991
One of the reducers produced an empty output file, while two keys (noe and two) landed in the same output file, i.e. they were processed by the same reducer. Monitoring during the run also showed that one reducer finished almost instantly.
3. First Optimization Attempt
- Analysis
Judging from the results above, the hash values used for partitioning after the map phase must have collided: noe and two were hashed into the same bucket and sent to the same reducer, which produced the "data skew". So what we need is a custom partition. But how do we customize the hashing from the command line? Nothing is better than the official documentation, which gives this example:
hadoop jar hadoop-streaming-2.7.3.jar \
-D stream.map.output.field.separator=. \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=. \
-D mapreduce.partition.keypartitioner.options=-k1,2 \
-D mapreduce.job.reduces=12 \
-input myInputDirs \
-output myOutputDir \
-mapper /bin/cat \
-reducer /bin/cat \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
Here, -D stream.map.output.field.separator=. and -D stream.num.map.output.key.fields=4 are as explained in the previous example. The two variables are used by streaming to identify the key/value pair of the mapper.
The map output keys of the above Map/Reduce job normally have four fields separated by ".". However, the Map/Reduce framework will partition the map outputs by the first two fields of the keys using the -D mapreduce.partition.keypartitioner.options=-k1,2 option. Here, -D map.output.key.field.separator=. specifies the separator for the partition. This guarantees that all the key/value pairs with the same first two fields in the keys will be partitioned into the same reducer.
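To make the quoted behavior concrete, here is a conceptual Python sketch of partitioning on the first two key fields. It is my own illustration, not Hadoop's actual implementation: partition_on_fields is a hypothetical helper, and Python's built-in hash() merely stands in for Hadoop's hash function.
# Conceptual sketch of KeyFieldBasedPartitioner with separator '.' and -k1,2:
# only the first two fields of the key feed the partition hash.
def partition_on_fields(key, num_reducers, sep='.', num_fields=2):
    prefix = sep.join(key.split(sep)[:num_fields])  # e.g. '12.11' from '12.11.4.1'
    return (hash(prefix) & 0x7FFFFFFF) % num_reducers

# '12.11.4.1' and '12.11.4.2' share the prefix '12.11', so by construction
# they always reach the same reducer; '12.12.1.2' may go elsewhere.
for key in ('12.11.4.1', '12.11.4.2', '12.12.1.2'):
    print '%s -> reducer %d' % (key, partition_on_fields(key, 3))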
- Optimized execution
- Command
[hadoop@master ~]$ hadoop jar hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar -D mapred.reduce.tasks=3 -input test -output output -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
- Result
17/03/22 11:06:36 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
17/03/22 11:06:37 INFO client.RMProxy: Connecting to ResourceManager at master/10.10.18.236:8032
...
17/03/22 11:18:49 INFO mapreduce.Job: map 100% reduce 100%
17/03/22 11:18:49 INFO mapreduce.Job: Job job_1490088877897_0005 completed successfully
17/03/22 11:18:50 INFO mapreduce.Job: Counters: 51
[hadoop@master ~]$ hdfs dfs -ls output
Found 4 items
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 11:18 output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 0 2017-03-22 11:11 output/part-00000
-rw-r--r-- 2 hadoop supergroup 28 2017-03-22 11:18 output/part-00001
-rw-r--r-- 2 hadoop supergroup 16 2017-03-22 11:15 output/part-00002
[hadoop@master ~]$
[hadoop@master ~]$ hdfs dfs -cat output/part-00001
noe 130808991
two 130808991
[hadoop@master ~]$ hdfs dfs -cat output/part-00002
three 130808991
[hadoop@master ~]$
This run took about 12 minutes in total, 8 minutes faster than before. But process monitoring and the output above show that the problem persists: one output file is still empty, and noe and two still share a reducer.
4. Second Optimization Attempt
- Analysis
In fact, the results above suggest that Hadoop's built-in partitioning simply does not separate noe and two: both keys' hashes are taken modulo the number of reducers, and they happen to fall into the same residue class, so they easily end up in the same reducer. What we really need is a truly custom partition method; see the follow-up article. The sketch below makes the collision concrete.
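We can verify the collision without rerunning the job. The sketch below replays, in Python, the hash used by Hadoop's default HashPartitioner for Text keys: Text.hashCode() reduces to WritableComparator.hashBytes() (h = 31*h + byte, starting from h = 1), and the partition is (hashCode & Integer.MAX_VALUE) % numReduceTasks. text_hash and partition are my own illustrative helpers.
#!/usr/bin/env python2.6
# Replays Hadoop's default partitioning for Text keys: Text.hashCode()
# is WritableComparator.hashBytes() over the key's bytes, and
# HashPartitioner picks (hashCode & Integer.MAX_VALUE) % numReduceTasks.
def text_hash(key):
    h = 1
    for b in bytearray(key):            # iterate over the key's bytes
        h = (31 * h + b) & 0xFFFFFFFF   # emulate Java's 32-bit int overflow
    return h

def partition(key, num_reducers):
    return (text_hash(key) & 0x7FFFFFFF) % num_reducers

for key in ('noe', 'two', 'three'):
    print '%s -> reducer %d' % (key, partition(key, 3))
Running this prints noe -> reducer 2, two -> reducer 2, three -> reducer 0, which matches the unoptimized run exactly: part-00002 held noe and two, part-00000 held three, and part-00001 stayed empty. Changing the initial value of h from 1 to 0 appears to reproduce KeyFieldBasedPartitioner's hash, giving noe, two -> reducer 1 and three -> reducer 2, which again matches the first optimization attempt's output.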