美文网首页python热爱者
工作流管理平台 Airflow 入门

工作流管理平台 Airflow 入门

作者: 流月0 | 来源:发表于2018-06-09 16:15 被阅读26次

    环境

    CentOS Linux release 7.5.1804

    Python 3.6.4/2.7.14

    简介

    Airflow 是 Airbnb 开源的一个用 Python 编写的工作流管理平台,自带 web UI 和调度,目前在Apache下做孵化。


    Airflow 管理页面

    Airflow 中有两个基本概念,DAG和task。
    DAG是多个task的集合,定义在一个Python文件中,包含了task之间的依赖关系,如task A在task B之后执行,task C可以单独执行等等。

    安装并运行

    # 默认目录在~/airflow,也可以使用以下命令来指定目录
    export AIRFLOW_HOME=~/airflow
    
    pip install apache-airflow
    
    # 初始化数据库
    airflow initdb
    
    # 启动web服务,默认端口为8080,也可以通过`-p`来指定
    airflow webserver -p 8080
    
    # 启动 scheduler
    airflow scheduler
    

    定义第一个DAG

    $AIRFLOW_HOME目录下新建dags文件夹,后面的所有dag文件都要存储在这个目录。

    新建dag文件hello_world.py,语句含义见注释

    # coding: utf-8
    
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta
    
    
    # 定义默认参数
    default_args = {
        'owner': 'airflow',  # 拥有者名称
        'start_date': datetime(2018, 6, 6, 20, 00),  # 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
        'email': ['shelmingsong@gmail.com'],  # 接收通知的email列表
        'email_on_failure': True,  # 是否在任务执行失败时接收邮件
        'email_on_retry': True,  # 是否在任务重试时接收邮件
        'retries': 3,  # 失败重试次数
        'retry_delay': timedelta(seconds=5)  # 失败重试间隔
    }
    
    # 定义DAG
    dag = DAG(
        dag_id='hello_world',  # dag_id
        default_args=default_args,  # 指定默认参数
        # schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
        schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
    )
    
    
    # 定义要执行的Python函数1
    def hello_world_1():
        current_time = str(datetime.today())
        with open('/root/tmp/hello_world_1.txt', 'a') as f:
            f.write('%s\n' % current_time)
        assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常
    
    
    # 定义要执行的Python函数2
    def hello_world_2():
        current_time = str(datetime.today())
        with open('/root/tmp/hello_world_2.txt', 'a') as f:
            f.write('%s\n' % current_time)
    
    
    # 定义要执行的Python函数3
    def hello_world_3():
        current_time = str(datetime.today())
        with open('/root/tmp/hello_world_3.txt', 'a') as f:
            f.write('%s\n' % current_time)
    
    # 定义要执行的task 1
    t1 = PythonOperator(
        task_id='hello_world_1',  # task_id
        python_callable=hello_world_1,  # 指定要执行的函数
        dag=dag,  # 指定归属的dag
        retries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
    )
    
    # 定义要执行的task 2
    t2 = PythonOperator(
        task_id='hello_world_2',  # task_id
        python_callable=hello_world_2,  # 指定要执行的函数
        dag=dag,  # 指定归属的dag
    )
    
    # 定义要执行的task 3
    t3 = PythonOperator(
        task_id='hello_world_3',  # task_id
        python_callable=hello_world_3,  # 指定要执行的函数
        dag=dag,  # 指定归属的dag
    )
    
    t2.set_upstream(t1)
    # 表示t2这个任务只有在t1这个任务执行成功时才执行,
    # 等价于 t1.set_downstream(t2)
    # 同时等价于 dag.set_dependency('hello_world_1', 'hello_world_2')
    
    t3.set_upstream(t1)  # 同理
    

    写完后执行它检查是否有错误,如果命令行没有报错,就表示没问题。

    python $AIRFLOW_HOME/dags/hello_world.py
    

    通过以下命令查看生效的dags

    [root@localhost dags]# airflow list_dags
    [2018-06-06 21:03:25,808] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2018-06-06 21:03:25,877] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
    
    
    -------------------------------------------------------------------
    DAGS
    -------------------------------------------------------------------
    hello_world
    

    查看hello_world这个dag下面的tasks

    [root@localhost dags]# airflow list_tasks hello_world
    [2018-06-06 21:04:45,736] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2018-06-06 21:04:45,805] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
    hello_world_1
    hello_world_2
    hello_world_3
    

    查看hello_world这个dag下面tasks的层级关系

    [root@localhost dags]# airflow list_tasks hello_world --tree
    [2018-06-06 21:05:42,956] {__init__.py:45} INFO - Using executor SequentialExecutor
    [2018-06-06 21:05:43,020] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
    <Task(PythonOperator): hello_world_2>
        <Task(PythonOperator): hello_world_1>
    <Task(PythonOperator): hello_world_3>
        <Task(PythonOperator): hello_world_1>
    

    如果按照以上步骤启动了schedule,则DAG已经开始定时执行了,我们设置了每分钟执行一次,可以访问your_domain:8080来查看任务的执行情况。
    也可以查看/root/tmp/hello_world_1.txt/root/tmp/hello_world_2.txt/root/tmp/hello_world_3.txt文件中的内容来检查任务是否执行成功。

    执行失败时email通知

    如果需要在任务执行失败(执行过程中有异常抛出)的时候邮件通知,除了在DAG文件中指定接收email列表外,还需要在配置文件中指定发送邮箱的信息,打开配置文件$AIRFLOW_HOME/airflow.cfg,修改以下配置项,修改完需要重启webserver和schedule

    smtp_host = smtp.163.com  # smtp邮箱地址
    smtp_starttls = True  # 是否tls加密
    smtp_mail_from = demo@163.com  # 发件人邮箱地址,需开通smtp服务
    smtp_ssl = False  # 是否ssl加密
    smtp_port = 25  # smtp端口号
    

    使用位位移来指定执行顺序

    以下四行的作用是相同的

    op1 >> op2
    op1.set_downstream(op2)
    
    op2 << op1
    op2.set_upstream(op1)
    

    也可以连续使用位位移

    op1 >> op2 >> op3 << op4
    

    以上等价于

    op1.set_downstream(op2)
    op2.set_downstream(op3)
    op3.set_upstream(op4)
    

    使用变量(Variables)

    变量的value可以在UI界面的Admin > Variables里面进行增删改查

    可以在代码中这样使用变量

    from airflow.models import Variable
    foo = Variable.get("foo", default_var='a')  # 设置当获取不到时使用的默认值
    bar = Variable.get("bar", deserialize_json=True)  # 对json数据进行反序列化
    

    更多

    Airflow doc

    博客更新地址

    相关文章

      网友评论

        本文标题:工作流管理平台 Airflow 入门

        本文链接:https://www.haomeiwen.com/subject/osgteftx.html