python 实现hadoop的mapreduce

作者: 数据蛙datafrog | 来源:发表于2018-05-04 16:46 被阅读417次

    为了用python实现mapreduce,我们先引入下面两个个知识

    • sys.stdin()
    • itertools之groupby

    sys模块的简单学习

    sys.stdin 是一个文件描述符,代表标准输入,不需使用open函数打开,就可以使用
    例如下面的简单程序

      # coding=utf-8
      import sys
     
      for line in sys.stdin:
          print line
    

    执行命令

    cat /etc/passwd | python test_stdin.py
    

    cat 是查看 /etc/passwd 文件内容,然后通过管道符 | 传给python程序,这样就可以一行一行的输出内容了(这里passwd是以一个列表的形式传到了test_stidin.py中了)

    延伸学习

    >>>sys.stdout.write('hello'+'\n') 
    hello
    
    #这时屏幕会打印出"hello"字符串(记得还有换行符啊)
    

    那这与Python语言中打印对象调用print obj有什么区别呢,其是事实上就调用了下面的函数sys.stdout.write(obj+"\n")

    下面是等价的

    1 sys.stdout.write('hello'+'\n') 
    2 
    3 print 'hello'
    

    下面我们再来看sys.stdin与raw_input这两个标准输入的区别

    当我们使用raw_input("Input promption:")时,事实上是先把提示信息输出,然后捕获输入
    以下两组在事实上等价:

    >>>hi=raw_input('hello? ')
     hello?kaixuan  #这里是需要从键盘上输入
    >>>hi
    kaixuan
    
    >>print 'hello? '
    hello?
    >>>hi=sys.stdin.readline()[:-1] # -1 to discard the '\n' in input stream
    kaixuan   #键盘上输入
    >>>hi
    kaixuan
    

    itertools之groupby

    groupby()把迭代器中相邻的重复元素挑出来放在一起,实际上挑选规则是通过函数完成的,只要作用于函数的元素返回的值相等,这元素就被认为是在一组的,而函数返回值作为组的key。

    from itertools import groupby
    from operator import itemgetter
    from collections import Iterator
    d1 = ['python', 100]
    d2 = ['python', 99]
    d3 = ['c++', 99]
    d4 = ['c++',99]
    d5 = ['python', 100]
    d = [d1, d2, d3, d4, d5]
    d.sort(key=lambda x:x[0], reverse = False )#分组之前先进行排序,改变了已经存在的列表,注意与sorted函数的区别
    #排序后[['c++', 99], ['c++', 99], ['python', 100], ['python', 99], ['python', 100]]
    lstg = groupby(d,lambda x:x[0])
    for key, group in lstg:
        print (key,(isinstance(group,Iterator)))
        for g in group:         #group是一个迭代器
            print (g)
    
    #输出
    c++ True
    ['c++', 99]
    ['c++', 99]
    python True
    ['python', 100]
    ['python', 99]
    ['python', 100]
    

    延伸itemgetter
    operator模块提供的itemgetter函数用于获取对象的哪些维的数据,参数为一些序号(对于列表):

    a = [1,2,3] 
    >>> b=operator.itemgetter(1)      //定义函数b,获取对象的第1个域的值,
    >>> b(a) 
    2
    >>> b=operator.itemgetter(1,0)  //定义函数b,获取对象的第1个域和第0个的值
    >>> b(a) 
    (2, 1)
    

    思考题:对列表是这样的操作,那么对于dict呢?
    上面的sort排序中,我们使用的是lambda函数,如果换成itemgetter又是怎么样呢?(下面的连接中会有相应的答案)

    下面是正式开始用Python写mapreduce

    创建文件words

    python|thread|process
    python|xlrd|pyinotiy
    python|print|c++
    c++|java|php
    node.js|javascript|go
    

    上传到hdfs上

    hadoop fs -put ./words.txt /storage/kaixuan/data/
    

    编写mapper文件
    mapper.py内容如下

    # coding=utf-8
    import sys
     
    for line in sys.stdin:
    words = line.strip().split('|')
    for word in words:
        print word
    

    Hadoop上面运行,就是使用 HadoopStreaming 来让数据在map 和 reduce之间传递数据,使用sys.stdin获得输入数据,而print 等同于 sys.stdout ,作为标准输出

    编写reducer文件

    reducer.py内容如下

    # coding=utf-8
    import sys
    from operator import itemgetter
    from itertools import groupby
    
    def read_mapper_output(files, separator='\t'):
        for line in files:
            yield line.strip().split(separator, 1)
    
    def main():
        data = read_mapper_output(sys.stdin)
        for key, data in groupby(data, itemgetter(0)):
            count = 0
            for value in data:
                count += 1
            print "{word}\t{count}".format(word=key, count=count)
    
    if __name__ == '__main__':
        main()
    

    在hadoop集群上运行代码

    编写run.sh

    #!/bin/bash
    hadoop fs -rm -r -f /storage/kaixuan/data/wordcount
    hadoop jar /usr/hdp/2.6.1.0-129/hadoop-mapreduce/hadoop-streaming.jar \
    -jobconf mapreduce.reduce.shuffle.memory.limit.percent=0.1 \
    -jobconf mapreduce.reduce.shuffle.input.buffer.percent=0.3 \
    -jobconf mapreduce.map.memory.mb=512 \
    -jobconf mapreduce.reduce.memory.mb=512 \
    -jobconf mapred.map.capacity=100 \
    -jobconf mapred.reduce.capacity=100 \
    -jobconf mapred.job.name=test_word_count \
    -file ./mapper.py     -mapper  "python mapper.py"  \
    -file ./reducer.py    -reducer "python reducer.py"  \
    -input /storage/kaixuan/data/words.txt -output /storage/zhaoning/data/wordcount
    

    执行结束后,将文件从hadoop上下载下来

    hadoop fs -getmerge /storage/kaixuan/data/wordcount wordcount
    

    cat ./wordcount 内容为

    xlrd 1
    print 1
    python 3
    javascript 1
    process 1
    c++ 2
    pyinotiy 1
    java 1
    php 1
    go 1
    thread 1
    node.js 1
    

    延伸:Hadoop Streaming的作用
    Hadoop Streaming框架,让任何语言编写的map, reduce程序能够在hadoop集群上运行;map/reduce程序只要遵循从标准输入stdin读,写出到标准输出stdout即可

    其次,容易进行单机调试,通过管道前后相接的方式就可以模拟streaming, 在本地完成map/reduce程序的调试

    # cat inputfile | mapper | sort | reducer > output
    

    最后,streaming框架还提供了作业提交时的丰富参数控制,直接通过streaming参数,而不需要使用java语言修改;很多mapreduce的高阶功能,都可以通过steaming参数的调整来完成

    参考:
    标准输入stdin
    python重定向sys.stdin、sys.stdout
    python中sort和sorted的区别
    Python中的分组函数(groupby、itertools)
    itertools 模块学习(一)
    Python教程:itemgetter对列表排序
    用python 写mapreduce程序
    Hadoop Streaming详解

    相关文章

      网友评论

        本文标题:python 实现hadoop的mapreduce

        本文链接:https://www.haomeiwen.com/subject/pmrqrftx.html