美文网首页Airflow
ETL工具之Airflow

ETL工具之Airflow

作者: 洛奇lodge | 来源:发表于2019-08-05 18:17 被阅读0次

    概念

    Airflow是Airbnb 开源的一个用 Python 编写的调度工具。于 2014 年启动,2015 年春季开源,2016 年加入 Apache 软件基金会的孵化计划。Airflow 是一个ETL数据流处理的工作平台,集中便式管理任务。学习airflow,需要掌握几个关键词概念。

    • Dag
      Dag是一个计划或项目,是一个有向无循环的图,在代码层面结构上解释,就一个文件或一个目录,里面包含多个任务,且一个dag都有唯一的id。
    • Task
      task是Dag里最小的单元,是Dag的实例化,task之间存在依赖关系,每一个task执行都有对应的日志存在
    • Webserver
      Airflow 提供了一个可视化的 Web 界面。启动 WebServer 后,就可以在 Web 界面上查看定义好的 DAG 并监控及改变运行状况。也可以在 Web 界面中对任务的状态进行设置,方便控制任务的运行和停止
    • Scheduler
      整个 Airflow 的调度由 Scheduler 负责发起,每隔一段时间 Scheduler 就会检查所有定义完成的 DAG 和定义在其中的作业,如果有符合运行条件的作业,Scheduler 就会发起相应的作业任务以供 Worker 接收

    安装

    # 导入变量
    export SLUGIFY_USES_TEXT_UNIDECODE=yes
    # 安装airflow
    pip3 install apache-airflow
    
    # 检测airflow是否安装成功
    airflow initdb
    airflow webserver
    airflow scheduler
    
    # 使用mysql数据库,需要更改数据库配置文件,方可与airflow搭配使用
    explicit_defaults_for_timestamp = 1
    

    应用

    1、dag参数设定
    default_args = {
        'owner': 'airflow', # 这个DAG的所有者,会在Web UI上显示,主要用于方便管理
        'depends_on_past': False,  # 是否依赖于过去。如果为True,那么必须要昨天的DAG执行成功了,今天的DAG才能执行。
        'start_date': datetime(2015, 6, 1), # DAG的开始时间,比如这里就是从2015年6月1日开始执行第一个DAG。这个参数会影响到部署上线时回填DAG的数量。一般建议写成上线时间的前一天(因为这里的start_date指的是execute_date,而Airflow执行的逻辑是,今天的同一时间执行昨天的任务,比如execute_date=2018-03-01,每天凌晨3点执行,则会在2018-03-02 03:00:00启动这个DAG。特别地,这个参数必须一个datetime对象,不可以用字符串。
        'email': ['airflow@example.com'],# 出问题时,发送报警Email的地址,可以填多个,用逗号隔开。
        'email_on_failure': False, # 任务失败且重试次数用完时是否发送Email,推荐填True。
        'email_on_retry': False, # 任务重试时是否发送Email
        'retries': 1, # 任务失败后的重试次数
        'retry_delay': timedelta(minutes=5), # 重试间隔,必须是timedelta对象。
        # 'queue': 'bash_queue', 队列,默认是default,决定实际执行任务会发送到哪个worker
        # 'pool': 'backfill',  pool是一个分类限制并发量的设计,目前来说可以忽略,默认所有的Task都在一个pool里。
        # 'priority_weight': 10, 优先级权重,在任务需要排队时而你需要优先执行某些任务时会有用
        # 'end_date': datetime(2016, 1, 1), # 结束时间,一般线上任务都会一直跑下去,所以没必要设置。
    }
    
    # 第一个参数固定为dag的名字,schedule_interval为执行时间间隔,同crontab的语法,在这个例子中表示每天凌晨3点执行
    dag = DAG('tutorial', default_args=default_args,schedule_interval=timedelta(hours=1))
    
    2、任务失败回调
    task_test = PythonOperator(
        task_id='task_test',  # 实例化任务的id
        python_callable=func_test,  # 调用任务函数
        on_failure_callback=func_failure,  # 任务失败之后回调函数
        op_kwargs={'params': 'test'},  # 传递给任务函数的参数
        dag=dag)
    
    2、控制DAG运行
    2.1、界面控制
    2.2、API控制,通过接口请求运行dag
    curl -X POST \
      http://localhost:8080/api/experimental/dags/<DAG_ID>/dag_runs \
      -H 'Cache-Control: no-cache' \
      -H 'Content-Type: application/json' \
      -d '{"conf":"{\"key\":\"value\"}"}'
    
    2.3、自身控制,即dag控制某个dag运行
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    trigger = TriggerDagRunOperator(
                          task_id='test_trigger_dagrun',   
                          trigger_dag_id="example_trigger_target_dag",   # 指定运行的dag  
                          dag=dag)
    
    2.4、如果当前dag运行着,过滤最新的dag
    from airflow.operators.python_operator import ShortCircuitOperator
    # 定义一个任务
    def test(ds, **context):
        if context['dag_run'] and context['dag_run'].external_trigger:
            return True
    
        session = settings.Session()
        count = session.query(DagRun).filter(
            DagRun.dag_id == context['dag'].dag_id,
            DagRun.state.in_(['running']),
        ).count()  // 检测该 dag启动运行的数量
        session.close()
        if  count > 1
            return False
        else:
            return True
    
    # 实例化任务
    task_test = ShortCircuitOperator(
        task_id='task_test',
        provide_context=True,
        python_callable=test,
        dag=dag
    )
    
    3、任务之间数据交互
    3.1、使用xcom
    def upper():
        """上游任务"""
        data = "test data"
        return data 
    
    def lower(**kwargs):
        """下游任务"""    
        # 获取上个任务的返回值
        data = kwargs['task_instance'].xcom_pull(task_ids='upper')
    
    task_upper = PythonOperator(
        task_id='task_upper',
        python_callable=upper,
        dag=dag)
    
    task_lower = PythonOperator(
        task_id='task_lower',
        python_callable=lower,
        provide_context=True,  # 必须设置该参数,函数才能接受**kwargs
        dag=dag)
    
    3.2、临时文件/数据库
    • 可以将任务产生的数据写入文件或者数据库,下一个任务需要获取这数据时候,将数据库或文件路径以参数形式传递给下一个任务
    4、config.cfg配置文件
    [core]
    # airflow的项目根目录
    airflow_home = /airflow
    # airflow的项目dag目录
    dags_folder = /airflow/dags
    # airflow的项目日志目录
    base_log_folder = /airflow/logs
    # 远程日志设置
    remote_logging = False
    remote_log_conn_id =
    remote_base_log_folder =
    encrypt_s3_logs = False
    # 日志等级
    logging_level = INFO
    fab_logging_level = WARN
    # 日志输出格式
    log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
    simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
    # 日志文件命名的格式
    log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
    log_processor_filename_template = {{ filename }}.log
    # 设置时区
    default_timezone = Asia/Shanghai
    # 选择执行器,SequentialExecutor顺序执行,LocalExecutor本机进程执行,CeleryExecutor队列执行
    executor = CeleryExecutor
    # 配置数据库
    sql_alchemy_conn = mysql://账号:密码@ip:端口/数据库?charset=utf8
    # 数据库编码方式
    sql_engine_encoding = utf-8
    # 是否开启池连接
    sql_alchemy_pool_enabled = True
    # 定义池的连接数量
    sql_alchemy_pool_size = 5
    # 客户端断开连接,多久回收
    sql_alchemy_pool_recycle = 1800
    # 客户端连接超时设置
    sql_alchemy_reconnect_timeout = 300
    # 针对于执行器LocalExecutor,设置执行器的任务并发数量
    parallelism = 4
    # 设置允许任务并发
    dag_concurrency = 16
    # 同一个dag并行运行最大数量
    max_active_runs_per_dag = 1
    # True为加载airflow例子dag,Flase为不加载
    load_examples = False
    
    [webserver]
    base_url = http://localhost:8080
    web_server_host = 0.0.0.0
    web_server_port = 8080
    # webserver每次检测worker的个数。发现新的worker并杀死旧的worker
    worker_refresh_batch_size = 1
    # 检测worker时间间隔
    worker_refresh_interval = 30
    # webserver进程
    workers = 4
    # webserver日志文件输出的位置,需要指定文件路径
    access_logfile = -
    error_logfile = -
    # Consistent page size across all listing views in the UI
    # 每页展示dag最大数量
    page_size = 100
    # 导航栏颜色设定,十六进制格式
    navbar_color = #007A87
    # dag加载显示在页面的时间
    default_dag_run_display_number = 25
    
    [celery]
    # 设置工人数量
    worker_concurrency = 4
    # 使用CeleryExecutor执行器,必须配置消息队列,使用redis或rabbitmq
    broker_url = redis://:密码@ip:端口/1
    result_backend = redis://:密码@ip:端口/1
    
    [scheduler]
    # 当清除某个任务状态之后,该任务被扫描到的执行频率
    job_heartbeat_sec = 5
    # 上游任务完成之后,下游任务开始执行,这段时间间隔,可以为任务之间频率
    scheduler_heartbeat_sec = 10
    # True默认dag回填,Flask不回填
    catchup_by_default = True
    # scheduler线程数量
    max_threads = 10
    

    部署

    1、docker部署

    LocalExecutor部署

    1、修改airflow.cfg
      # 更改数据库,使用mysql
      sql_alchemy_conn = mysql://账号密码@ip:端口/db?charset=utf8
      # 执行器选用LocalExecutor或CeleryExecutor,这里暂选用LocalExecutor
      executor = LocalExecutor
      # web界面不加载dag自带例子
      load_examples = False
      # 允许dag并行数量
      max_active_runs_per_dag = 1
      # 限制本地进程数量
      parallelism = 4
    
    2、创建容器
      sudo docker run -itd --name=airflow \ # 容器名
        -v /home/python/airflow:/airflow \  # 将映射宿主机的目录映射到容器,必须使用绝对路径
        -p 8080:8080 \  # 端口映射,外部访问端口是8080
        airflow_image  # 镜像名
    
    3、运行服务
      sudo docker exec -ti airflow airflow initdb
      sudo docker exec -tid airflow airflow webserver
      sudo docker exec -tid airflow airflow schedluer
    

    CeleryExecutor部署

    1、修改airflow.cfg
      # 更改数据库,使用mysql
      sql_alchemy_conn = mysql://账号密码@ip:端口/db?charset=utf8
      # 执行器选用CeleryExecutor
      executor = CeleryExecutor
      # web界面不加载dag自带例子
      load_examples = False
      # 允许dag并行数量
      max_active_runs_per_dag = 1
      # 工人数量,意思几个进程执行
      worker_concurrency = 4
      # 作为celery消息队列
      broker_url = redis://ip地址:6379/0
      # 存储worker执行结果的状态
      result_backend = redis://ip地址:6379/0
    
    2、创建容器
      sudo docker run -itd --name=airflow \ # 容器名
        -v /home/python/airflow:/airflow \  # 将映射宿主机的目录映射到容器,必须使用绝对路径
        -p 8080:8080 \  # 端口映射,外部访问端口是8080
        -e C_FORCE_ROOT=True \  # 解决celery不能用root用户运行问题
        airflow_image  # 镜像名
    
    3、安装依赖包
      pip3 install airflow[celery]
      pip3 install airflow[redis]
    
    4、运行服务
      sudo docker exec -ti airflow airflow initdb
      sudo docker exec -tid airflow airflow webserver
      sudo docker exec -tid airflow airflow schedluer
      sudo docker exec -tid airflow airflow worker
    
    2、分布式部署
    • 分布式部署适用于CeleryExecutor部署,在一台主机上如图部署,需要开启scheduler和webserver,woker服务。另外一台主机上只需要开启woker服务即可,完成简单的分布式部署。

    参考文档

    airflow官方文档 http://airflow.apache.org/tutorial.html

    相关文章

      网友评论

        本文标题:ETL工具之Airflow

        本文链接:https://www.haomeiwen.com/subject/kivpdctx.html