Introduction
Hadoop is a distributed computing infrastructure developed under the Apache Foundation. For application developers, the two components that matter most are HDFS and MapReduce.
HDFS is a distributed file system in which the NameNode and DataNodes work together to store data. MapReduce is a programming model: Map decomposes the input into intermediate key/value pairs, and Reduce merges those key/value pairs into the final output. A MapReduce job typically reads its data from HDFS and writes the results back to HDFS once the computation finishes.
This article focuses on how to develop MapReduce jobs in Python.
HDFS
The HDFS file system can be accessed with the hdfs command:
$ hdfs dfs -ls /
$ hdfs dfs -get /var/log/hadoop.log /tmp/ # copy /var/log/hadoop.log from HDFS to the local /tmp/ directory
$ hdfs dfs -put /tmp/a /user/hadoop/ # copy the local file /tmp/a to the /user/hadoop/ directory on HDFS
$ hdfs dfs # list all available subcommands
Python Client:
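For programmatic access, one option is the third-party HdfsCLI package (pip install hdfs), which talks to the NameNode's WebHDFS interface. A minimal sketch, assuming WebHDFS is enabled and using the placeholder address http://namenode:50070:
#!/usr/bin/env python
# encoding: utf-8
from hdfs import InsecureClient

# Connect via WebHDFS; the host, port and user here are placeholders.
client = InsecureClient('http://namenode:50070', user='hadoop')

print(client.list('/'))                          # like: hdfs dfs -ls /
client.download('/var/log/hadoop.log', '/tmp/')  # like: hdfs dfs -get
client.upload('/user/hadoop/', '/tmp/a')         # like: hdfs dfs -put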
MapReduce
MapReduce is a programming model inspired by functional programming. It consists of three phases: map, shuffle and sort, and reduce.
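To make the three phases concrete, here is a minimal pure-Python sketch of a word count (illustrative only; Hadoop distributes these phases across many machines):
#!/usr/bin/env python
# encoding: utf-8
from itertools import groupby
from operator import itemgetter

lines = ["hello world", "hello hadoop"]

# map: turn the input into intermediate (key, value) pairs
pairs = [(word, 1) for line in lines for word in line.split()]

# shuffle and sort: group pairs by key (Hadoop does this between map and reduce)
pairs.sort(key=itemgetter(0))

# reduce: merge all values of each key into the final output
for word, group in groupby(pairs, key=itemgetter(0)):
    print('{}\t{}'.format(word, sum(count for _, count in group)))
# Output: hadoop 1, hello 2, world 1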
Hadoop Streaming is a utility that ships with Hadoop; it allows MapReduce jobs to be written in any language that can read from stdin and write to stdout.
Testing
First, run one of the bundled Hadoop MapReduce examples to make sure the environment works:
$ hdfs dfs -mkdir -p /user/$USER/input
$ hdfs dfs -put $HADOOP_HOME/libexec/etc/hadoop /user/$USER/input
$ hadoop jar $HADOOP_HOME/libexec/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'
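After the job finishes, the result can be read straight from HDFS (output is the directory named in the command above):
$ hdfs dfs -cat output/*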
Then test with a mapper and a reducer written in Python:
mapper.py
#!/usr/bin/env python
# encoding: utf-8
import sys

# Emit a tab-separated (word, 1) pair for every word read from stdin.
for line in sys.stdin:
    words = line.split()
    for word in words:
        print('{}\t{}'.format(word.strip(), 1))
reducer.py
#!/usr/bin/env python
# encoding: utf-8
import sys

# Hadoop sorts the mapper output by key before it reaches the reducer,
# so all lines for a given word arrive consecutively.
curr_word = None
curr_count = 0
for line in sys.stdin:
    word, count = line.split('\t')
    count = int(count)
    if word == curr_word:
        curr_count += count
    else:
        # A new word begins: flush the previous word's total first.
        if curr_word:
            print('{}\t{}'.format(curr_word, curr_count))
        curr_word = word
        curr_count = count
# Flush the total for the last word (guards against empty input).
if curr_word is not None:
    print('{}\t{}'.format(curr_word, curr_count))
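Since Hadoop Streaming talks to the scripts purely over stdin/stdout, the pair can be sanity-checked in a local shell first, with sort standing in for Hadoop's shuffle and sort phase (data.txt is a placeholder input file):
$ cat data.txt | python mapper.py | sort | python reducer.py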
Submit the Python mapper and reducer via Hadoop Streaming:
$ hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input input -output output
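Note that Streaming launches mapper.py and reducer.py as standalone programs, so both scripts need the shebang line shown above and execute permission:
$ chmod +x mapper.py reducer.py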
The relevant Hadoop Streaming command-line options are:
- -files: A comma-separated list of files to be copied to the MapReduce cluster
- -mapper: The command to be run as the mapper
- -reducer: The command to be run as the reducer
- -input: The DFS input path for the Map step
- -output: The DFS output directory for the Reduce step
mrjob
Installation
$ pip install mrjob
Writing WordCount with mrjob
The code is as follows:
$ cat word_count.py
#!/usr/bin/env python
# encoding: utf-8
from mrjob.job import MRJob

class MRWordCount(MRJob):

    def mapper(self, _, line):
        # Emit a (word, 1) pair for every word in the line.
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        # mrjob groups all counts for a word, mirroring Hadoop's shuffle.
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()
Run locally:
$ python word_count.py data.txt
Run on a Hadoop cluster:
$ python word_count.py -r hadoop hdfs:///user/wm/input/data.txt
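By default mrjob prints the job's final results to stdout; to keep them on HDFS instead, it also accepts an --output-dir option (the path below is a placeholder):
$ python word_count.py -r hadoop hdfs:///user/wm/input/data.txt --output-dir hdfs:///user/wm/output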
The available mrjob runners are:
- -r inline: (Default) Run in a single Python process
- -r local: Run locally in a few subprocesses simulating some Hadoop features
- -r hadoop: Run on a Hadoop cluster
- -r emr: Run on Amazon Elastic Map Reduce (EMR)