1. 什么是aiflow
airflow是一个任务调度系统, 采用python语言进行开发,是一个以编程方式编写,安排和监视工作流的平台,airflow可以按照提前编写好的程序,依次调度一组任务进行执行,并实时监控任务的运行,与此同时, airflow还提供了丰富的用户界面,可以更人性化的对任务进行调度。
在airflow中, 有两个概念较为重要,DAG 与 task
- DAG是 Directed Acyclic Graphs(有向无环图) 的缩写,是airflow进行一系列任务调度的单元
- task是任务执行的单元,airflow通过DAG中一组task进行调度,进行任务的有序处理
airflow官方文档: https://airflow.apache.org/docs/apache-airflow/stable/index.html
airflow helm github地址: https://github.com/bitnami/charts/tree/master/bitnami/airflow
airflow helm文档: https://artifacthub.io/packages/helm/bitnami/airflow
2. airflow的下载
若在实体机上进行airflow的安装, 可使用pip进行安装,直接pip install airflow即可
若在k8s集群上部署airflow, 可按照上方 helm的方式进行airflow的安装
3. airflow的安装
airflow在实体机上的安装可根据官方文档进行, 下面着重介绍通过helm的方式在k8s上安装
下载airflow的配置包
helm search repo airflow
helm
4. airflow的功能
5. airflow的调优维护
- airflow使用kubernetesoperator时, 运行成功的pod数量越来越多
此时需要使用到KubernetesOperator 的一个参数 is_delete_operator_pod=True, 此后运行完成后 pod就会被删除
6. 问题处理
- airflow任务未运行完成就被杀掉
## 问题报错信息
The scheduler does not appear to be running. Last heartbeat was received 5 minutes ago.
The DAGs list may not update, and new tasks will not be scheduled.
及
{local_task_job.py:170} WARNING - State of this instance has been externally set to failed. Terminating instance.
## 解决思路
首先经过搜索发现以下两篇文章较为接近
https://stackoverflow.com/questions/65380492/why-are-my-airflow-tasks-being-externally-set-to-failed
https://stackoverflow.com/questions/60534328/airflow-state-of-this-instance-has-been-externally-set-to-shutdown-taking-the-p
第一篇文章显示是因为 scheduler_health_check_threshold 及 scheduler_heartbeat_sec 不匹配造成的, 经过查询airflow.cfg发现是匹配的, 但是给了一个思路, 经过排查, 发现airflow worker及scheduler没有在一台机器上, 且机器的时间相差超过五分钟, 将两个服务部署到同一台机器上后, 第一个报错消失, 第二个问题经过测试也完成,问题解决
- 相似的工具还有什么?
网友评论