Developing MapReduce with Python

Author: Crazy_Data | Published 2017-05-18 11:08 · 1114 reads

    Introduction

    Hadoop is a distributed computing infrastructure developed under the Apache Foundation. As a developer, you mainly need to care about HDFS and MapReduce.

    HDFS is a distributed file system in which a NameNode and DataNodes cooperate to store data. MapReduce is a programming model: Map decomposes the input into intermediate key/value pairs, and Reduce merges those key/value pairs into the final output. A MapReduce job can read its input from HDFS and write its results back to HDFS after processing.

    This article shows how to develop MapReduce jobs in Python.

    HDFS

    You can access the HDFS file system with the hdfs command:

    $ hdfs dfs -ls /
    $ hdfs dfs -get /var/log/hadoop.log /tmp/  # copy /var/log/hadoop.log from HDFS to the local /tmp/ directory
    $ hdfs dfs -put /tmp/a /user/hadoop/  # copy the local file /tmp/a to /user/hadoop/ on HDFS
    $ hdfs dfs # list all available subcommands
    

    Python Client:
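
One option is the third-party HdfsCLI package, which talks to HDFS over the WebHDFS REST API. A minimal sketch (the namenode URL, user name, and helper function below are illustrative assumptions, not values from this article):

```python
# Hedged sketch of reading HDFS from Python with the third-party HdfsCLI
# package ("pip install hdfs"); the URL, user, and path are assumptions.

def list_hdfs_dir(url='http://namenode:50070', user='hadoop', path='/'):
    """List the entries under `path` through the WebHDFS REST API."""
    from hdfs import InsecureClient  # imported lazily so the sketch loads without a cluster
    client = InsecureClient(url, user=user)
    return client.list(path)

# Example (needs a reachable WebHDFS endpoint):
# print(list_hdfs_dir())
```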

    MapReduce

    MapReduce is a programming model inspired by functional programming. A job consists of three phases: map, shuffle and sort, and reduce.
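
The three phases can be illustrated with a plain-Python word count, a small in-memory sketch of what Hadoop does at scale (the function names here are illustrative):

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    # Map: turn each input line into intermediate (key, value) pairs.
    for line in lines:
        for word in line.split():
            yield (word, 1)

def shuffle_and_reduce(pairs):
    # Shuffle and sort: group identical keys together. Hadoop does this
    # between the map and reduce phases; here a plain sort suffices.
    ordered = sorted(pairs, key=itemgetter(0))
    for word, group in groupby(ordered, key=itemgetter(0)):
        # Reduce: combine all values for one key into a final result.
        yield (word, sum(count for _, count in group))

counts = dict(shuffle_and_reduce(map_phase(['the quick fox', 'the lazy dog'])))
print(counts)  # {'dog': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 2}
```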

    Hadoop Streaming is a utility that ships with Hadoop; it lets you write MapReduce jobs in any language that can read stdin and write stdout.

    Testing

    Run the bundled Hadoop MapReduce examples to make sure the environment works:

    $ hdfs dfs -mkdir -p /user/$USER/input
    $ hdfs dfs -put $HADOOP_HOME/libexec/etc/hadoop /user/$USER/input
    $ hadoop jar $HADOOP_HOME/libexec/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.8.0.jar grep input output 'dfs[a-z.]+'
    

    Now test with a mapper and reducer written in Python:

    mapper.py

    #!/usr/bin/env python
    # encoding: utf-8
    import sys


    # Emit a (word, 1) pair for every word read from standard input.
    for line in sys.stdin:
        for word in line.split():
            print('{}\t{}'.format(word, 1))
    

    reducer.py

    #!/usr/bin/env python
    # encoding: utf-8
    import sys


    # Streaming guarantees the mapper output arrives sorted by key, so equal
    # words are adjacent and can be summed in a single pass.
    curr_word = None
    curr_count = 0

    for line in sys.stdin:
        word, count = line.split('\t')
        count = int(count)

        if word == curr_word:
            curr_count += count
        else:
            if curr_word:
                print('{}\t{}'.format(curr_word, curr_count))
            curr_word = word
            curr_count = count

    # Flush the last word; the None check also guards against empty input.
    if curr_word is not None:
        print('{}\t{}'.format(curr_word, curr_count))
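
Before submitting to the cluster, the streaming scripts can be smoke-tested with ordinary Unix pipes, where `sort` stands in for Hadoop's shuffle-and-sort phase. A self-contained sketch (it recreates condensed versions of the two scripts above in the current directory):

```shell
# Recreate the two streaming scripts (same logic as above) and pipe sample
# text through them; "sort" plays the role of Hadoop's shuffle and sort.
cat > mapper.py <<'EOF'
import sys
for line in sys.stdin:
    for word in line.split():
        print('{}\t{}'.format(word, 1))
EOF

cat > reducer.py <<'EOF'
import sys
curr_word, curr_count = None, 0
for line in sys.stdin:
    word, count = line.split('\t')
    if word == curr_word:
        curr_count += int(count)
    else:
        if curr_word is not None:
            print('{}\t{}'.format(curr_word, curr_count))
        curr_word, curr_count = word, int(count)
if curr_word is not None:
    print('{}\t{}'.format(curr_word, curr_count))
EOF

printf 'apple banana apple\n' | python3 mapper.py | sort | python3 reducer.py
# expected: apple 2, banana 1 (tab-separated, one word per line)
```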
    

    Run the Python mapper and reducer on Hadoop (both scripts must be executable, e.g. chmod +x mapper.py reducer.py):

    $ hadoop jar $HADOOP_HOME/libexec/share/hadoop/tools/lib/hadoop-streaming-2.8.0.jar -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input input -output output
    

    Hadoop Streaming's command-line options:

    • -files: A comma-separated list of files to be copied to the MapReduce cluster
    • -mapper: The command to be run as the mapper
    • -reducer: The command to be run as the reducer
    • -input: The DFS input path for the Map step
    • -output: The DFS output directory for the Reduce step

    mrjob

    Installation

    $ pip install mrjob
    

    Writing word count with mrjob

    The code:

    $ cat word_count.py
    #!/usr/bin/env python
    # encoding: utf-8
    from mrjob.job import MRJob


    class MRWordCount(MRJob):

        def mapper(self, _, line):
            # Emit a (word, 1) pair for every word on the line.
            for word in line.split():
                yield word, 1

        def reducer(self, word, counts):
            # mrjob groups values by key, so summing gives the total count.
            yield word, sum(counts)


    if __name__ == '__main__':
        MRWordCount.run()
    

    Run it locally:

    $ python word_count.py data.txt
    

    Run it on a Hadoop cluster:

    $ python word_count.py -r hadoop hdfs:///user/wm/input/data.txt
    

    Available mrjob runners:

    • -r inline: (Default) Run in a single Python process
    • -r local: Run locally in a few subprocesses simulating some Hadoop features
    • -r hadoop: Run on a Hadoop cluster
    • -r emr: Run on Amazon Elastic Map Reduce (EMR)

