美文网首页计算机语言工具文
Apache Airflow单机/分布式环境搭建

Apache Airflow单机/分布式环境搭建

作者: 端碗吹水 | 来源:发表于2021-07-14 17:08 被阅读0次

    [TOC]


    Airflow简介

    Apache Airflow是一个提供基于DAG(有向无环图)来编排工作流的、可视化的分布式任务调度平台(也可单机),与Oozie、Azkaban等调度平台类似。Airflow在2014年由Airbnb发起,2016年3月进入Apache基金会,在2019年1月成为顶级项目。Airflow采用Python语言编写,并提供可编程方式定义DAG工作流(编写Python代码)。当工作流通过代码来定义时,它们变得更加可维护、可版本化、可测试和协作。

    Airflow的可视化界面提供了工作流节点的运行监控,可以查看每个节点的运行状态、运行耗时、执行日志等。也可以在界面上对节点的状态进行操作,如:标记为成功、标记为失败以及重新运行等。在Airflow中工作流上每个task都是原子可重试的,一个工作流某个环节的task失败可自动或手动进行重试,不必从头开始跑。

    Airflow通常用在数据处理领域,也属于大数据生态圈的一份子。当然Airflow也可以用于调度非数据处理的任务,只不过数据处理任务之间通常都会存在依赖关系。而且这个关系可能还比较复杂,用crontab等基础工具无法满足,因此更需要被调度平台编排和管理。例如:

    • 时间依赖:任务需要等待某一个时间点触发
    • 外部系统依赖:任务依赖外部系统需要调用接口去访问
    • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响
    • 资源环境依赖:任务消耗资源非常多, 或者只能在特定的机器上执行

    Airflow的架构图如下:


    image.png
    • Metadata Database:Airflow的元数据库,用于Webserver、Executor及Scheduler存储各种状态数据,通常是MySQL或PostgreSQL
    • User Interface:用户界面,即前端web界面
    • Webserver:web服务器,用于提供用户界面的操作接口
    • Scheduler:调度器,负责处理触发调度的工作流,并将工作流中的任务提交给执行器处理
    • Executor:执行器,负责处理任务实例。在本地模式下会运行在调度器中,并负责所有任务实例的处理。但是大多数适合于生产的执行器实际上是一个消息队列(RabbitMQ、Redis),负责将任务实例推送给工作节点执行
    • Workers:工作节点,真正负责调起任务进程、执行任务的节点,worker可以有多个,是独立的进程
    • DAG Directory:存放DAG任务图定义的Python代码的目录,代表一个Airflow的处理流程。代码文件所在的位置通过Airflow配置dags_folder指定,需要保证执行器、调度器以及工作节点都能够访问到

    关于Airflow的更多内容可以参考官方文档:


    准备工作

    1、准备虚拟机或云服务环境,我这里使用的是本地的虚拟机:

    • 操作系统:CentOS7
    • CPU:8核
    • 内存:16G
    • 硬盘:20G
    • IP:192.168.243.175

    2、编译安装Python3,安装步骤可以参考下文:

    我这里安装的版本是3.9.1:

    [root@localhost ~]# python3 --version
    Python 3.9.1
    

    3、安装Docker环境,安装步骤可以参考下文:

    我这里安装的版本是19.03.12:

    [root@localhost ~]# docker version
    Client: Docker Engine - Community
     Version:           19.03.12
     API version:       1.40
     Go version:        go1.13.10
     Git commit:        48a66213fe
     Built:             Mon Jun 22 15:46:54 2020
     OS/Arch:           linux/amd64
     Experimental:      false
    
    Server: Docker Engine - Community
     Engine:
      Version:          19.03.12
      API version:      1.40 (minimum version 1.12)
      Go version:       go1.13.10
      Git commit:       48a66213fe
      Built:            Mon Jun 22 15:45:28 2020
      OS/Arch:          linux/amd64
      Experimental:     false
     containerd:
      Version:          1.2.13
      GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
     runc:
      Version:          1.0.0-rc10
      GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
     docker-init:
      Version:          0.18.0
      GitCommit:        fec3683
    

    4、安装MySQL数据库,安装步骤可以参考下文或MySQL官方文档

    我这里安装的版本是8.0.21:

    > select version();
    8.0.21
    

    Airflow单机环境搭建

    完成准备工作后,我们就先来搭建Airflow的单机环境,先上官方文档:

    设置一下Airflow的文件存储目录:

    [root@localhost ~]# vim /etc/profile
    export AIRFLOW_HOME=/usr/local/airflow
    [root@localhost ~]# source /etc/profile
    

    Airflow的安装很简单,只需要一条命令就能完成:

    $ pip3 install "apache-airflow==2.1.0" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.1.0/constraints-3.9.txt" -i https://pypi.tuna.tsinghua.edu.cn/simple --default-timeout=6000
    

    安装完成后,执行如下命令初始化数据库:

    [root@localhost ~]# airflow db init
    Traceback (most recent call last):
      File "/usr/local/python/bin/airflow", line 5, in <module>
        from airflow.__main__ import main
      File "/usr/local/python/lib/python3.9/site-packages/airflow/__init__.py", line 34, in <module>
        from airflow import settings
      File "/usr/local/python/lib/python3.9/site-packages/airflow/settings.py", line 35, in <module>
        from airflow.configuration import AIRFLOW_HOME, WEBSERVER_CONFIG, conf  # NOQA F401
      File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 1115, in <module>
        conf = initialize_config()
      File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 877, in initialize_config
        conf.validate()
      File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 202, in validate
        self._validate_config_dependencies()
      File "/usr/local/python/lib/python3.9/site-packages/airflow/configuration.py", line 234, in _validate_config_dependencies
        import sqlite3
      File "/usr/local/python/lib/python3.9/sqlite3/__init__.py", line 23, in <module>
        from sqlite3.dbapi2 import *
      File "/usr/local/python/lib/python3.9/sqlite3/dbapi2.py", line 27, in <module>
        from _sqlite3 import *
    ModuleNotFoundError: No module named '_sqlite3'
    

    这时肯定会报错,因为我们还没有配置数据相关信息。之所以要先执行一下这条命令是为了让Airflow在我们设定的目录下生成配置文件:

    [root@localhost ~]# ls /usr/local/airflow/
    airflow.cfg  webserver_config.py
    [root@localhost ~]# 
    

    修改配置文件:

    [root@localhost ~]# vim /usr/local/airflow/airflow.cfg
    [core]
    dags_folder = /usr/local/airflow/dags
    default_timezone = Asia/Shanghai
    # 配置数据库
    sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
    # Are DAGs paused by default at creation
    dags_are_paused_at_creation = False
    
    [webserver]
    default_ui_timezone = Asia/Shanghai
    # Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
    dag_default_view = graph
    
    [scheduler]
    # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
    dag_dir_list_interval = 30
    

    到MySQL上创建数据库和用户:

    CREATE DATABASE airflow CHARACTER SET utf8;
    create user 'airflow'@'%' identified by '123456a.';
    grant all privileges on airflow.* to 'airflow'@'%';
    flush privileges;
    
    • Tips:数据库编码需为utf8,否则Airflow初始化数据库时可能会失败

    安装MySQL客户端相关依赖包,需要具备如下依赖才能成功安装Python的mysqlclient库:

    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-devel-8.0.25-1.el7.x86_64.rpm
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-libs-8.0.25-1.el7.x86_64.rpm
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-client-plugins-8.0.25-1.el7.x86_64.rpm
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-client-8.0.25-1.el7.x86_64.rpm
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-common-8.0.25-1.el7.x86_64.rpm
    [root@localhost ~]# wget https://cdn.mysql.com//Downloads/MySQL-8.0/mysql-community-libs-compat-8.0.25-1.el7.x86_64.rpm
    [root@localhost ~]# yum install -y ./*.rpm
    

    安装gcc:

    [root@localhost ~]# yum install -y gcc make libffi-devel zlib*
    

    安装mysqlclient:

    [root@localhost ~]# pip3 install mysqlclient -i https://pypi.tuna.tsinghua.edu.cn/simple
    

    再次初始化数据库:

    [root@localhost ~]# airflow db init
    ...
    Initialization done
    

    初始化成功后,数据库表如下:


    image.png

    然后创建管理员用户:

    [root@localhost ~]# airflow users create \
        --username admin \
        --firstname Peter \
        --lastname Parker \
        --role Admin \
        --email spiderman@superhero.org
    

    启动webserver:

    [root@localhost ~]# airflow webserver --port 8080
    

    启动scheduler:

    [root@localhost ~]# airflow scheduler
    

    执行官方的示例任务,测试下Airflow是否已正常启动,如下输出success代表没问题:

    [root@localhost ~]# airflow tasks run example_bash_operator runme_0 2015-01-01
    [2021-06-19 21:44:47,149] {dagbag.py:487} INFO - Filling up the DagBag from /usr/local/airflow/dags
    Running <TaskInstance: example_bash_operator.runme_0 2015-01-01T00:00:00+08:00 [success]> on host localhost.localdomain
    [2021-06-19 21:44:47,763] {dagbag.py:487} INFO - Filling up the DagBag from /usr/local/python/lib/python3.9/site-packages/airflow/example_dags/example_bash_operator.py
    Running <TaskInstance: example_bash_operator.runme_0 2015-01-01T00:00:00+08:00 [success]> on host localhost.localdomain
    [root@localhost ~]# 
    

    Airflow的常用命令

    # 守护进程运行webserver
    $ airflow webserver -D       
    
    # 守护进程运行调度器
    $ airflow scheduler -D       
    
    # 守护进程运行调度器
    $ airflow worker -D          
    
    # 守护进程运行celery worker并指定任务并发数为1
    $ airflow worker -c 1 -D     
    
    # 暂停任务
    $ airflow pause $dag_id     
    
    # 取消暂停,等同于在管理界面打开off按钮
    $ airflow unpause $dag_id    
    
    # 查看task列表
    $ airflow list_tasks $dag_id  
    
    # 清空任务实例
    $ airflow clear $dag_id       
    
    # 运行整个dag文件
    $ airflow trigger_dag $dag_id -r $RUN_ID -e $EXEC_DATE  
    
    # 运行task
    $ airflow run $dag_id $task_id $execution_date       
    

    常用页面操作

    接着访问http://192.168.243.175:8080,登录airflow的用户界面:


    image.png

    登录成功,首页如下:


    image.png

    右上角可以选择时区:


    image.png

    页面上有些示例的任务,我们可以手动触发一些任务进行测试:


    image.png
    image.png

    点击具体的DAG,就可以查看该DAG的详细信息和各个节点的运行状态:


    image.png

    点击DAG中的节点,就可以对该节点进行操作:


    image.png

    自定义DAG

    接下来我们自定义一个简单的DAG给Airflow运行,创建Python代码文件:

    [root@localhost ~]# mkdir /usr/local/airflow/dags
    [root@localhost ~]# vim /usr/local/airflow/dags/my_dag_example.py
    

    代码示例:

    from datetime import timedelta
    
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.utils.dates import days_ago
    
    # 默认参数
    args = {
        'owner': 'admin',
    }
    
    with DAG(
            dag_id='my_dag_example',
            default_args=args,
            schedule_interval='@once',
            start_date=days_ago(2),
            dagrun_timeout=timedelta(minutes=60),
            tags=['my_dag'],
            params={"example_key": "example_value"}
    ) as dag:
        # 定义DAG中的节点
        first = BashOperator(
            task_id='first',
            bash_command='echo "run first task"',
        )
        middle = BashOperator(
            task_id='middle',
            bash_command='echo "run middle task"',
        )
        last = BashOperator(
            task_id='last',
            bash_command='echo "run last task"',
        )
        
        # 定义节点的上下游关系
        first >> middle >> last
    

    等待一会在Web界面上可以看到我们自定义的DAG任务已经被运行完了,因为比较简单,所以执行得很快:


    image.png

    查看下节点的关系是否与我们在代码中定义的一样:


    image.png

    关于DAG的代码定义可以参考官方的示例代码和官方文档,自带的例子在如下目录:

    • /usr/local/python/lib/python3.9/site-packages/airflow/example_dags

    Airflow分布式环境搭建

    如果Airflow要支持分布式的话,需要安装RabbitMQ或Redis作为Airflow的Executor,安装步骤可以参考下文:

    文本采用的是RabbitMQ,版本为3.8.9。若只是测试的话可以使用Docker快速安装,如下:

    [root@localhost ~]# docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.9-management
    [root@localhost ~]# docker exec -it rabbitmq bash
    root@49c8ebed2525:/# rabbitmqctl add_user airflow password  # 添加用户
    root@49c8ebed2525:/# rabbitmqctl add_vhost airflow_vhost   # 添加虚拟主机
    root@49c8ebed2525:/# rabbitmqctl set_user_tags airflow airflow_vhost  # 为用户绑定虚拟主机
    root@49c8ebed2525:/# rabbitmqctl set_user_tags airflow administrator  # 设置用户权限为管理员
    root@49c8ebed2525:/# rabbitmqctl  set_permissions -p airflow_vhost airflow '.*' '.*' '.*' # 设置远程登录权限
    

    在分布式这一环节我们使用Docker来部署,因为容器的弹性能力更强,而且部署方便,可以快速扩展多个worker。首先,拉取airflow的docker镜像:

    [root@localhost ~]# docker pull apache/airflow
    

    拷贝之前本地安装时生成的airflow配置文件:

    [root@localhost ~]# cp /usr/local/airflow/airflow.cfg ~
    [root@localhost ~]# vim airflow.cfg
    

    然后修改配置文件的内容如下:

    [core]
    # 存放dag定义文件的目录
    dags_folder = /opt/airflow/dags
    default_timezone = Asia/Shanghai
    # 配置数据库
    sql_alchemy_conn = mysql+mysqldb://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
    # The executor class that airflow should use
    executor = CeleryExecutor
    # Are DAGs paused by default at creation
    dags_are_paused_at_creation = False
    plugins_folder = /opt/airflow/plugins
    
    [webserver]
    default_ui_timezone = Asia/Shanghai
    # Default DAG view. Valid values are: ``tree``, ``graph``, ``duration``, ``gantt``, ``landing_times``
    dag_default_view = graph
    
    [scheduler]
    # How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
    dag_dir_list_interval = 30
    child_process_log_directory = /opt/airflow/logs/scheduler
    
    [logging]
    base_log_folder = /opt/airflow/logs
    dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log
    
    [celery]
    # worker的并发度,worker可以执行的任务实例的数量
    worker_concurrency = 16
    # worker日志服务的端口
    worker_log_server_port = 8795
    # RabbitMQ的连接地址
    broker_url = amqp://airflow:password@192.168.243.175:5672/airflow_vhost
    result_backend = db+mysql://airflow:123456a.@192.168.1.7:3306/airflow?use_unicode=true&charset=utf8
    flower_host = 0.0.0.0
    flower_port = 5555
    
    [operators]
    default_queue = airflow_queue
    

    创建一个airflow专属的docker网络,为了启动容器时能够指定各个节点的ip以及设置host,也利于与其他容器的网络隔离:

    [root@localhost ~]# docker network create --driver bridge --subnet=172.18.12.0/16 --gateway=172.18.1.1 airflow
    

    然后从镜像中创建各个节点的容器,注意ip和host的设置:

    [root@localhost ~]# docker run -d -p 8080:8080 --name airflow_webserver \
    --network=airflow --ip 172.18.12.1 --hostname airflow_webserver \
    --add-host=airflow_scheduler:172.18.12.2 --add-host=airflow_flower:172.18.12.3 \
    --add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
    apache/airflow webserver
    
    [root@localhost ~]# docker run -d --name airflow_scheduler \
    --network=airflow --ip 172.18.12.2 --hostname airflow_scheduler \
    --add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
    --add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
    apache/airflow scheduler
    
    [root@localhost ~]# docker run -d -p 5555:5555 --name airflow_flower \
    --network=airflow --ip 172.18.12.3 --hostname airflow_flower \
    --add-host=airflow_webserver:172.18.12.1 --add-host=airflow_scheduler:172.18.12.2 \
    --add-host=airflow_worker1:172.18.12.4 --add-host=airflow_worker2:172.18.12.5 \
    apache/airflow celery flower
    
    [root@localhost ~]# docker run -d -p 8795:8795 --name airflow_worker1 \
    --network=airflow --ip 172.18.12.4 --hostname airflow_worker1 \
    --add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
    --add-host=airflow_scheduler:172.18.12.2 --add-host=airflow_worker2:172.18.12.5 \
    apache/airflow celery worker
    
    [root@localhost ~]# docker run -d -p 8796:8795 --name airflow_worker2 \
    --network=airflow --ip 172.18.12.5 --hostname airflow_worker2 \
    --add-host=airflow_webserver:172.18.12.1 --add-host=airflow_flower:172.18.12.3 \
    --add-host=airflow_worker1:172.18.12.4 --add-host=airflow_scheduler:172.18.12.2 \
    apache/airflow celery worker
    

    将宿主机上修改后的配置文件替换容器内的配置文件:

    [root@localhost ~]# docker cp ./airflow.cfg airflow_webserver:/opt/airflow/airflow.cfg
    [root@localhost ~]# docker cp ./airflow.cfg airflow_scheduler:/opt/airflow/airflow.cfg
    [root@localhost ~]# docker cp ./airflow.cfg airflow_flower:/opt/airflow/airflow.cfg
    [root@localhost ~]# docker cp ./airflow.cfg airflow_worker1:/opt/airflow/airflow.cfg
    [root@localhost ~]# docker cp ./airflow.cfg airflow_worker2:/opt/airflow/airflow.cfg
    

    删除之前部署单机版时产生的数据表,然后重新执行数据库的初始化:

    [root@localhost ~]# airflow db init
    

    由于删除了之前的数据,所以需要重新创建airflow的管理员用户:

    [root@localhost ~]# airflow users create \
        --username admin \
        --firstname Peter \
        --lastname Parker \
        --role Admin \
        --email spiderman@superhero.org
    

    然后重启各个节点:

    [root@localhost ~]# docker restart airflow_webserver
    [root@localhost ~]# docker restart airflow_scheduler
    [root@localhost ~]# docker restart airflow_flower
    [root@localhost ~]# docker restart airflow_worker1
    [root@localhost ~]# docker restart airflow_worker2
    

    通过docker ps确认各个节点都启动成功后,访问flower的web界面,可以查看在线的worker信息,以确认worker的存活状态:

    image.png

    然后访问webserver的web界面,确认能正常访问:


    image.png

    由于容器内的/opt/airflow/dags目录下没有任何文件,所以webserver的界面是空的。现在我们将之前编写的dag文件拷贝到容器内。注意,dag文件需要同步到所有的scheduler和worker节点,并且要保证airflow对该文件有足够的权限。如下示例:

    [root@localhost ~]# chmod 777 /usr/local/airflow/dags/my_dag_example.py  # 为了避免权限问题,这里直接放开所有权限
    [root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_worker1:/opt/airflow/dags/my_dag_example.py    # 先拷贝到worker节点,如果先拷贝到scheduler节点会触发调度,此时worker节点没相应的dag文件就会报错
    [root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_worker2:/opt/airflow/dags/my_dag_example.py
    [root@localhost ~]# docker cp /usr/local/airflow/dags/my_dag_example.py airflow_scheduler:/opt/airflow/dags/my_dag_example.py
    

    同步完dag文件后,等待一会可以看到任务被调度起来了:


    image.png

    运行成功:


    image.png

    进入graph view界面查看各个节点的状态:


    image.png

    查看first节点的日志信息,看看是否被正确调度到worker上了。可以看到,该节点被调度到了airflow_worker2上:

    image.png

    middle节点则被调度到了airflow_worker1上:

    image.png

    至此,我们就完成了airflow分布式环境的搭建和验证。但是还有一些不完美,就是在这个架构下webserver和scheduler有单点故障问题,不具备高可用性。不过在较新的版本中这个问题也比较好解决,webserver和scheduler都启动多个节点就好了,不像在老版本中为了让scheduler节点高可用还要做额外的特殊处理。关于scheduler的高可用说明可以参考官方文档:

    相关文章

      网友评论

        本文标题:Apache Airflow单机/分布式环境搭建

        本文链接:https://www.haomeiwen.com/subject/afgypltx.html