最近一段时间一直在跟一个Python的后端项目,学习了基于Python栈的一些框架和包,其中使用到了Luigi,简单做个总结。
1. Luigi简介
官网的介绍:
Luigi is a Python (2.7, 3.6, 3.7 tested) package that helps you build complex pipelines of batch jobs. It handles dependency resolution, workflow management, visualization, handling failures, command line integration, and much more.
简单来说,Luigi是管理作业流的工具,可以方便我们监控、管理任务流。举个例子,比如我们有一个机器学习任务,牵扯到数据获取,数据清洗,模型训练,模型预测,结果回仓等作业。当然我们可以使用脚本的形式把这些作业串起来,但是可控性就很差,出了问题也比较难定位。
Luigi可以把我们从繁琐的作业流解放出来,它提供可视化的界面,方便我们查看DAG图,作业进度,作业状况等。甚至作业流出错时,可以选择从出错的作业开始继续执行,而不用重新开始等。
visualiser_front_page.png2. Luigi安装和运行
安装:
$ pip install luigi
运行:启动luigid服务,访问localhost:8082可以看到可视化界面
$ luigid
3. 基本概念
Task:每一个作业都是一个Task,以class的形式存在,继承luigi.Task。需要重载require()、run()、output()方法。
task_breakdown.png其中require()是任务的入口,可以指定该作业的上游作业,并获取上游作业的输出,以便继续处理。
run()是该作业的具体实现。
output()是作业的出口,可以选择把处理的结果输出给下游。
4. 实例
这里我们就简单举个例子,只有两步操作,第一步从数据库取数据,第二步做简单的数据处理,最后以txt格式保存在本地。
准备工作:在数据库students里插入3条数据:
1565778970421.jpg编写代码:
1565778907516.jpg主体有两个class,第一个Dump4DB,通过sqlalchemy数据库取数,并落地保存成data4db.txt。sqlalchemy的使用这里就不赘述了,Python操作数据库一般都会选择它。
第二个Clean,它的require是Dump4DB,也就是说Dump4DB的下一步是执行Clean作业。这里简单的数据清洗,再次落地成最终结果data.txt。
执行命令:Clean是最后的Task名称,--password 传递参数
python3 luigitest.py Clean --password xxxxxx
输出结果:中间落地的文件,以及最终结果如图。
5. 总结
由于在我的项目中,对Luigi的使用也比较简单,所以研究的也不是很深入。事实上,Luigi可以支持很多,比如FTP、HDFS、Spark、S3等。需要的同学自己在官网上搜吧。
网友评论