美文网首页
luigi使用 - multiple pipeline

luigi使用 - multiple pipeline

作者: _Megamind_ | 来源:发表于2018-07-05 17:56 被阅读0次

    一般地,通常使用luigi框架搭建流程都是只有一个pipeline(暂时没有通过google找到有多个Pipeline的教程)

    由于工作需要,需要把之前写好的多个流程串联起来作为一个总的pipeline,并且各个pipeline之间有一定的依赖关系


    假设有 pipeline_1, pipeline_2, pipeline_3 三个子流程(可单独运行),结构如下:

      class TaskSon(luigi.Task):
        def run(self):
          pass
        def output(self):
          return luigi.LocalTarget("tmp")
    
      class workflow(luigi.Task):
        def required(self):
          return TaskSon()
    

    同时,有主流程main_pipeline,结构如下:

      class Pipeline1_Task(luigi.Task):
        def run(self):
          # 执行子流程 pipeline_1
          pass
        def output(self):
          # 返回子流程 pipeline_1 的输出
          pass
    
      class Pipeline2_Task(luigi.Task):
        def required(self):
          # 依赖于子流程 pipeline_1 的输出
          return Pipeline1_Task()
        def run(self):
          # 执行子流程 pipeline_2
          pass
        def output():
          # 返回子流程 pipeline_2 的输出
          pass
    
      class Pipeline3_Task(luigi.Task):
        def required(self):
          # 依赖于子流程 pipeline_2 的输出
          return Pipeline2_Task()
        def run(self):
          # 执行子流程 pipeline_3
          pass
        def output(self):
          # 返回子流程 pipeline_3 的输出
          pass
    
      class workflow(luigi.Task):
        def required(self):
          return Pipeline3_Task()
    
    Screenshot.png

    这里需要考虑一个问题

    • 如何将子流程的输入输出跟主流程中对应任务的输入输出对接

    为了解决这个问题,首先需要考虑,如何将子流程中所有任务的输出反馈到主流程

    • 一般地,流程的结构设计都是有一个主入口(workflow),由主入口任务(在required方法中)初始化并启动其他任务

    • 那么,就需要在workflow任务中把整个流程中其他任务的输出作为一个整体输出:

      class workflow(luigi.Task):
        def required(self):
          return [otherTask()]
        def output(self):
          # *** 这样就可以将主入口所依赖的所有其他任务的输出返回 ***
          return self.input()
    

    既然能够获取到子流程中所有任务的总输出,那么就需要考虑把输出反馈给主流程

    • 考虑到workflow任务获取其他任务的总输出的方法,可以直接将workflow的output方法跟对应主流程任务的output方法结合:
      class Pipeline1_Task()
        def output(self):
          # *** 这样子流程的output就会跟主流程任务的output对接 ***
          # 同时,这样处理在主流程启动时,luigi框架依旧是会检查子流程的输出是否已经完整
          from pipeline1 import workflow as pipeline1
          return pipeline1().output()
    
    • 至于主流程中的任务的依赖就比较容易处理了:
      class Pipeline2_Task(luigi.Task):
        def required(self):
          # 由于Pipeline1_Task的输出即为子流程pipeline_1的输出,所以这里luigi会检查到子流程pipeline_1的输出是否完整
          return Pipeline1_Task()
    

    子流程的输入输出已经可以跟主流程的输入输出对应上了,那么就需要考虑如何怎么运行子流程

    这里是没有办法通过pipeline1.workflow().run()直接执行,因为入口任务是没有重载run方法

    • 所以,这里把子流程作为一个黑箱执行:
      class Pipeline1_Task(luigi.Task):
        def run(self):
          from pipeline1 import workflow as pipeline1
          # *** 黑箱 ***
          luigi.Build([pipeline1()])
    

    由于需要确保主流程中的任务“挂载”的是统一的一个子流程,则可以定义一个变量来储存子流程对象

      class Pipeline1_Task(luigi.Task):
        pipeline = None
        def run(self):
          luigi.Build([self.pipeline])
        def output(self):
          # 由于每个任务在流程中优先执行的是output方法(当任务被依赖的时候luigi会利用output方法检查输出的完整性),所以self.pipeline的初始化应该在output方法内执行
          from pipeline1 import workflow as pipeline1
          self.pipeline = pipeline1()
          return pipeline1.output()
    

    这样,就可以完整地把子流程装载到主流程的任务中

    相关文章

      网友评论

          本文标题:luigi使用 - multiple pipeline

          本文链接:https://www.haomeiwen.com/subject/nwapuftx.html