美文网首页
调度工具Airflow

调度工具Airflow

作者: xieyan0811 | 来源:发表于2018-10-27 13:39 被阅读195次

    1. 什么是Airflow

     Airflow是Airbnb开源的data pipeline调度和监控工作流的平台,用于用来创建、监控和调整data pipeline(ETL)。

    2. 简单的定时任务cron

     假设我们想要定时调用一个程序,比如说:每天定时从Web抓数据,我们可以使用cron。cron是一个Linux下的后台服务,用来定期的执行一些任务,在/etc/crontab中设置后即可,基本写法如下:

    # 分钟 小时 日 月 周 用户  命令 
    17 * * * * root date >> /tmp/time.log 
    

     它的意思是每个小时的第18分钟,将当前时间写入log文件,注意各值的取值范围(分钟 0 - 59,小时0 - 23,天1 - 31,月1 - 12,星期0 - 6,0表示星期天) 修改/etc/crontab后,还需要用 $ sudo service cron restart命令重启crontab任务,才能生效。

    3. 为什么要用Airflow

     有了cron为什么还需要airflow?以抓取web数据为例,可能在某天抓取数据时,网断或者关机了,当天的数据没抓进来,这种情况下,只能通过写日志定时分析日志,以及在程序中定时重连的方式保证数据完整,相对比较零碎和麻烦。另外,如果crontab设置文件中有几十上百条任务时,就比较头疼了。

     Airflow支持图形界面和命令行两种方式,管理起来比较方便,另外,它可以把几个相互依赖的任务编成一组,并监督执行是否正常,如果不正常,调用程序重试等等。

     当然,Airflow也不全是优点,比如需要使用python脚本来定义任务间的依赖关系,相对于手动编辑crontab文件,相对难一些。因此,如果只调用简单的任务,使用cron即可,复杂的再考虑airflow。

    4. Airflow的基础概念

     Airflow 中最基本的两个概念是:DAG 和 task。DAG 的全称是 Directed Acyclic Graph 是所有你想执行的任务的集合,在这个集合中可以定义了他们的依赖关系,一个 DAG object可以用 Python 脚本中配置完成。每个 DAG object 代表了一个 workflow,每个 workflow 都可以包含任意个 task,task就是具体的任务。

    5. Airflow安装和使用

    (1) 安装airflow

    $ sudo pip install airflow 
    

     可以通过环境变量AIRFLOW_HOME 设置airflow的工作目录,默认为$HOME/airflow/

    (2) Mysql支持

     如果想使用mysql存储airflow内容,请按如下方法设置mysql;如果不设置,airflow在其工作目录下建立db文件,以sqlite方式存储。

    $ mysql -u root -p
    
    mysql> create database airflow default charset utf8 collate utf8_general_ci;
    mysql> create user airflow@'localhost' identified by 'airflow';
    mysql> grant all on airflow.* to airflow@'localhost';
    mysql> flush privileges;
    

     修改配置文件 $AIRFLOW_HOME/airflow.cfg,把sql_alchemy_conn对应语句替换成:

    sql_alchemy_conn = mysql://airflow:airflow@localhost:3306/airflow
    

    (3) 运行

    $ airflow initdb
    $ airflow worker
    $ airflow webserver -p 8080 # 一直运行
    $ airflow scheduler # 一直运行  
    

     此时在浏览器中输入:http://localhost:8080,即可看到airflow界面,其中有很多demo可以参考。

    (4) 建立第一个DAG:Hellow world

    $ mkdir $AIRFLOW_HOME/dags/
    $ vi $AIRFLOW_HOME/dags/hello_word.py # 内容如下:
    
     # -*- coding: utf-8 -*-
    
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from datetime import timedelta
    
    default_args = {
     'owner': 'yan.xie',
     'depends_on_past': False,
     'start_date': airflow.utils.dates.days_ago(2),
     'retries': 5, # 重试次数
     'retry_delay': timedelta(minutes=1), # 运行间隔时间
    }
    
    dag = DAG(
     'test_my_dag', # DAG名字
     default_args=default_args,
     description='my first DAG',
     schedule_interval=timedelta(days=1))
    
    task1 = BashOperator(
     task_id='task_1', # TASK名
     bash_command='date', # 运行命令
     dag=dag)
    
    task2 = BashOperator(
     task_id='task_2',
     depends_on_past=False,
     bash_command='sleep 5',
     dag=dag)
    
    def print_hello():
     return 'Hello world!'
    
    test3 = PythonOperator(
     task_id='task_3',
     python_callable=print_hello, # 运行python程序
     dag=dag)
    
    task2.set_upstream(task1) # 设置依赖关系
    test3.set_upstream(task1) 
    

     保存之后,再浏览器刷新一下界面,即可在list中看到该DAG,点On后,即可运行。

     点开DAG可以看到各Task间的依赖关系

     以及树型关系

    (5) 调试
     有时候,怕不能一次写对,可以运行以下命令调试单个Task

    $ airflow test test_my_dag task_3 20181027
    

    (6) 清除全部DAG重置数据库

    $ airflow resetdb 
    

     并删除 $AIRFLOW_HOME/dags/ 下所有DAG文件,然后重启webserver。

     在Airflow中,如果改了一个DAG的名字,它会新建一个DAG,而不仅是改名,所以旧的DAG还在数据库和列表中存在,可以用 “$ airflow delete_dag DAG名” 的方式删除它,但不是每个airflow版本都支持delete_dag命令。此时可以只用resetdb不删除dags目录下文件的方式,删除目录中没有对应文件的DAG(删除有风险,操作须谨慎)。

    6. 参考

    (1) Ubuntu下crontab命令的用法
    https://www.cnblogs.com/daxian2012/articles/2589894.html

    (2) 使用 Airflow 替代你的 crontab
    https://www.juhe.cn/news/index/id/2365

    相关文章

      网友评论

          本文标题:调度工具Airflow

          本文链接:https://www.haomeiwen.com/subject/mnhdtqtx.html