上篇文章说到了安装,这次说说使用
登录
https://localhost:8443
注意是https,采用的是jetty ssl链接。输入账号密码azkaban/azkanban(如果你之前没有更改的话)
![](https://img.haomeiwen.com/i1457188/ffbb875163c48b76.png)
首页有四个菜单
- projects:最重要的部分,创建一个工程,所有flows将在工程中运行。
- scheduling:显示定时任务
- executing:显示当前运行的任务
- history:显示历史运行任务
主要介绍projects部分
首先创建一个工程,填写名称和描述,比如o2olog。
![](https://img.haomeiwen.com/i1457188/ed32e0391877e11a.png)
Flows:工作流程,有多个job组成
Permissions:权限管理
Project Logs:工程日志
创建工程:
创建之前我们先了解下之间的关系,一个工程包含一个或多个flows,一个flow包含多个job。job是你想在azkaban中运行的一个进程,可以是简单的linux命令,可是java程序,也可以是复杂的shell脚本,当然,如果你安装相关插件,也可以运行插件。一个job可以依赖于另一个job,这种多个job和它们的依赖组成的图表叫做flow。
job创建
创建job很简单,只要创建一个以.job结尾的文本文件就行了,例如我们创建一个工作,用来将日志数据导入hive中(关于大数据方面的东西,不在重复,可以理解为,将日志所需数据导入的mysql中),我们创建o2o_2_hive.job
type=command
command=echo "data 2 hive"
一个简单的job就创建好了,解释下,type的command,告诉azkaban用unix原生命令去运行,比如原生命令或者shell脚本,当然也有其他类型,后面说。
一个工程不可能只有一个job,我们现在创建多个依赖job,这也是采用azkaban的首要目的。
flows创建
我们说过多个jobs和它们的依赖组成flow。怎么创建依赖,只要指定dependencies参数就行了。比如导入hive前,需要进行数据清洗,数据清洗前需要上传,上传之前需要从ftp获取日志。
定义5个job:
- o2o_2_hive.job:将清洗完的数据入hive库
- o2o_clean_data.job:调用mr清洗hdfs数据
- o2o_up_2_hdfs.job:将文件上传至hdfs
- o2o_get_file_ftp1.job:从ftp1获取日志
- o2o_get_file_fip2.job:从ftp2获取日志
依赖关系:
3依赖4和5,2依赖3,1依赖2,4和5没有依赖关系。
o2o_2_hive.job
type=command
# 执行sh脚本,建议这样做,后期只需维护脚本就行了,azkaban定义工作流程
command=sh /job/o2o_2_hive.sh
dependencies=o2o_clean_data
o2o_clean_data.job
type=command
# 执行sh脚本,建议这样做,后期只需维护脚本就行了,azkaban定义工作流程
command=sh /job/o2o_clean_data.sh
dependencies=o2o_up_2_hdfs
o2o_up_2_hdfs.job
type=command
#需要配置好hadoop命令,建议编写到shell中,可以后期维护
command=hadoop fs -put /data/*
#多个依赖用逗号隔开
dependencies=o2o_get_file_ftp1,o2o_get_file_ftp2
o2o_get_file_ftp1.job
type=command
command=wget "ftp://file1" -O /data/file1
o2o_get_file_ftp2.job
type=command
command=wget "ftp:file2" -O /data/file2
可以运行unix命令,也可以运行python脚本(强烈推荐)。将上述job打成zip包。
ps:为了测试流程,我将上述command都改为echo +相应命令
上传:
![](https://img.haomeiwen.com/i1457188/6477198994125a5e.png)
![](https://img.haomeiwen.com/i1457188/a857b2205810e842.png)
点击o2o_2_hive进入流程,azkaban流程名称以最后一个没有依赖的job定义的。
![](https://img.haomeiwen.com/i1457188/93637f40d145157e.png)
右上方是配置执行当前流程或者执行定时流程。
![](https://img.haomeiwen.com/i1457188/d6d7d0d0a06cd6fa.png)
Flow view:流程视图。可以禁用,启用某些job
Notification:定义任务成功或者失败是否发送邮件
Failure Options:定义一个job失败,剩下的job怎么执行
Concurrent:并行任务执行设置
Flow Parametters:参数设置。
1.执行一次
设置好上述参数,点击execute。
![](https://img.haomeiwen.com/i1457188/df1915c74b45c5b4.png)
绿色代表成功,蓝色是运行,红色是失败。可以查看job运行时间,依赖和日志,点击details可以查看各个job运行情况。
![](https://img.haomeiwen.com/i1457188/69b50de84c14de6d.png)
2.定时执行
![](https://img.haomeiwen.com/i1457188/aa63eba8a270dbb0.png)
这时候注意到cst了吧,之前需要将配置中时区改为Asia/shanghai。
可以选择"天/时/分/月/周"等执行频率。
![](https://img.haomeiwen.com/i1457188/bb02287da490da43.png)
可以查看下次执行时间。
其他job配置选项
- 可以定义job依赖另一个flow,配置
type=flow
flow.name=fisrt_flow
- 可以设置每个job中子命令
type=command
command=echo "hello"
command.1=echo "world"
- 可以配置job失败重启次数,及间隔时间,比如,上述ftp获取日志,我可以配置重试12次,每隔5分钟一次。
type=command
command=wget "ftp://file1" -O /data/file1
retries=12
#单位毫秒
retry.backoff=300000
azkaban还有很多强大的功能,如它开放了相关api,我们将它可以嵌入到自己的app等等,有兴趣的朋友可以研究。
至此,azkaban学习到此结束,本人也是初学者,如果问题,欢迎指正。
网友评论
推荐下,分布式作业中间件 Elastic-Job 源码解析 16 篇:http://tinyurl.com/y93r9wfg
颓
还不错那
在job 中写的command命令中带有中文字符,执行之后从日志里看中文乱码了,
mysql数据库里也是乱码,
请问这个是要修改哪里的编码格式?
Exception in thread "AzkabanWebServer-Cleaner-Thread" java.lang.NoSuchMethodError: org.joda.time.DateTime.now()Lorg/joda/time/DateTime;
at azkaban.executor.ExecutorManager$CleanerThread.cleanExecutionLogs(ExecutorManager.java:1224)
at azkaban.executor.ExecutorManager$CleanerThread.run(ExecutorManager.java:1209)
Exception in thread "main" java.lang.NoSuchMethodError: org.codehaus.jackson.JsonNode.asInt()I
at azkaban.utils.JSONUtils.toObjectFromJSONNode(JSONUtils.java:152)
at azkaban.utils.JSONUtils.toObjectFromJSONNode(JSONUtils.java:134)
at azkaban.utils.JSONUtils.parseJSONFromString(JSONUtils.java:106)
at azkaban.project.JdbcProjectLoader$ProjectResultHandler.handle(JdbcProjectLoader.java:1050)
at azkaban.project.JdbcProjectLoader$ProjectResultHandler.handle(JdbcProjectLoader.java:1012)
at org.apache.commons.dbutils.QueryRunner.query(QueryRunner.java:347)
at org.apache.commons.dbutils.QueryRunner.query(QueryRunner.java:225)
at azkaban.project.JdbcProjectLoader.fetchAllActiveProjects(JdbcProjectLoader.java:92)
at azkaban.project.JdbcProjectLoader.fetchAllActiveProjects(JdbcProjectLoader.java:77)
at azkaban.project.ProjectManager.loadAllProjects(ProjectManager.java:72)
at azkaban.project.ProjectManager.<init>(ProjectManager.java:66)
at azkaban.webapp.AzkabanWebServer.loadProjectManager(AzkabanWebServer.java:242)
at azkaban.webapp.AzkabanWebServer.<init>(AzkabanWebServer.java:186)
at azkaban.webapp.AzkabanWebServer.main(AzkabanWebServer.java:726)