Mapper program (MapTest.py)
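import sys

def read_input(file):
    # Split each input line on whitespace into a list of words
    for line in file:
        yield line.split()

def main():
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            # Emit "word<TAB>1"; Hadoop Streaming treats the text before the first tab as the key
            print("%s\t%d" % (word, 1))

if __name__ == "__main__":
    main()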
The mapper splits each line into words and outputs them in the following form:
a 1
b 1
c 1
a 1
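To check the mapper logic without piping text through stdin, it can be pulled out into plain functions. This is a minimal sketch; map_words and format_pairs are hypothetical names, not part of the scripts above, but they split and format lines the same way MapTest.py does.

```python
def map_words(lines):
    """Yield (word, 1) pairs, splitting each line on whitespace like MapTest.py."""
    for line in lines:
        for word in line.split():
            yield word, 1

def format_pairs(pairs):
    """Format pairs exactly as the streaming mapper prints them: word<TAB>count."""
    return ["%s\t%d" % (word, count) for word, count in pairs]

# Reproduces the sample output above: a, b, c, a, each with a count of 1
for out in format_pairs(map_words(["a b c", "a"])):
    print(out)
```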
Reducer program (ReduceTest.py), which counts word frequencies:
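import sys
from operator import itemgetter
from itertools import groupby

def read_mapper_output(file, separator='\t'):
    # Split each "word<TAB>count" line into [word, count]; skip blank lines
    for line in file:
        line = line.rstrip()
        if line:
            yield line.split(separator, 1)

def main():
    data = read_mapper_output(sys.stdin)
    # groupby only merges *adjacent* equal keys, so the input must be sorted by word
    # (Hadoop sorts the map output by key before it reaches the reducer)
    for current_word, group in groupby(data, itemgetter(0)):
        total_count = sum(int(count) for _, count in group)
        print("%s\t%d" % (current_word, total_count))

if __name__ == '__main__':
    main()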
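The reducer relies on itertools.groupby, which groups only consecutive items with the same key. The sketch below (reduce_counts is a hypothetical helper that mirrors the reducer's loop) shows why the input has to be sorted: unsorted input produces duplicate entries for the same word.

```python
from itertools import groupby
from operator import itemgetter

def reduce_counts(pairs):
    """Sum counts per word, grouping only adjacent equal keys, like ReduceTest.py."""
    return [(word, sum(int(c) for _, c in group))
            for word, group in groupby(pairs, itemgetter(0))]

pairs = [["a", "1"], ["b", "1"], ["a", "1"]]
print(reduce_counts(pairs))          # → [('a', 1), ('b', 1), ('a', 1)]  ('a' is split)
print(reduce_counts(sorted(pairs)))  # → [('a', 2), ('b', 1)]            (merged)
```

On the cluster, the shuffle-and-sort phase provides this ordering automatically; in a local pipe test it has to be added explicitly.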
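Run a local test first from the command line. Hadoop sorts the map output by key before the reduce phase, and the reducer's groupby depends on that order, so the local pipeline needs a sort step in between: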
echo "a b c d e" | python MapTest.py | sort | python ReduceTest.py
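The same map, sort, reduce sequence can also be simulated in a single Python process, which is handy for quick checks. This is a sketch under the assumption that sorting whole "word<TAB>count" lines is an adequate stand-in for Hadoop's key sort (it is here, because identical words end up adjacent); mapper, reducer and run_local are hypothetical names.

```python
from itertools import groupby
from operator import itemgetter

def mapper(lines):
    """Emit "word<TAB>1" for every whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)

def reducer(lines):
    """Sum the counts of consecutive lines that share the same word."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines if line.strip())
    for word, group in groupby(pairs, itemgetter(0)):
        yield "%s\t%d" % (word, sum(int(c) for _, c in group))

def run_local(text):
    # sorted() plays the role of "| sort", i.e. Hadoop's shuffle-and-sort phase
    return list(reducer(sorted(mapper(text.splitlines()))))

print(run_local("a b c d e\na b"))
```

For the input above, a and b come out with a count of 2 and the other letters with 1.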
Once the local output looks correct, submit the job to the cluster with:
/usr/local/hadoop/hadoop-2.8.3/bin/hadoop jar \
    /usr/local/hadoop/hadoop-2.8.3/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar \
    -files "/home/tobin/PycharmProjects/untitled/MapTest.py,/home/tobin/PycharmProjects/untitled/ReduceTest.py" \
    -input /LICENSE \
    -output /tmp/wordcounttest \
    -mapper "python MapTest.py" \
    -reducer "python ReduceTest.py"
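-files: ships the map and reduce programs to the cluster (absolute paths are safest here; relative paths may cause errors). Because the files are placed in each task's working directory, -mapper and -reducer can refer to them by file name only.
-input and -output: the input and output paths, both of which are in HDFS. The -output directory must not already exist, otherwise the job fails, so remove it (hdfs dfs -rm -r /tmp/wordcounttest) before re-running.
-mapper: the map program.
-reducer: the reduce program.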
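If you would rather launch the job from a Python script than retype the long command, the argument list can be assembled programmatically. This is a minimal sketch: streaming_command is a hypothetical helper, and the paths are the ones from the command above. It keeps -files (a generic option) ahead of the streaming options and passes only base names to -mapper and -reducer.

```python
import os
import shlex

def streaming_command(hadoop, jar, mapper_path, reducer_path, hdfs_input, hdfs_output):
    """Return the argv list for a Hadoop Streaming word-count job."""
    return [
        hadoop, "jar", jar,
        # Generic option: placed before the streaming-specific options
        "-files", ",".join([mapper_path, reducer_path]),
        "-input", hdfs_input,     # HDFS path
        "-output", hdfs_output,   # HDFS path; must not exist yet
        # Shipped files sit in the task's working directory, so use base names
        "-mapper", "python " + os.path.basename(mapper_path),
        "-reducer", "python " + os.path.basename(reducer_path),
    ]

cmd = streaming_command(
    "/usr/local/hadoop/hadoop-2.8.3/bin/hadoop",
    "/usr/local/hadoop/hadoop-2.8.3/share/hadoop/tools/lib/hadoop-streaming-2.8.3.jar",
    "/home/tobin/PycharmProjects/untitled/MapTest.py",
    "/home/tobin/PycharmProjects/untitled/ReduceTest.py",
    "/LICENSE",
    "/tmp/wordcounttest",
)
# Print a shell-safe version of the command; to submit it, use subprocess.run(cmd, check=True)
print(" ".join(shlex.quote(arg) for arg in cmd))
```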
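The /tmp/wordcounttest directory then contains two files: the output file (part-00000) and a status file (_SUCCESS), an empty marker indicating that the job completed successfully.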
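To inspect the result programmatically, one option is to copy the directory to the local disk (for example with hdfs dfs -get /tmp/wordcounttest .) and read it in Python. The sketch below assumes such a local copy; parse_counts and read_job_output are hypothetical names. It checks for the _SUCCESS marker first and then merges every part-* file, which also covers jobs with more than one reducer.

```python
import glob
import os
import tempfile

def parse_counts(lines):
    """Turn "word<TAB>count" lines into a dict; extra whitespace around the tab is tolerated."""
    counts = {}
    for line in lines:
        fields = line.split()
        if len(fields) == 2:
            word, count = fields
            counts[word] = int(count)
    return counts

def read_job_output(out_dir):
    """Read a local copy of the job's output directory."""
    if not os.path.exists(os.path.join(out_dir, "_SUCCESS")):
        raise RuntimeError("no _SUCCESS marker in %s; the job may have failed" % out_dir)
    counts = {}
    for path in sorted(glob.glob(os.path.join(out_dir, "part-*"))):
        with open(path, encoding="utf-8") as f:
            counts.update(parse_counts(f))
    return counts

# Demo with a fake output directory containing the two expected files
with tempfile.TemporaryDirectory() as d:
    open(os.path.join(d, "_SUCCESS"), "w").close()
    with open(os.path.join(d, "part-00000"), "w", encoding="utf-8") as f:
        f.write("own\t4\nowner\t4\nowner.\t1\n")
    print(read_job_output(d))
```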
The results look like this:
own 4
owner 4
owner. 1
ownership 2
page" 1
part 4
patent 5
patent, 1
percent 1
perform, 1
permission 1
permissions 3
perpetual, 2
pertain 2
places: 1
possibility 1
power, 1
preferred 1
prepare 1
product 1
prominent 1
provide 1
provided 5
provides 2
publicly 2
purpose 2
purposes 4
readable 1
reason 1
reasonable 1
received 1
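The output counts "owner" and "owner.", or "patent" and "patent,", as different words, because str.split() only splits on whitespace and leaves punctuation attached. If that is not wanted, the mapper could use a normalizing tokenizer instead. The regular expression below is an assumption chosen for this English LICENSE text (ASCII letters and digits, with internal apostrophes or hyphens); tokenize and map_line are hypothetical names.

```python
import re

# A "word" is a run of lowercase letters/digits, optionally joined by ' or -
# (so "don't" and "non-exclusive" stay whole)
WORD_RE = re.compile(r"[a-z0-9]+(?:['-][a-z0-9]+)*")

def tokenize(line):
    """Lower-case the line and drop surrounding punctuation such as . , : and quotes."""
    return WORD_RE.findall(line.lower())

def map_line(line):
    """Return the mapper's output lines ("word<TAB>1") for one input line."""
    return ["%s\t%d" % (word, 1) for word in tokenize(line)]

print(tokenize('owner. ownership patent, "page" places:'))
```

To use it, replace line.split() in MapTest.py with tokenize(line). Lower-casing also merges "Patent" with "patent"; drop the .lower() call if case should be preserved. Non-ASCII letters are not matched by this pattern.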