美文网首页机器学习与数据挖掘大数据&云计算大数据
MRJob 十分钟入门: 用 Python 轻松运行 MapRe

MRJob 十分钟入门: 用 Python 轻松运行 MapRe

作者: 郭威廉 | 来源:发表于2018-03-20 11:23 被阅读3522次

    概览

    前言

    本教程取材翻译于mrjob v0.5.10 documentation。有删减。最近在学mapreduce, 用到mrjob,在网上没有找到好的中文教程,就自己翻译了一下官方文档的重点。

    简介

    mrjob是用来写能在hadoop运行的python程序的最简便方法。其最突出的特点就是在mrjob的帮助下,无需安装hadoop或部署任何集群,我们可以在本地机器上运行代码(进行测试)。同时,mrjob可以轻松运行于Amazon Elastic MapReduce。
    为了达到简便实用的目的,一些功能被mrjob省去了。如果追求更多的功能,可以尝试Dumbo,Pydoop等package。

    安装

    使用pip安装。

    pip install mrjob
    

    anaconda的使用者推荐使用conda安装。

    conda install -c conda-forge mrjob 
    

    第一个mrjob程序

    在这里我们使用一个统计文本中字符数的程序fc.py作为例子。

    from mrjob.job import MRJob
    
    class FrequencyCount(MRJob):
        def mapper(self, _, line):
            yield "chars", len(line)
            yield "words", len(line.split())
            yield "lines", 1
        
        def reducer(self, key, values):
            yield key, sum(values)
        
    if __name__ == '__main__':
        FrequencyCount.run()
    

    要运行该程序,只需在命令行中运行

    python fc.py testfile.txt
    

    即可。其中testfile.txt可以是任意文本文件。
    接下来我们简单地解释这段程序。 mrjob中所有的任务都是通过一个继承MRJob的类来定义的。在这个类中,可以包含mapper,combiner和reducer。这三个函数的参数均是一个(key, value)键值对。在本mapper函数中,键被忽略(写作_),值为文本的每一行line。在reducer中,对mapper生成的每一个键(chars,words,lines)求和,生成的和为对应的值输出。
    另一个注意点是最后的if判断,该if判断是必须的。在这个if判断中,mrjob才明确我们的目标(job class)是什么。

    MapReduce简介

    mapreduce是一种用来在分布式系统上处理海量数据的系统。其基础是MapReduce: Simplified Data Processing on Large Clusters这篇论文。mapreduce将海量数据分成小的数据集,并行地进行相同的任务,最后将所有的子结果整理并合并成最终的结果。其中拆分数据进行相同的步骤称为mapper,后面合并整理的步骤称为reducer。而combiner可以看作是一个优化器,但不是必须的。

    编写任务脚本

    一步任务

    一步任务(one step job)是最简单的mrjob脚本,前文中第一个mrjob程序 fc.py 就是一个一步工作脚本。
    要编写一步工作脚本,只需继承(subclass)MRjob类,并覆盖(override)mapper, combiner, reducer 等方法即可。

    多步任务

    在编写多步任务(Multi step job)时,需要覆盖steps方法,并在step中返回一个由mapper, combiner, reducer等组成的list。
    以下是一个多步任务的例子。在这个例子中,输入文件中的最高频词汇将被输出。

    from mrjob.job import MRJob
    from mrjob.step import MRStep
    import re
    
    WORD_RE = re.compile(r"[\w']+")
    
    class MRMostUsedWord(MRJob):
        
        def steps(self):
            return[
                MRStep(mapper = self.mapper_get_words,
                       combiner = self.combiner_count_words,
                       reducer = self.reducer_count_words),
                MRStep(reducer = self.reducer_find_max_word)
            ]
        
        def mapper_get_words(self,_,line):
            #yield each word in the line
            for word in WORD_RE.findall(line):
                yield (word.lower(),1)
        
        def combiner_count_words(self, word, counts):
            #optimization: count the words we have seen so far
            yield (word, sum(counts))
        
        def reducer_count_words(self, word, counts):
            #send all (num_occupences, word) pairs to same reducer
            #use sum(num_occupences) to get the total num of occupences of each word
            yield None, (sum(counts), word)
        
        def reducer_find_max_word(self, _, word_count_pairs):
            #none key in this function because in reducer_count_words we discard the key
            #each item of word_count_pairs is (count, word), yield one result: the value(word) of max count
            yield max(word_count_pairs)
    
    #never forget       
    if __name__ == '__main__':
        MRMostUsedWord.run()
    

    在step方法中共返回4个mapper, combiner 和 reducer。

    开始和结束

    在任务的开始前和结束后,可以通过特定的方法进行设置:*_init() 和 *_final()方法,前面的 * 可以是mapper, combiner, reducer 任意一种。

    命令行语句的使用

    第一种用法是在任务前先单独运行一条命令行指令,通过将*_cmd设置为参数传入MRStep或在MRJob中覆盖同名方法。
    另一种用法是用命令行指令过滤(filter)输入文件,方法是在MRStep中加入mapper_pre_filterreducer_pre_filter,或在MRJob类中覆盖同名的方法。mapper_pre_filter='grep "kitty"'就表示在mapper前,只输入含有kitty的行。

    协议

    协议(Protocol)主要是关于mrjob中数据的格式。每一个任务都有input protocol , output protocol 和 internal protocol。
    每一个协议都有read()write()方法,read()将原始数据的字节转化为python使用的键值对,write()将python使用的键值对转化回字节。
    input protocol用来将input的字节读入第一个mapper(当不存在mapper时读入第一个reducer),output protocol用来最后输出output,internal protocol是将一步的输出转化成下一步的输入。
    以上三种协议在应用中都可以自己设置,同时使用者也可以编写完全不同的全新协议。

    运行器Runner

    简介

    MRJob类可以将任务置于MapReduce框架下运行,而运行器Runner包装并提交任务,在不同的环境下运行任务,并向使用者报告运行结果。
    通常情况下,使用者是通过命令行以及设置文件(configuration file)与运行器进行交互的。当使用者通过命令行运行程序时,程序会根据不同的参数--runner去创建不同的运行器,使任务运行在不同的环境。
    使用者一般不需要手动创建运行器,当程序运行时,会自动为任务生成运行器。当然,使用者也可以用my_job.make_runner()手动创建运行器。

    本地环境运行

    要在本地运行任务,只需要使用如下代码。

    python your_mr_job_sub_class.py <log_file_or_whatever> output
    

    使用者也可以单独运行若干个步骤。

    python your_mr_job_sub_class.py --mapper
    

    这行代码只运行了任务中的mapper部分。

    Hadoop集群环境运行

    首先对Hadoop集群进行设置.
    接下来在运行任务时加入-r Hadoop参数。

    python your_mr_job_sub_class.py -r Hadoop input output
    

    EMR环境运行

    首先设置aws.
    接下来在运行任务时加入-r emr参数。

    python your_mr_job_sub_class.py -r emr input output
    

    Dataproc环境运行

    首先谷歌云平台Google Cloud Platform.
    接下来在运行任务时加入-r dataproc参数。

    python your_mr_job_sub_class.py -r dataproc input output
    

    手动编写运行器脚本

    使用者可以手动编写运行器脚本,并在这个脚本中用make_runner()运行Runner来调用其他脚本中的任务。
    手动编写运行器也常被用在测试中。

    mr_job = MRWordCounter(args=['-r', 'emr'])
    with mr_job.make_runner() as runner:
        runner.run()
        for key, value in mr_job.parse_output(runner.cat_output()):
            ..# do something with the parsed output
    

    在这段代码中,使用者实例化了一个MRJob,用make_runner创建了一个Runner,用runner.run()运行任务,并利用cat_output()将结果转化为一个字节流(bytes stream),最后用parse_output将字节流进行解析。
    在手动编写运行器脚本时,必须格外注意,绝对不能将用来生成运行器的make_runner和描述任务的类文件(job class)放在一个文件中。
    以下是一个错误的示范:

    from mrjob.job import MRJob
    
    class MyJob(MRJob):
        # (your job)
    
    # no, stop, what are you doing?!?!
    mr_job = MyJob(args=[args])
    with mr_job.make_runner() as runner:
        runner.run()
        # ... etc
    

    运行这段代码,将导致类似以下的报错信息出现

    UsageError: make_runner() was called with --steps. This probably means you
                tried to use it from __main__, which doesn't work.
    

    正确的做法是将你的任务放入一个脚本,将手动编写的运行器放入另一个。以下是对刚才错误示范进行修正的两个对应文件.

    # job.py
    from mrjob.job import MRJob
    
    class MyJob(MRJob):
        # (your job)
    
    if __name__ == '__main__':
        MyJob.run()
    
    
    # run.py
    from job import MyJob
    mr_job = MyJob(args=[args])
    with mr_job.make_runner() as runner:
        runner.run()
        # ... etc
    

    后记

    在官方文档中还包含Spark和emr在mrjob中的使用等内容,因为精力有限,没有写进这篇教程中。我是第一次进行这种翻译教程的工作,因为能力有限,不免有些疏漏。如果有翻译和内容错误之处,欢迎大家指正。
    如果用微信手机端的阅读体验不佳,欢迎点击链接,来我的博客阅读。

    相关文章

      网友评论

        本文标题:MRJob 十分钟入门: 用 Python 轻松运行 MapRe

        本文链接:https://www.haomeiwen.com/subject/dovvqftx.html