美文网首页
Airflow 入门笔记

Airflow 入门笔记

作者: edwin1993 | 来源:发表于2018-09-26 12:50 被阅读0次

    应用场景

    ETL 差不多是数据处理的基础,要求非常稳定,容错率高,而且能够很好的监控,其全称是 Extract,Transform,Load, 一般情况下是将乱七八糟的数据进行预处理,然后放到储存空间上。
    一般过程上,在数据进入后需要人工的去将数据的按流程处理一遍,调用各种工具。这个过程有些机械化,所以可以考虑使用脚本或者其它工具进行控制。

    airflow 是能进行数据pipeline的管理,甚至是可以当做更高级的cron job 来使用。它是用python写的,能进行工作流的调度,提供更可靠的流程,而且它还有自带的UI。

    下载与安装

    安装:
    pip install airflow[all]
    
    初始化:
    airflow initdb
    
    启动:
    airflow webserve
    

    访问8080可以看到:

    airflow的重要概念:DAG

    DAG是directed asyclic graph,在很多机器学习里有应用,也就是所谓的有向非循环。但是在airflow里你可以看做是一个小的工程,小的流程,因为每个小的工程里可以有很多“有向”的task,最终达到某种目的。

    在官网中的介绍里说dag的特点:

    • Scheduled: each job should run at a certain scheduled interval
    • Mission critical: if some of the jobs aren’t running, we are in trouble
    • Evolving: as the company and the data team matures, so does the data processing
    • Heterogenous: the stack for modern analytics is changing quickly, and most companies run multiple systems that need to be glued together
    加入自己的DAG

    airflow会在默认文件夹下生成airflow文件夹,然后你只要在里面新建一个文件dag就可以了。

    然后创建一个自己的dag文件,写好之后,只要将这个dag放入之前建立好的dag文件夹

    运行自己的DAG

    我们使用官网给出的样板:

    """
    Code that goes along with the Airflow located at:
    http://airflow.readthedocs.org/en/latest/tutorial.html
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    
    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(1))
    
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)
    
    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)
    
    templated_command = """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7)}}"
            echo "{{ params.my_param }}"
        {% endfor %}
    """
    
    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)
    
    t2.set_upstream(t1)
    t3.set_upstream(t1)
    

    为了验证其准确性,可以在本地的python开发环境下进行一次运行。

    官网给出的测试方法是:

    python ~/airflow/dags/tutorial.py
    

    对DAG进行查看:

    # print the list of active DAGs
    airflow list_dags
    
    # prints the list of tasks the "tutorial" dag_id
    airflow list_tasks tutorial
    
    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks tutorial --tree
    
    • 查看所有DAG
    • 查看某DAG中的tasks
      list_tasks 后填自己的DAG名,模板给的是tutorial
    • 查看tasks间的依赖关系:
    • 测试task
    airflow test tutorial templated 2015-06-01
    

    测试所有task无问题后,运行DAG

    首先启动airflow调度:
    airflow scheduler
    

    你之前创建的DAG就从local dag被调度到了平台中。

    运行DAG:

    将最前方的滑至On
    点击运行
    在点击查看图

    结果如下:

    然后点击task,再点击log就可以查看task的日志输出:

    样例结果:

    相关文章

      网友评论

          本文标题:Airflow 入门笔记

          本文链接:https://www.haomeiwen.com/subject/bjqdoftx.html