在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。
1. 设置配置选项
第一次运行Airflow时,它将在$AIRFLOW_HOME
目录(默认情况下是~/airflow)中创建一个名为airflow.cfg
的文件。此文件包含Airflow的配置,您可以编辑它以更改任何设置。还可以使用以下格式设置环境变量选项:$AIRFLOW__{SECTION}__{KEY}
(注意双下划线)。
例如,元数据数据库连接可以在airflow.cfg
中设置如下:
[core]
sql_alchemy_conn = my_conn_string
或者通过创建相应的环境变量:
AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string
还可以在运行时将连接字符串_cmd
附加到键中,如下所示:
[core]
sql_alchemy_conn_cmd = bash_command_to_run
但是只有三个这样的配置元素sql_alchemy_conn
,broker_url
和celery_result_backend
可以作为命令获取。这背后的想法是不要将密码存储在纯文本文件的框中。优先级的顺序如下:
- 环境变量
- airflow.cfg中的配置
- airflow.cfg中的命令
- 默认
2. 设置后端
如果您想要对airflow进行真正的测试,您应该考虑建立一个真正的数据库后端并切换到LocalExecutor。
airflow内置数据库是SqlAlchemy库,与其元数据交互时,您应该能够使用任何类似SqlAlchemy的数据库作为后端支持的数据库。我们建议使用MySQL或Postgres。
一旦你为Airflow安装了数据库,就需要修改配置文件$AIRFLOW_HOME/airflow.cfg
中的SqlAlchemy连接字符串。然后,您还应该将“ executor”设置更改为使用“LocalExecutor”,这是一个可以在本地并行任务实例的执行器。
# initialize the database 初始化数据库
airflow initdb
3. 连接
Airflow需要知道如何连接到你的环境。主机名、端口、登录ID和密码等信息在UI的Admin->Connection
部分中处理。你创建的工作流代码将引用连接对象的“conn_id”。
![](https://img.haomeiwen.com/i3559582/da699a2239e7fdbe.png)
默认情况下,Airflow将在元数据库中以纯文本形式保存连接的密码。在安装过程中强烈推荐使用密码包crypto
。crypto
密码包确实要求您的操作系统安装libffi-dev。
如果最初没有安装密码包crypto
,则仍然可以通过以下步骤启用连接的加密:
(1). 安装密码软件包pip install apache-airflow[crypto]
(2). 使用下面的代码段生成fernet_key。fernet_key必须是base 64编码的32字节密钥。
from cryptography.fernet import Fernet
fernet_key= Fernet.generate_key()
print(fernet_key) # your fernet_key, keep it in secured place!
(3). 将airflow.cfg
中fernet_key的值替换为步骤2中的值。或者,您可以将您的fernet_key存储在OS的环境变量中。在这种情况下,您不需要更改airflow.cfg
,因为AirFlow 会优先使用环境变量,而不是在airflow.cfg
中的值:
# Note the double underscores
EXPORT AIRFLOW__CORE__FERNET_KEY = your_fernet_key
(4). 重新启动AirFlow网络服务器。
(5). 对于现有的连接(在安装airflow[crypto]
和创建Fernet Key之前定义的连接),您需要在连接管理UI中打开每个连接,重新键入密码并保存它。
可以使用环境变量创建AirFlow工作流中的连接。为了正确地使用连接,环境变量需要有一个前缀‘AIRFLOW_CONN_’代表是AirFlow的环境变量,其值为URI格式。有关环境变量和连接的更多信息,请参见概念文档。
4. 用Celery扩大规模
CeleryExecutor
是您扩展worker的数量的方法之一。 为此,您需要设置Celery后端(RabbitMQ,Redis,...)并更改airflow.cfg
以将执行程序参数指向CeleryExecutor
并提供相关的Celery设置。
有关设置Celery代理的更多信息,请参阅有关该主题的详尽Celery文档.
以下是您的workers的一些必要要求:
- 需要安装
airflow
,CLI需要在路径中 - 整个群集中的airflow配置设置应该是同构的
- 在worker上执行的操作符需要在该上下文中满足其依赖项。 例如,如果您使用
HiveOperator
,则需要在该box上安装hive CLI,或者如果您使用MySqlOperator
,则必须以某种方式在PYTHONPATH
中提供所需的Python库 - worker需要访问其
DAGS_FOLDER
,您需要通过自己的方式同步文件系统。 常见的设置是将DAGS_FOLDER存储在Git存储库中,并使用Chef,Puppet,Ansible或用于配置环境中的计算机的任何内容在计算机之间进行同步。 如果您的所有boxes都有一个共同的挂载点,那么共享您的工作流文件也应该可以正常工作
要启动worker,您需要设置Airflow并启动worker子命令:
airflow worker
worker一旦被删除, 就应该立即收起任务。(我真心翻译不来了)
请注意,您还可以运行“Celery Flower”,这是一个建立在Celery之上的Web UI,用于监控您的worker。 您可以使用快捷命令airflow flower
启动Flower Web服务器。
5. 用Dask扩大规模
DaskExecutor
允许您在Dask分布式群集中运行Airflow任务。
Dask集群可以在单个机器上运行,也可以在远程网络上运行。 有关完整详细信息,请参阅分布式文档.
要创建集群,首先启动调度程序:
# default settings for a local cluster
DASK_HOST=127.0.0.1
DASK_PORT=8786
dask-scheduler --host $DASK_HOST --port $DASK_PORT
接下来,在任何可以连接到主机的计算机上启动至少一个Worker:
dask-worker $DASK_HOST:$DASK_PORT
Edit your airflow.cfg to set your executor to DaskExecutor and provide the Dask Scheduler address in the [dask] section.
编辑airflow.cfg
以将执行程序设置为DaskExecutor
,并在[dask]
部分中提供Dask Scheduler地址。
请注意:
- 每个Dask worker 必须能够导入Airflow和需要的任何依赖项。
- Dask不支持队列。 如果使用队列创建了Airflow任务,则会引发警告,但该任务将提交给集群。
6. 日志
用户可以在airflow.cfg
中指定日志文件夹。 默认情况下,它位于AIRFLOW_HOME
目录中。
此外,用户可以提供远程位置,以便在云存储中存储日志和日志备份。 目前,支持Amazon S3和Google Cloud Storage。 要启用此功能,必须按照此示例配置airflow.cfg
:
[core]
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply a remote location URL (starting with either 's3://...' or
# 'gs://...') and an Airflow connection id that provides access to the storage
# location.
remote_base_log_folder = s3://my-bucket/path/to/logs
remote_log_conn_id = MyS3Conn
# Use server-side encryption for logs stored in S3
encrypt_s3_logs = False
远程日志记录使用现有的Airflow连接来读取/写入日志。 如果没有正确设置连接,则会失败。 在上面的例子中,Airflow将尝试使用S3Hook('MyS3Conn')。
在Airflow Web UI中,本地日志优先于远程日志。 如果找不到或访问本地日志,将显示远程日志。 请注意,只有在任务完成(包括失败)后才会将日志发送到远程存储。 换句话说,运行任务的远程日志不可用。 日志作为{dag_id} / {task_id} / {execution_date} / {try_number} .log
存储在日志文件夹中。
7. 扩展Mesos(社区贡献)
MesosExecutor
允许您在Mesos群集上安排Airflow任务。 为此,您需要一个正在运行的mesos集群,并且必须执行以下步骤 :
(1). 在一个运行Web服务器和调度程序的计算机上安装Airflow,我们将其称为“Airflow server”。
(2). 在Airflow服务器上,从mesos下载安装mesos python eggs。
(3). 在Airflow服务器上,使用可以从mesos slave机器访问的数据库(例如mysql)并在airflow.cfg
中添加配置。
(4). 将airflow.cfg
更改为指向MesosExecutor的point executor参数,并提供相关的Mesos设置。
(5). 在所有mesos slaves上,安装airflow。 从Airflow服务器复制airflow.cfg
(以便它使用相同的sql连接)。
(6). 在所有mesos slave服务器上,运行以下服务日志:
airflow serve_logs
(7). 在Airflow服务器上,要开始在mesos上处理/调度DAG,请运行:
airflow scheduler -p
注意:我们需要-p参数来挑选DAGs。
您现在可以在mesos UI中查看Airflow框架和相应的任务。 气流任务的日志可以像往常一样在Airflow UI中查看。
有关mesos的更多信息,请参阅mesos文档。 有关MesosExecutor的任何疑问/错误,请联系@ kapil-malik。
8. 与systemd集成
Airflow可以与基于系统的系统集成。 这使得观察您的守护进程变得容易,因为systemd可以在失败时重新启动守护进程。 在scripts / systemd
目录中,您可以找到已在基于Redhat的系统上测试过的单元文件。 您可以将它们复制到/ usr / lib / systemd / system
。 假设Airflow将在airflow:airflow
下运行。 如果不是(或者如果您在非基于Redhat的系统上运行),则可能需要调整单元文件。
从/ etc / sysconfig / airflow
中获取环境配置。 提供了一个示例文件。 运行调度程序时,请确保在此文件中指定SCHEDULER_RUNS
变量。 您也可以在此处定义,例如AIRFLOW_HOME
或AIRFLOW_CONFIG
。
9. 与upstart集成
Airflow可以与基于upstart的系统集成。 Upstart会在系统启动时自动启动/ etc / init
中具有相应* .conf
文件的所有Airflow服务。 失败时,upstart会自动重启进程(直到达到* .conf
文件中设置的重新生成限制)。
您可以在scripts / upstart
目录中找到示例upstart作业文件。 这些文件已在Ubuntu 14.04 LTS上测试过。 您可能需要调整start on
和stop on
以使其适用于其他upstart系统。 script / upstart / README
中列出了一些可能的选项。
根据需要修改* .conf
文件并复制到/ etc / init
目录。 假设airflow将在airflow:airflow
下运行。 如果您使用其他用户/组,请在* .conf
文件中更改setuid
和setgid
您可以使用initctl
手动启动,停止,查看已与upstart集成的airflow进程的状态
initctl airflow-webserver status
10. 测试模式
Airflow具有一组固定的“测试模式”配置选项。 您可以随时通过调用airflow.configuration.load_test_config()
来加载它们(注意此操作不可逆!)。 但是,在您有机会调用load_test_config()之前,会加载一些选项(如DAG_FOLDER)。 为了快速加载测试配置,请在airflow.cfg中设置test_mode:
[tests]
unit_test_mode = True
由于Airflow的自动环境变量扩展(请参阅设置配置选项),您还可以设置env var AIRFLOW__CORE__UNIT_TEST_MODE
以临时覆盖airflow.cfg
。
网友评论