Reference: https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
0. Introduction
Although Hadoop itself is implemented in Java, MapReduce programs are not limited to Java: they can be written in Python, C++, Ruby, Perl, and other languages. The "trick" behind the following Python code is that we will use the Hadoop Streaming API (see also the corresponding wiki entry) to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We simply use Python's sys.stdin to read input data and print our own output to sys.stdout; Hadoop Streaming takes care of everything else.
1. Writing mapper.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

# Read the input line by line from sys.stdin
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        # Emit "word<TAB>1" for every word
        print("%s\t%s" % (word, 1))
2. Writing reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()
    word, count = line.split("\t")
    try:
        count = int(count)
    except ValueError:
        # Skip lines whose count is not a number
        continue
    # Streaming sorts mapper output by key, so equal words arrive consecutively
    if current_word == word:
        current_count += count
    else:
        if current_word:
            print("%s\t%s" % (current_word, current_count))
        current_count = count
        current_word = word

# Emit the last word
if current_word == word:
    print("%s\t%s" % (current_word, current_count))
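The reducer above only works because its input arrives sorted by key. We can check the logic locally by simulating the whole cat | map | sort | reduce pipeline in pure Python. This is a sketch for illustration; map_lines and reduce_sorted are hypothetical helper names, not part of the tutorial's scripts.

```python
def map_lines(lines):
    # Same logic as mapper.py: emit "word<TAB>1" for every word
    for line in lines:
        for word in line.strip().split():
            yield "%s\t%s" % (word, 1)

def reduce_sorted(pairs):
    # Same logic as reducer.py: sum consecutive counts for equal words;
    # input must already be sorted by key, as Hadoop Streaming guarantees
    current_word = None
    current_count = 0
    for pair in pairs:
        word, count = pair.split("\t")
        count = int(count)
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield "%s\t%s" % (current_word, current_count)
            current_word = word
            current_count = count
    if current_word is not None:
        yield "%s\t%s" % (current_word, current_count)

# sorted() plays the role of the shuffle/sort phase between map and reduce
for line in reduce_sorted(sorted(map_lines(["foo foo quux labs foo bar quux"]))):
    print(line)
# prints (tab-separated): bar 1, foo 3, labs 1, quux 2
```

Dropping the sorted() call breaks the aggregation: the three "foo" pairs no longer arrive consecutively, which is exactly why the sort -k1,1 step is needed in the local test below.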
3. Testing the code (cat data | map | sort | reduce)
(1) Run echo "foo foo quux labs foo bar quux" | python mapper.py
foo 1
foo 1
quux 1
labs 1
foo 1
bar 1
quux 1
(2) Run echo "foo foo quux labs foo bar quux" | python mapper.py | sort -k1,1 | python reducer.py
bar 1
foo 3
labs 1
quux 2
4. Running the code on Hadoop
~/hadoop-client/hadoop/bin/hadoop streaming -D mapred.job.name="test-hadoop" -D mapred.job.priority=VERY_HIGH -D mapred.reduce.tasks=1 -D mapred.job.map.capacity=1000 -D mapred.job.reduce.capacity=1000 -file ./mapper.py -file ./reducer.py -mapper ./mapper.py -reducer ./reducer.py -input /xxxxxx/test.txt -output /xxxxx/myoutput2
Parameter notes
Hadoop itself is developed in Java and its programs are normally written in Java as well, but through Hadoop Streaming we can write MapReduce programs in any language and have Hadoop run them.
-input and -output are HDFS paths, and the output path must not already exist;
-mapper and -reducer: for Python scripts, either invoke them as "python mapper.py" / "python reducer.py", or make the scripts executable (shebang line plus chmod +x) and pass ./mapper.py directly;
-file is required: it packages the local *.py files and ships them to the cluster so the other machines can execute them.
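The command in section 4 uses a vendor-specific "hadoop streaming" wrapper; on stock Apache Hadoop the streaming jar is usually invoked via hadoop jar instead. A minimal sketch, assuming the jar lives under $HADOOP_HOME/share/hadoop/tools/lib and using placeholder HDFS paths:

```shell
# Sketch for stock Apache Hadoop; the jar path and HDFS paths are placeholders
# that must be adjusted to your installation.
hadoop jar "$HADOOP_HOME"/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -D mapred.job.name="test-hadoop" \
    -D mapred.reduce.tasks=1 \
    -file ./mapper.py \
    -file ./reducer.py \
    -mapper "python mapper.py" \
    -reducer "python reducer.py" \
    -input /path/to/input \
    -output /path/to/output    # the output directory must not exist yet
```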
With that, the job is ready to run.