大数据Hadoop工具python教程9-Luigi工作流

作者: python测试开发 | 来源:发表于2019-01-22 17:26 被阅读29次

    管理Hadoop作业的官方工作流程调度程序是Apache Oozie。与许多其他Hadoop产品一样,Oozie是用Java编写的,是基于服务器的Web应用程序,它运行执行Hadoop MapReduce和Pig的工作流作业。 Oozie工作流是在XML文档中指定的控制依赖性指导非循环图(DAG)中排列的动作集合。虽然Oozie在Hadoop社区中有很多支持,但通过XML属性配置工作流和作业的学习曲线非常陡峭。

    Luigi是Spotify创建的Python替代方案,可以构建和配置复杂的批处理作业管道。它处理依赖项解析,工作流管理,可视化等等。它还拥有庞大的社区,并支持许多Hadoop技术。在github上超过1万星。

    本章介绍Luigi的安装和工作流程的详细说明。

    安装

    pip install luigi
    

    工作流

    在Luigi中,工作流由一系列操作组成,称为任务。 Luigi任务是非特定的,也就是说,它们可以是任何可以用Python编写的东西。任务的输入和输出数据的位置称为目标(target)。目标通常对应于磁盘上,HDFS上或数据库中的文件位置。除了任务和目标之外,Luigi还利用参数来自定义任务的执行方式。

    • 任务

    任务是构成Luigi工作流的操作序列。每个任务都声明其依赖于其他任务创建的目标。这样Luigi能够创建依赖链。

    图片.png
    • 目标

    目标是任务的输入和输出。最常见的目标是磁盘上的文件,HDFS中的文件或数据库中的记录。 Luigi包装了底层文件系统操作,以确保与目标的交互是原子的。这允许从故障点重放工作流,而不必重放任何已经成功完成的任务。

    • 参数

    参数允许通过允许值从命令行,以编程方式或从其他任务传递任务来自定义任务。例如,任务输出的名称可以通过参数传递给任务的日期来确定。

    参考资料

    工作流本示例

    #!/usr/bin/env python
    # 项目实战讨论QQ群630011153 144081101
    # https://github.com/china-testing/python-api-tesing
    import luigi
    
    class InputFile(luigi.Task):
       """
       A task wrapping a Target 
       """
       input_file = luigi.Parameter()
    
       def output(self):
          """
          Return the target for this task
          """
          return luigi.LocalTarget(self.input_file)
    
    class WordCount(luigi.Task):
       """
       A task that counts the number of words in a file
       """
       input_file = luigi.Parameter()
       output_file = luigi.Parameter(default='/tmp/wordcount')
    
       def requires(self):
          """
          The task's dependencies:
          """
          return InputFile(self.input_file)
    
       def output(self):
          """
          The task's output
          """
          return luigi.LocalTarget(self.output_file)
    
       def run(self):
          """
          The task's logic
          """
          count = {}
    
          ifp = self.input().open('r')
    
          for line in ifp:
             for word in line.strip().split():
                count[word] = count.get(word, 0) + 1
    
          ofp = self.output().open('w')
          for k, v in count.items():
                ofp.write('{}\t{}\n'.format(k, v))
          ofp.close()
    
    if __name__ == '__main__':
       luigi.run()
    

    执行

    $ python wordcount.py WordCount --local-scheduler --input-file /home/hduser_/input2.txt --output-file /tmp/wordcount2.txt
    DEBUG: Checking if WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt) is complete
    DEBUG: Checking if InputFile(input_file=/home/hduser_/input2.txt) is complete
    INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   PENDING
    INFO: Informed scheduler that task   InputFile__home_hduser__in_0eced493f7   has status   DONE
    INFO: Done scheduling tasks
    INFO: Running Worker with 1 processes
    DEBUG: Asking scheduler for work...
    DEBUG: Pending tasks: 1
    INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) running   WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
    INFO: [pid 21592] Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) done      WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
    DEBUG: 1 running tasks, waiting for next task to finish
    INFO: Informed scheduler that task   WordCount__home_hduser__in__tmp_wordcount2__a94efba0f2   has status   DONE
    DEBUG: Asking scheduler for work...
    DEBUG: Done
    DEBUG: There are no more tasks to run at this time
    INFO: Worker Worker(salt=067173106, workers=1, host=andrew-PC, username=hduser_, pid=21592) was stopped. Shutting down Keep-Alive thread
    INFO: 
    ===== Luigi Execution Summary =====
    
    Scheduled 2 tasks of which:
    * 1 complete ones were encountered:
        - 1 InputFile(input_file=/home/hduser_/input2.txt)
    * 1 ran successfully:
        - 1 WordCount(input_file=/home/hduser_/input2.txt, output_file=/tmp/wordcount2.txt)
    
    This progress looks :) because there were no failed tasks or missing dependencies
    
    ===== Luigi Execution Summary =====
    
    hduser_@andrew-PC:/home/andrew/code/HadoopWithPython/python/Luigi$ cat /tmp/wordcount2.txt
    jack    2
    be  2
    nimble  1
    quick   1
    
    

    相关文章

      网友评论

        本文标题:大数据Hadoop工具python教程9-Luigi工作流

        本文链接:https://www.haomeiwen.com/subject/whtrjqtx.html