美文网首页
airflow概念

airflow概念

作者: 似水之星 | 来源:发表于2019-12-17 16:17 被阅读0次

    [toc]

    Airflow平台是一个描述、执行、监控工作流的工具。

    DAGs

    DAG(a Directed Acyclic Graph 有向无环图)是Airflow中一组需要运行任务组合,DAG以反应任务间关系和依赖的方式组织。
    Dag描述你的工作流是如何运行。DAGs是由在Airflow的DAG_FOLDER目录下的一些标准Python文件定义的。Airflow会通过执行每一个文件中的代码来动态的建立DAG对象。每一个DAGs有任意数量的任务组成。通常,一个DAGs有相应的简单逻辑的工作流。

    有效范围(Scope)

    Airflow会载入来自DAGfile文件中可以导入的所有DAG对象。这意味着DAG必须是全局的,比如下面两个DAGs,只有dag_1会被加载,另外一个只是本地有效范围。

    dag_1 = DAG('this_dag_will_be_discovered')
    
    def my_function():
        dag_2 = DAG('but_this_dag_will_not')
    
    my_function()
    

    有时会非常有用。例如,SubDagOperator(子DAG操作)通常的模式是把子DAG放在一个函数里面,这样Airflow不会把子DAG当作一个独立DAG而加载子DAG。

    默认参数

    default_args的参数传递给DAG,就会传递给这个DAG所有的operator。因此一些共同的参数传递给很多个operator时不需要设置很多次。

    default_args = {
        'start_date': datetime(2016, 1, 1),
        'owner': 'Airflow'
    }
    
    dag = DAG('my_dag', default_args=default_args)
    op = DummyOperator(task_id='dummy', dag=dag)
    print(op.owner) # Airflow
    

    Operators(任务处理器)

    DAGs描述工作流怎么运行,Operators确定实际上干什么。在工作流中operator(处理器)描述一个任务。
    任务处理器:

    • BashOperator: 执行bash命令。
    • PythonOperator:调用任意数量的Python函数。
    • EmailOperator:发送邮件。
    • SimpleHttpOperator:发送HTTP请求。
    • MySqlOperator、SqliteOperator、PostgresOperator、MsSqlOperator、OracleOperator、JdbcOperator等等:执行sql命令。
    • Sensor:等一定的时间、文件、数据库行等
      另外除了这些基本任务处理器,还有更多的处理器: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator。任务处理器只有被指派给DAG之后才会被airflow加载。

    DAG Assignment(DAG 指派器)

    在operator在明确创建,dag指派,或者其他operator指引的实际, DAG Assignment才会创建。

    dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
    
    # sets the DAG explicitly
    explicit_op = DummyOperator(task_id='op1', dag=dag)
    
    # deferred DAG assignment
    deferred_op = DummyOperator(task_id='op2')
    deferred_op.dag = dag
    
    # inferred DAG assignment (linked operators must be in the same DAG)
    inferred_op = DummyOperator(task_id='op3')
    inferred_op.set_upstream(deferred_op)
    

    Tasks(任务)

    一旦任务处理器实例化后,它会被成为任务。实例化成具体的值时,调用抽象的任务处理器,参数化任务变成DAG中一个点。

    Task Instances(任务实例)

    任务实例化代表一个任务具体的运行,由DAG、任务、时间的组合成。任务实例化也有一个明确的状态,状态如 “running”, “success”, “failed”, “skipped”, “up for retry”等等。

    Task LifeCycle(任务生命周期)

    一个任务从开始到结束中间由几个过程。在Airflow的UI中(图形和树界面),这些阶段采用不同的颜色代表不同的阶段。


    生命状态不同的颜色表示

    一个正常的工作流会经历如下阶段:
    1、no status (调度器创建空的任务实例)
    2、queued (调度其把任务放在队列里)
    3、running (worker选择任务和执行任务)
    4、success (任务完成)

    Workflows(工作流)

    有些术语概念容易混淆。

    • DAG:一些有顺序的工作的描述。
    • Operator(任务处理器):一类执行一些工作的模版。
    • Task(任务):任务处理器的参数化实例。
    • Task Instance: 1、任务需要被指定给DAG;2、一个运行的DAG相关的状态。
      DAGs和Operator创建任务实例,创建复杂的工作流。

    其他功能

    Airflow除了上面的核心对象,还有一些并发访问资源,相互通信,条件执行等高级特征的行为。

    Hooks

    Hooks是面向Hive, S3, MySQL, Postgres, HDFS和 Pig其他平台和数据库的接口。Hooks尽可能的实现了一些共同的接口,跟任务处理器构造块类似。它也用airflow.models.connection.Connection模型搜索主机名字和认证信息。

    Pools(airflow池)

    当太多任务同时处理时,系统会崩溃。Airflow池面对任意数量的任务时会限制执行任务的并发数。在用户界面中通过airflow池的名字和指定数量的工作槽来管理airflow池列表。任务实例化时pool参数使任务和已存在的pools相关联。

    aggregate_db_message_job = BashOperator(
        task_id='aggregate_db_message_job',
        execution_timeout=timedelta(hours=3),
        pool='ep_data_pipeline_db_msg_agg',
        bash_command=aggregate_db_message_job_cmd,
        dag=dag)
    aggregate_db_message_job.set_upstream(wait_for_empty_queue)
    

    触发规则

    尽管通常工作流触发任务需要上流的任务都成功,但是Airflow提供来更加复杂的依赖设置。

    • all_success: 默认。所有上流的任务都成功。
    • all_failed: 所有上流任务都失败(a failed or upstream_failed state)。
    • all_done: 所有上流任务都执行了。
    • one_failed: 至少上流任务一个失败了,不需要等所有上流任务都执行完。
    • one_success: 至少上流任务一个成功了,不需要等所有上流任务都执行完。
    • none_failed:所有上流任务没有失败。上流任务成功或者被跳过。 、
    • none_skipped: 上流任务没有跳过的。上流任务是success, failed, or upstream_failed state之一。
    • dummy: 依赖只是显示。

    备注
    翻译自:https://airflow.apache.org/docs/stable/concepts.html

    相关文章

      网友评论

          本文标题:airflow概念

          本文链接:https://www.haomeiwen.com/subject/odurnctx.html