美文网首页
2. airflow配置

2. airflow配置

作者: 路小漫 | 来源:发表于2018-07-11 14:59 被阅读0次

    在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。

    1. 设置配置选项

    第一次运行Airflow时,它将在$AIRFLOW_HOME目录(默认情况下是~/airflow)中创建一个名为airflow.cfg的文件。此文件包含Airflow的配置,您可以编辑它以更改任何设置。还可以使用以下格式设置环境变量选项:$AIRFLOW__{SECTION}__{KEY}(注意双下划线)。

    例如,元数据数据库连接可以在airflow.cfg中设置如下:

    [core]
    sql_alchemy_conn = my_conn_string
    

    或者通过创建相应的环境变量:

    AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string
    

    还可以在运行时将连接字符串_cmd附加到键中,如下所示:

    [core]
    sql_alchemy_conn_cmd = bash_command_to_run
    

    但是只有三个这样的配置元素sql_alchemy_conn,broker_urlcelery_result_backend可以作为命令获取。这背后的想法是不要将密码存储在纯文本文件的框中。优先级的顺序如下:

    1. 环境变量
    2. airflow.cfg中的配置
    3. airflow.cfg中的命令
    4. 默认

    2. 设置后端

    如果您想要对airflow进行真正的测试,您应该考虑建立一个真正的数据库后端并切换到LocalExecutor。

    airflow内置数据库是SqlAlchemy库,与其元数据交互时,您应该能够使用任何类似SqlAlchemy的数据库作为后端支持的数据库。我们建议使用MySQL或Postgres。

    一旦你为Airflow安装了数据库,就需要修改配置文件$AIRFLOW_HOME/airflow.cfg中的SqlAlchemy连接字符串。然后,您还应该将“ executor”设置更改为使用“LocalExecutor”,这是一个可以在本地并行任务实例的执行器。

    # initialize the database 初始化数据库
    airflow initdb
    

    3. 连接

    Airflow需要知道如何连接到你的环境。主机名、端口、登录ID和密码等信息在UI的Admin->Connection部分中处理。你创建的工作流代码将引用连接对象的“conn_id”。

    image.png

    默认情况下,Airflow将在元数据库中以纯文本形式保存连接的密码。在安装过程中强烈推荐使用密码包cryptocrypto密码包确实要求您的操作系统安装libffi-dev。

    如果最初没有安装密码包crypto,则仍然可以通过以下步骤启用连接的加密:
    (1). 安装密码软件包pip install apache-airflow[crypto]
    (2). 使用下面的代码段生成fernet_key。fernet_key必须是base 64编码的32字节密钥。

    from cryptography.fernet import Fernet
    fernet_key= Fernet.generate_key()
    print(fernet_key) # your fernet_key, keep it in secured place!
    

    (3). 将airflow.cfg中fernet_key的值替换为步骤2中的值。或者,您可以将您的fernet_key存储在OS的环境变量中。在这种情况下,您不需要更改airflow.cfg,因为AirFlow 会优先使用环境变量,而不是在airflow.cfg中的值:

    # Note the double underscores
    EXPORT AIRFLOW__CORE__FERNET_KEY = your_fernet_key
    

    (4). 重新启动AirFlow网络服务器。
    (5). 对于现有的连接(在安装airflow[crypto]和创建Fernet Key之前定义的连接),您需要在连接管理UI中打开每个连接,重新键入密码并保存它。

    可以使用环境变量创建AirFlow工作流中的连接。为了正确地使用连接,环境变量需要有一个前缀‘AIRFLOW_CONN_’代表是AirFlow的环境变量,其值为URI格式。有关环境变量和连接的更多信息,请参见概念文档

    4. 用Celery扩大规模

    CeleryExecutor是您扩展worker的数量的方法之一。 为此,您需要设置Celery后端(RabbitMQ,Redis,...)并更改airflow.cfg以将执行程序参数指向CeleryExecutor并提供相关的Celery设置。

    有关设置Celery代理的更多信息,请参阅有关该主题的详尽Celery文档.

    以下是您的workers的一些必要要求:

    • 需要安装airflow,CLI需要在路径中
    • 整个群集中的airflow配置设置应该是同构的
    • 在worker上执行的操作符需要在该上下文中满足其依赖项。 例如,如果您使用HiveOperator,则需要在该box上安装hive CLI,或者如果您使用MySqlOperator,则必须以某种方式在PYTHONPATH中提供所需的Python库
    • worker需要访问其DAGS_FOLDER,您需要通过自己的方式同步文件系统。 常见的设置是将DAGS_FOLDER存储在Git存储库中,并使用Chef,Puppet,Ansible或用于配置环境中的计算机的任何内容在计算机之间进行同步。 如果您的所有boxes都有一个共同的挂载点,那么共享您的工作流文件也应该可以正常工作

    要启动worker,您需要设置Airflow并启动worker子命令:

    airflow worker
    

    worker一旦被删除, 就应该立即收起任务。(我真心翻译不来了

    请注意,您还可以运行“Celery Flower”,这是一个建立在Celery之上的Web UI,用于监控您的worker。 您可以使用快捷命令airflow flower启动Flower Web服务器。

    5. 用Dask扩大规模

    DaskExecutor允许您在Dask分布式群集中运行Airflow任务。

    Dask集群可以在单个机器上运行,也可以在远程网络上运行。 有关完整详细信息,请参阅分布式文档.

    要创建集群,首先启动调度程序:

    # default settings for a local cluster
    DASK_HOST=127.0.0.1
    DASK_PORT=8786
    
    dask-scheduler --host $DASK_HOST --port $DASK_PORT
    

    接下来,在任何可以连接到主机的计算机上启动至少一个Worker:

    dask-worker $DASK_HOST:$DASK_PORT
    

    Edit your airflow.cfg to set your executor to DaskExecutor and provide the Dask Scheduler address in the [dask] section.

    编辑airflow.cfg以将执行程序设置为DaskExecutor,并在[dask]部分中提供Dask Scheduler地址。

    请注意:

    • 每个Dask worker 必须能够导入Airflow和需要的任何依赖项。
    • Dask不支持队列。 如果使用队列创建了Airflow任务,则会引发警告,但该任务将提交给集群。

    6. 日志

    用户可以在airflow.cfg中指定日志文件夹。 默认情况下,它位于AIRFLOW_HOME目录中。

    此外,用户可以提供远程位置,以便在云存储中存储日志和日志备份。 目前,支持Amazon S3和Google Cloud Storage。 要启用此功能,必须按照此示例配置airflow.cfg

    [core]
    # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
    # must supply a remote location URL (starting with either 's3://...' or
    # 'gs://...') and an Airflow connection id that provides access to the storage
    # location.
    remote_base_log_folder = s3://my-bucket/path/to/logs
    remote_log_conn_id = MyS3Conn
    # Use server-side encryption for logs stored in S3
    encrypt_s3_logs = False
    

    远程日志记录使用现有的Airflow连接来读取/写入日志。 如果没有正确设置连接,则会失败。 在上面的例子中,Airflow将尝试使用S3Hook('MyS3Conn')。

    在Airflow Web UI中,本地日志优先于远程日志。 如果找不到或访问本地日志,将显示远程日志。 请注意,只有在任务完成(包括失败)后才会将日志发送到远程存储。 换句话说,运行任务的远程日志不可用。 日志作为{dag_id} / {task_id} / {execution_date} / {try_number} .log存储在日志文件夹中。

    7. 扩展Mesos(社区贡献)

    MesosExecutor允许您在Mesos群集上安排Airflow任务。 为此,您需要一个正在运行的mesos集群,并且必须执行以下步骤 :
    (1). 在一个运行Web服务器和调度程序的计算机上安装Airflow,我们将其称为“Airflow server”。
    (2). 在Airflow服务器上,从mesos下载安装mesos python eggs。
    (3). 在Airflow服务器上,使用可以从mesos slave机器访问的数据库(例如mysql)并在airflow.cfg中添加配置。
    (4). 将airflow.cfg更改为指向MesosExecutor的point executor参数,并提供相关的Mesos设置。
    (5). 在所有mesos slaves上,安装airflow。 从Airflow服务器复制airflow.cfg(以便它使用相同的sql连接)。
    (6). 在所有mesos slave服务器上,运行以下服务日志:

    airflow serve_logs
    

    (7). 在Airflow服务器上,要开始在mesos上处理/调度DAG,请运行:

    airflow scheduler -p
    

    注意:我们需要-p参数来挑选DAGs。

    您现在可以在mesos UI中查看Airflow框架和相应的任务。 气流任务的日志可以像往常一样在Airflow UI中查看。

    有关mesos的更多信息,请参阅mesos文档。 有关MesosExecutor的任何疑问/错误,请联系@ kapil-malik

    8. 与systemd集成

    Airflow可以与基于系统的系统集成。 这使得观察您的守护进程变得容易,因为systemd可以在失败时重新启动守护进程。 在scripts / systemd目录中,您可以找到已在基于Redhat的系统上测试过的单元文件。 您可以将它们复制到/ usr / lib / systemd / system。 假设Airflow将在airflow:airflow下运行。 如果不是(或者如果您在非基于Redhat的系统上运行),则可能需要调整单元文件。

    / etc / sysconfig / airflow中获取环境配置。 提供了一个示例文件。 运行调度程序时,请确保在此文件中指定SCHEDULER_RUNS变量。 您也可以在此处定义,例如AIRFLOW_HOMEAIRFLOW_CONFIG

    9. 与upstart集成

    Airflow可以与基于upstart的系统集成。 Upstart会在系统启动时自动启动/ etc / init中具有相应* .conf文件的所有Airflow服务。 失败时,upstart会自动重启进程(直到达到* .conf文件中设置的重新生成限制)。

    您可以在scripts / upstart目录中找到示例upstart作业文件。 这些文件已在Ubuntu 14.04 LTS上测试过。 您可能需要调整start onstop on以使其适用于其他upstart系统。 script / upstart / README中列出了一些可能的选项。

    根据需要修改* .conf文件并复制到/ etc / init目录。 假设airflow将在airflow:airflow下运行。 如果您使用其他用户/组,请在* .conf文件中更改setuidsetgid

    您可以使用initctl手动启动,停止,查看已与upstart集成的airflow进程的状态

    initctl airflow-webserver status
    

    10. 测试模式

    Airflow具有一组固定的“测试模式”配置选项。 您可以随时通过调用airflow.configuration.load_test_config()来加载它们(注意此操作不可逆!)。 但是,在您有机会调用load_test_config()之前,会加载一些选项(如DAG_FOLDER)。 为了快速加载测试配置,请在airflow.cfg中设置test_mode:

    [tests]
    unit_test_mode = True
    

    由于Airflow的自动环境变量扩展(请参阅设置配置选项),您还可以设置env var AIRFLOW__CORE__UNIT_TEST_MODE以临时覆盖airflow.cfg

    相关文章

      网友评论

          本文标题:2. airflow配置

          本文链接:https://www.haomeiwen.com/subject/lrzzuftx.html