Spinner是Pinterest基于Airflow构建的工作流引擎,这篇文章介绍了Spinner的构建背景和设计原则。原文:Spinner: Pinterest’s Workflow Platform[1]

自成立以来,Pinterest始终将数据定义为自己的哲学。作为一家数据驱动公司,所有数据都被存储起来以供将来使用。因此每天大概会有600TB的新数据,总数据量已经超过了500PB。在这种规模下,大数据工具在帮助公司收集有意义的见解方面发挥着关键作用,这就是工作流团队的用武之地。我们支撑了超过4000个工作流,平均每天执行10,000个流程和38,000个作业。
背景
早在2013年,Pinterest就创建了一个名为Pinball[2]的内部调度器框架。当时这是一个适合公司需求的解决方案,但随着需求的增加,它无法进一步扩展,以便于服务内外部其他产品和服务。下列限制日益显著:
-
性能:
— 调度/作业启动时延(从计划开始到实际开始的时间)高于预期值。 -
可扩展性:
— 系统的组件是有状态的,增加了水平扩展的复杂性。
— 中心化的元数据存储服务成为单点故障源。 -
可维护性:
— 基础设施升级需要关闭所有工作进程,需要设置备用主机承载任务以及其他操作。
— 支持负载分区的多个(10)集群会增加升级和监控的额外开销。
— 工作负载的增长增加了有状态主节点的负担,导致延迟或瓶颈问题。 -
隔离性:
— 因为Pinterest的Python代码都在一个mono-repo里,因此无法隔离用户代码。
— 导入有问题的代码会因为复杂的依赖问题破坏很多工作流。 -
功能:
— 缺少ACL和审计日志。
— 缺少执行统计的可视化。
— 还缺少其他主要功能,比如服务器认证、快速导航等等。 -
文档:
— 由于是内部项目,缺少文档和示例,只能查找类似工作流的代码。
— 有一个分享解决方案的社区,持续改进产品,为用户提供更多支持。 -
测试:
— 用户很难部署自己的开发集群进行端到端测试。
由于上述痛点,我们很明显需要进行根本性的改变。这给我们创造了一个机会,可以在开始另一个内部项目之前,探索是否有解决方案可以解决当前系统的问题。
为什么是Airflow?
2019年,我们对Spotify的Luigi[3]、LinkedIn的Azkaban[4]以及其他选项做了一些初步分析,最终我们选择了Apache的Airflow[5],原因如下:
- 目标对齐: 用户需要的功能要么已经内置在Airflow, 要么可以添加到插件中。
- 生产级: 被许多组织广泛采用,有积极的社区支持,有大量的讨论,并有很好的文档供用户查看。
- DSL: 基于python,与我们之前的系统一致,降低用户采用的成本。
- 代码: Airflow是模块化的,可以方便的使用独立组件连接定制系统。
- 可伸缩性: 采用无状态组件,可以重启恢复,UI从中心数据库中获取数据,并允许通过插件操作kubernetes基础设施和可分区调度器。
- 声誉: 整个社区似乎对Airflow提供的能力非常满意。
基准测试
为了判断Airflow是否能够满足需求,我们基于不同的参数做了一系列性能、负载和压力测试,这些参数包括:DAG的数量,每个DAG中任务的数量,并发执行的任务以及任务实例,不同的并发设置,数据库记录的数量。这些测试的目标是测量模拟某些生产负载的性能,确定需要多大的规模,估计需要的集群范围,并确定我们采用此框架需要解决的瓶颈和限制。
在执行测试场景之前,需要对源代码进行修改,以建立有效的概念证明(POC)集群。首先,我们修改了Airflow的版本,我们基于v1.10-stable拉了一个分支,并从主分支中cherry pick了一些修改,以支持SerializedDAG[6]模型。其次,由于Pinterest的kubernetes (k8s)环境的设置不同,我们创建了一个修改后的k8s执行器,以便将任务提交到内部k8s集群。最后,我们还构建了一个修改后的调度器来进行分区,并且添加了额外的opentsdb[7]统计状态数据收集来记录准确的测量值。下面是我们的一个性能测试示例。

在进行测试后,我们得到了以下信息:
- UI节点性能良好,延迟与不同视图上的db记录数量有关
- Mysql不是我们之前假设的瓶颈,它是一个独立的中心化节点,通过调度器、Web服务器和工作节点连接
- 调度器在维护超过1000个DAG后,性能会下降
- 性能测试期间检测到的瓶颈是在单个调度器处于极端负载下时kubernetes的提交和结果解析
- 定制的kubernetes执行器在系统处于高负载时构建了一个任务队列,从而导致了延迟
- 为了达到调度延迟时间的目标,必须将DAG的数量保持在1000以下,以确保单个调度程序达到可以接受的性能。因此,需要部署多个调度器。
性能测试的结果让我们非常清楚的了解到系统在不同设置下是如何执行的,以及需要在哪里努力消除瓶颈。
与之前的Pinball系统进行比较,我们发现:
- 在类似的负载下,Airflow POC集群相对Pinball集群的调度延迟更小(50秒 vs 180秒)
- 如果将总负载合并到一个集群中,该系统可以容纳3000个DAG,每个DAG有25个任务,每个任务的延迟约为10分钟,比Pinball要好5倍
- 95%用户的UI页面加载时间<1s,相当于250个并发页面访问负载,并且当分页时,性能受延迟数量的影响最小,而Pinball集群的延迟随着DAG的增加而增加
在性能测试的基础上,如果我们可以优化kubernetes执行器的性能和稳定性,提供生产级的可伸缩性,同时扩展多调度器方案以解决分片/分区负载性能问题,我们觉得完全可以用一个集群承载所有负载。
站在用户的角度来看,我们可以自定义设置,并利用开箱即用的Airflow能力来满足功能要求,大多数负载运行在单一集群中(出于安全考虑,我们必须为pii和sox工作流提供单独的集群),以便工作流数据可以在同一个集群中搜索。
Pinterest工作流系统

上图展示了端到端工作流系统,每个组件将在相关部分中进行详细解释。示意图中所示的外部客户端(如EasyFlow、Galaxy、FlowHub和Monarch)与Spinner进行交互,本文不做详细介绍。
序列化DAG(Serialized DAG)
序列化的DAG模型很重要,有两个主要原因:
- 性能:一个集群中有数千个DAG,在数据库中缓存DAG模型比每次调用都处理DAG文件可以获得更好的性能。
- 迁移:需要将数千个工作流从遗留的Pinball系统迁移到Airflow系统。对于不同的DSL,需要将工作流模型存储到数据库中,以支持所有的UI特性,如代码视图、呈现视图等,否则这些视图就需要依赖我们没有的DAG文件。迁移后的工作流将在后面进行更详细的讨论。
Webs服务器
无状态Web服务器是用户查看工作流状态和历史记录的入口。我们在集群上启用了DAG级别的访问控制(DLAC),并强制每个DAG必须通过至少一个角色授权(如下所示),这样只有对工作流有权限的用户才能访问。spinnre-users是默认角色,所有工作流都设置有这一角色。这个默认角色拥有对每个DAG的读访问权限,这为平台维护人员提供了读取日志、查看历史记录等能力。此外,平台团队提供给用户创建额外角色的能力,从而可以将特定工作流分配给他们,并阻止其他用户查看或操作这些工作流。创建角色的过程在当前需要手动操作,我们希望将创建并向用户分配角色的过程自动化。
dag = DAG(
dag_id='example_dag',
access_control={SPINNER_USERS: {'can_dag_read','can_dag_edit'}},
...
)
当调度器解析DAG,例如覆盖DagModel时,权限将从DAG文件同步到DB。用户还可以通过刷新UI中的DAG来强制更新。当然,如果Web服务进程重启,会重新构建webapp,并在启动时同步所有的工作流权限。我们在所有这些时间点都添加了同步,而不是只在DagModel更新时同步。
此外,通过开箱即用的事件日志,我们将用户对工作流的操作添加到审计日志中,从而对工作流发生的任何事情都拥有完整的跟踪历史。
最后,为了可伸缩性,我们设置了多个Web服务节点托管UI服务,每个Web服务节点有4个线程来处理请求。我们通过滚动部署方式部署系统,从而避免用户停机。
多分区调度器
我们的目标是让Spinner集群有一致的接口来查看所有的DAG,以减少维护、额外的开销以及在旧系统中遇到的多集群的混乱问题。用单一调度器管理所有DAG并不可行,即使垂直扩展更多资源,也会遇到瓶颈,因此增加DAG解析的并行性并不是一个长期解决方案。我们需要建立多调度器方案,每个调度器查看不同的DAG分区。我们考虑过AIP-15[8],但其执行的时间框架与我们的需求不符,所以我们构建了一个内部解决方案,以支持对工作流分级的需求。

由于当前的调度器是有状态的,并且不允许同时运行更多实例,因此为了实现这一方案,我们修改了调度器。与此同时,还需要防止不同的调度器干扰到同一个DAG/任务。我们实现了基于规则的分区器,从DAG位置检索出层和分区号,将给定的DAG标记为特定分区,并将每个分区分配给一个调度器来管理。通过这种方式,每个调度器只解析、分发和维护它所负责的DAG的状态,调度器节点不会发生冲突。通过多调度器方案,我们满足了平台的需求,我们将更高优先级的工作流放在具有更严格负载限制的专用调度器分区中,从而为高优先级工作流提供更高的SLA。
每个调度器配置指向特定分区的DAG文件夹路径,但UI加载了所有DAG路径,因此仍将呈现所有DAG,从而为用户提供一致的入口体验。UI将显示任务实例的分区标记,并确保无论在任务上执行什么操作,都有正确的调度器负责执行。
Spinner工作流和Pinterest分层
在深入研究存储库结构之前,我们需要讨论一下Pinterest特别的分层结构。工作流和系统基于重要性进行标记,Tier1最高,Tier3最低。在我们的数据组织中,给予tier1工作流更高的优先级和更多的资源,这将体现为存储库结构中的DAG路径。我们修改了DAG的定义,要求传入层字段,如下所示:
dag = DAG(
dag_id='example_dag',
tier=Tier.TIER1,
...
)
我们为用户创建了一个新的单一存储库来添加工作流,从而得以在如下方面获得控制权:
- 我们需要在调度器、Web服务器和kubernetes pods之间同步工作流,所以有单独的地方来同步可以让这个过程更简单、更可控
- 我们需要建立基于层的路径,以便tier1工作流的用户可以将工作流放在tier1的基本路径中,tier2的放在tier2的路径,以此类推
- 作为平台的维护者,我们需要为代码审查设置规则
- 我们要求所有代码在提交时通过一个快速的单元测试套件,以确保代码安全、代码质量和代码控制
“dags”文件夹包含用户创建的DAG,下面的子文件夹由工作流团队按照命名约定维护:
{cluster_type}_tier_{tier_index}_{partition_index}
下面是这个结构的一个例子。
- 集群类型将是spinner、pii、test或sox。
- 分层(Tier)索引如上所述:1、2或3。
- 分区(Partition)索引从0开始,并根据需要增加。这个值表示当某个cluster_type的某个层中的DAG数量超过定义的阈值时,将创建一个新分区,并需要将新DAG签入到新的分区路径中。
dags/
spinner_tier_1_0/
team1/
my_dag.py
spinner_tier_2_0/
spinner_tier_3_0/
pii_tier_1_0/
这一设置有助于增强我们的多分区调度器组件,这些DAG文件中的公共模块将被放在plugins目录中,在所有集群/层/分区之间共享。
持续集成和持续部署
任何工作流平台的很大一部分都是处理用户和系统级代码的部署,在Pinterest,我们使用Teletraan[9]作为部署系统。
Spinner的设置涉及到三个不同的代码库:
- Spinner Repo:这是来自v1.10-stable的Airflow源代码,包含了cherry-pick的子集以及许多内部插件。
- Spinner workflow:上面提到的原生Airflow工作流代码库。
- Pinboard:这是Pinterest现有的单一存储库,承载了所有跨系统和平台的python代码,包含大量自定义函数和实现,以及所有遗留的工作流和作业。

如上图所示,基础架构代码和用户代码有独立的存储库。在构建镜像过程中,有基础设施代码的构建(pinairflow),有Pinterest单一代码库的构建(pinboard)。有一个单独的流程负责为DAG (spinner workflows)拉入代码,并确保同步并执行了最新的代码。
该图详细概述了每个组件及其部署周期,但关于基础设施(UI/Scheduler/K8s worker)的CI/CD,有几点需要注意:
- 基础架构代码变更触发将触发Jenkins作业,以生成新的构建并将新镜像推送到仓库
- 另一个Jenkins作业将通知Teletraan刷新节点上的镜像和容器
- Kubernetes pods用相同的镜像拉起容器并执行任务
- 正在进行本地测试的用户能够加载最新的镜像来测试他们的DAG
关于DAG部署CI/CD,有几点需要注意:
- 用户代码的变更会触发Jenkins任务,将所有主机上的DAG同步到s3
- 运行在Web服务器和调度器上的服务进程,以非常短的时间间隔(30秒)从s3同步文件更改。
- 当Worker pod启动最新的DAG并执行任务时,也会从s3同步
Pinterest Kubernetes执行器
在Pinterest,我们有一个内部团队致力于提供Kubernetes服务。因此,必须重新构建开箱即用的k8s执行器才能与我们的系统一起工作。我们重写了所有与kubernetes交互的API,重写了kubernetes执行器、kubernetes调度器等。我们保留了开源执行器的基本业务逻辑,同时适合于我们的特定环境,我们在后端部署了这一执行器插件。
这个执行器使我们的集群具有完全的运行时隔离,并可以根据负载需求进行伸缩。每个任务在自己的pod中运行,有安全的隔离设置,所有pod也与Pinterest中的其他服务共享k8s节点。pod在任务执行后立即被移除,以释放资源。这是对我们之前的系统的一个巨大改进,在之前我们只能提供独占的主机和固定数量的工作槽,从而很难扩展。

kubernetes执行器会构建一个pinterest watcher拉取pod状态,这是pinterest k8s环境特定的东西。我们用标签隔离不同调度器启动的pod,因此需要重新构建这一状态拉起机制。
然后,对于任何任务实例,dag_id、task_id、execution_date和try_number的组合将创建唯一的pod名。可以对pod进行定制,以设置额外的环境变量、申请更多资源、安装额外的包等,以满足不同的用例场景,如下所示。
customized_task1 = PythonOperator(
task_id='customized_task1',
dag=dag,
python_callable=print_context,
executor_config={
"KubernetesExecutor":
{
'request_memory': '350Mi',
'request_cpu': '200m',
'limit_memory': '1200Mi',
'limit_cpu': '500m',
'requirements': 'example_requirements.txt',
}})
我们还在pod启动逻辑中构建了一个“服务准备就绪检查”脚本,该脚本将在pod真正开始运行任务逻辑之前执行。这个准备就绪检查脚本将确保s3通信已经建立,mysql通信被接受,可以完成knox检索,等等。因为每次都动态拉起pod,需要确保这些需求满足才能正确运行任务。
我们还优化了实时任务的日志提取机制。pod中的每个任务在完成后会将日志推送到s3,但在运行时,日志会同步到Pinterest Elastic Search (ES)。当用户转到UI来提取任务日志时,日志将在写入标准路径时从pod中提取出来,如果有任何问题,那么将退回到从ES读取。这对于pod的readiness日志同样有效。如果没有故障发生,则从UI上显示的任务日志中清除readiness日志。

从上图中,可以看到任务如何从创建到执行再到完成,以及执行器如何处理这些任务。主要组件是K8s执行器、K8s调度器和K8s作业监视器,每个组件都用蓝色突出显示。每个阶段引出下一个阶段。
用户体验
自定义操作器和感应器(Custom Operators and Sensors)
我们的每一个决定都围绕用户做出,目标是用最少的工作量为用户提供最大的利益。我们提供了超过30个定制操作器。如下图所示,我们有很多自定义逻辑,被包装在一个操作器或感应器中。我们在内部有一个叫做作业提交服务(Job Submission Service, JSS)的系统,它将流量路由到一个内部的yarn集群,该集群执行hadoop、spark、sparksql、hive、pyspark等作业。因此,所有的提交操作都需要重写,才能在我们的环境中工作。最重要的是,需要将一些常见操作封装起来,从而提供与传统工作流系统相同的功能,以激励用户采用Airflow。
尽管新旧系统之间的DSL有很大的不同,但其目标是尽可能减轻用户负担,以便他们只需要输入执行所需的参数。
我们必须克服的最大障碍之一是为pinboard代码提供支持,如前所述,这里有很多用户自定义逻辑,他们不希望将代码复制到位于不同存储库中的新工作流中。由于我们的基础设施不强制打包子模块,因此需要打包整个代码库并将其添加到python路径中,这意味着需要将其打包到构建的映像中。出于不同的原因,我们最初想要禁止用户导入旧代码库,但这似乎是用户的痛点,所以我们将其添加到镜像中,以减少用户编写本地工作流的阻碍。

测试

在旧系统中测试工作流不是很友好,这是我们希望通过Spinner解决的问题。我们开发了脚本,帮助用户在开发主机上启动容器,运行Airflow调度器和Web服务进程,并模仿生产环境。在此有一些细微差别,比如使用本地执行器而不是Kubernetes执行器(需要从开发主机获得一些配置),但体验非常相似。它可以还设置本地mysql主机,可以从容器中连接,并且可以从本地工作空间(我们使用Pycharm)中同步文件到远程开发主机,以同步更改并测试最新逻辑。文件可以被dev调度器发现,用户将在dev UI上查看标记。这个脚本由我们在用户编写工作流的同一个存储库中维护,以方便用户熟悉。它极大提高了用户的开发体验和开发速度。
单元测试

Pinterest使用Phabricator[10]和Arcanist[11]进行代码审查和推送。当用户进行代码变更,在变更代码进入生产之前,必须由自己的团队(可能还有平台团队)对其进行评审,然后每一次差异评审都会启动一个jenkins工作,这个jenkins工作会启动我们编写的超过25个自动化单元测试。
这些测试包括验证DAG的某些属性,如访问角色设置、层、有效的开始日期、通知设置等,还包括我们不想设置的某些属性,比如catchup属性等。还会验证某些属性是否在可接受的范围内,例如start_date、executor_config里的内存和cpu配置以及并发设置。我们在系统方面也有限制,希望在两方面都能有所加强。我们还有测试Pinterest特定逻辑的单元测试,例如,为了防止顶层调用Hive Metastore(HMS)的所有DAG文件(这会影响DAG解析时间,对HMS服务造成问题),我们通过对常见函数打补丁的方式捕获试图这么做的用户。
当用户提交更改时,我们并不会做代码审查,因此这些验证测试有助于获得更高程度的信心。虽然不能涵盖所有的问题,但这些测试确实涵盖了大多数问题,并帮助我们了解了常见的问题,以便可以添加更多单元测试,从而成为更好的门禁。此外,用户还可以利用这个测试流水线为自己的逻辑添加单元测试。
监控

我们广泛使用名为Statsboard的内部工具,这是一个度量可视化和告警框架。如上所示,对每个组件都有足够的统计数据,例如:
- 调度器
- Web服务器
- 执行器
- API
- DAG同步
- Mysql
- 功能及其他
Overall选项卡是所有集群中所有组件的聚合统计视图。我们可以查看每个集群、每个组件或更高级别的统计数据。通过这种方式,如果我们看到某些系统指标似乎有关联,就可以控制统计数据的粒度。不同的调度器有加权平均,以表示它们的健康检查的重要性(即更高的层以及负载更高的调度有更多的权重)。每个调度器的健康检查实际上是运行在调度器上的一个工作流,如果成功就会发出统计信息,这一统计以15分钟为间隔发生,如果成功率低于某个阈值,任何缺失的数据点都将向团队发出告警。我们还对数据进行了延时检查,以防出现重复。
收集的许多统计数据都是现成的,但我们在自己的组件中添加了一些额外的统计数据,以便对系统健康状况提供更精确的可见性,我们希望对服务实现预期式管理而不是反应式管理。
最后
在这篇文章中,我们想与同行分享我们的痛点,我们对不同产品的调研,我们如何确定怎样处理需求,怎样测试帮助我们在增加投资之前更有信心,怎样改进帮助我们达成目标。我们还讨论了如何帮助用户并改善他们的体验。
下一篇文章我们将详细讨论如果将遗留工作流迁移到新的Spinner平台上,在这个平台上,我们将承担支撑3000个工作流的迁移,并与各自团队的领导进行协调。我们构建了自动化工具、自定义API和一个转换层来处理这个问题。
希望这篇关于我们如何在Pinterest改进和部署Airflow的文章对你有所帮助。
References:
[1] Spinner: Pinterest’s Workflow Platform: https://medium.com/pinterest-engineering/spinner-pinterests-workflow-platform-c5bbe190ba5
[2] Pinball: Building workflow management: https://medium.com/@Pinterest_Engineering/pinball-building-workflow-management-88a044c9b9e3
[3] Luigi: https://github.com/spotify/luigi
[4] Azkaban: https://github.com/azkaban/azkaban
[5] Airflow: https://github.com/apache/airflow
[6] https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py#L897
[7] opntsdb: http://opentsdb.net/
[8] AIP-15: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103092651
[9] Teletraan: https://github.com/pinterest/teletraan
[10] Phabricator: https://www.phacility.com/phabricator/
[11] Arcanist: https://secure.phabricator.com/book/phabricator/article/arcanist/
你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind
网友评论