[toc]
Airflow平台是一个描述、执行、监控工作流的工具。
DAGs
DAG(a Directed Acyclic Graph 有向无环图)是Airflow中一组需要运行任务组合,DAG以反应任务间关系和依赖的方式组织。
Dag描述你的工作流是如何运行。DAGs是由在Airflow的DAG_FOLDER目录下的一些标准Python文件定义的。Airflow会通过执行每一个文件中的代码来动态的建立DAG对象。每一个DAGs有任意数量的任务组成。通常,一个DAGs有相应的简单逻辑的工作流。
有效范围(Scope)
Airflow会载入来自DAGfile文件中可以导入的所有DAG对象。这意味着DAG必须是全局的,比如下面两个DAGs,只有dag_1会被加载,另外一个只是本地有效范围。
dag_1 = DAG('this_dag_will_be_discovered')
def my_function():
dag_2 = DAG('but_this_dag_will_not')
my_function()
有时会非常有用。例如,SubDagOperator(子DAG操作)通常的模式是把子DAG放在一个函数里面,这样Airflow不会把子DAG当作一个独立DAG而加载子DAG。
默认参数
default_args的参数传递给DAG,就会传递给这个DAG所有的operator。因此一些共同的参数传递给很多个operator时不需要设置很多次。
default_args = {
'start_date': datetime(2016, 1, 1),
'owner': 'Airflow'
}
dag = DAG('my_dag', default_args=default_args)
op = DummyOperator(task_id='dummy', dag=dag)
print(op.owner) # Airflow
Operators(任务处理器)
DAGs描述工作流怎么运行,Operators确定实际上干什么。在工作流中operator(处理器)描述一个任务。
任务处理器:
- BashOperator: 执行bash命令。
- PythonOperator:调用任意数量的Python函数。
- EmailOperator:发送邮件。
- SimpleHttpOperator:发送HTTP请求。
- MySqlOperator、SqliteOperator、PostgresOperator、MsSqlOperator、OracleOperator、JdbcOperator等等:执行sql命令。
- Sensor:等一定的时间、文件、数据库行等
另外除了这些基本任务处理器,还有更多的处理器: DockerOperator, HiveOperator, S3FileTransformOperator, PrestoToMySqlTransfer, SlackAPIOperator。任务处理器只有被指派给DAG之后才会被airflow加载。
DAG Assignment(DAG 指派器)
在operator在明确创建,dag指派,或者其他operator指引的实际, DAG Assignment才会创建。
dag = DAG('my_dag', start_date=datetime(2016, 1, 1))
# sets the DAG explicitly
explicit_op = DummyOperator(task_id='op1', dag=dag)
# deferred DAG assignment
deferred_op = DummyOperator(task_id='op2')
deferred_op.dag = dag
# inferred DAG assignment (linked operators must be in the same DAG)
inferred_op = DummyOperator(task_id='op3')
inferred_op.set_upstream(deferred_op)
Tasks(任务)
一旦任务处理器实例化后,它会被成为任务。实例化成具体的值时,调用抽象的任务处理器,参数化任务变成DAG中一个点。
Task Instances(任务实例)
任务实例化代表一个任务具体的运行,由DAG、任务、时间的组合成。任务实例化也有一个明确的状态,状态如 “running”, “success”, “failed”, “skipped”, “up for retry”等等。
Task LifeCycle(任务生命周期)
一个任务从开始到结束中间由几个过程。在Airflow的UI中(图形和树界面),这些阶段采用不同的颜色代表不同的阶段。
生命状态不同的颜色表示
一个正常的工作流会经历如下阶段:
1、no status (调度器创建空的任务实例)
2、queued (调度其把任务放在队列里)
3、running (worker选择任务和执行任务)
4、success (任务完成)
Workflows(工作流)
有些术语概念容易混淆。
- DAG:一些有顺序的工作的描述。
- Operator(任务处理器):一类执行一些工作的模版。
- Task(任务):任务处理器的参数化实例。
- Task Instance: 1、任务需要被指定给DAG;2、一个运行的DAG相关的状态。
DAGs和Operator创建任务实例,创建复杂的工作流。
其他功能
Airflow除了上面的核心对象,还有一些并发访问资源,相互通信,条件执行等高级特征的行为。
Hooks
Hooks是面向Hive, S3, MySQL, Postgres, HDFS和 Pig其他平台和数据库的接口。Hooks尽可能的实现了一些共同的接口,跟任务处理器构造块类似。它也用airflow.models.connection.Connection模型搜索主机名字和认证信息。
Pools(airflow池)
当太多任务同时处理时,系统会崩溃。Airflow池面对任意数量的任务时会限制执行任务的并发数。在用户界面中通过airflow池的名字和指定数量的工作槽来管理airflow池列表。任务实例化时pool参数使任务和已存在的pools相关联。
aggregate_db_message_job = BashOperator(
task_id='aggregate_db_message_job',
execution_timeout=timedelta(hours=3),
pool='ep_data_pipeline_db_msg_agg',
bash_command=aggregate_db_message_job_cmd,
dag=dag)
aggregate_db_message_job.set_upstream(wait_for_empty_queue)
触发规则
尽管通常工作流触发任务需要上流的任务都成功,但是Airflow提供来更加复杂的依赖设置。
- all_success: 默认。所有上流的任务都成功。
- all_failed: 所有上流任务都失败(a failed or upstream_failed state)。
- all_done: 所有上流任务都执行了。
- one_failed: 至少上流任务一个失败了,不需要等所有上流任务都执行完。
- one_success: 至少上流任务一个成功了,不需要等所有上流任务都执行完。
- none_failed:所有上流任务没有失败。上流任务成功或者被跳过。 、
- none_skipped: 上流任务没有跳过的。上流任务是success, failed, or upstream_failed state之一。
- dummy: 依赖只是显示。
网友评论