什么是动态资源分配
spark 1.2开始,引入了一种根据作业负载动态分配集群资源给你的多个作业的功能。这意味着你的作业在申请到了资源之后,可以在使用完之后将资源还给cluster manager,而且可以在之后有需要的时候再次申请这些资源。这个功能对于多个作业在集群中共享资源是非常有用的。如果部分资源被分配给了一个作业,然后出现了空闲,那么可以还给cluster manager的资源池中,并且被其他作业使用。在spark中,动态资源分配在executor粒度上被实现,可以通过spark.dynamicAllocation.enabled来启用。
资源分配策略
以一个较高的角度来说,当executor不再被使用的时候,spark就应该释放这些executor,并且在需要的时候再次获取这些executor。因为没有一个绝对的方法去预测一个未来可能会运行一个task的executor应该被移除掉,或者一个新的executor应该别加入,我们需要一系列的探索式算法来决定什么应该移除和申请executor。
申请策略
一个启用了动态资源分配的spark作业会在它有pending住的task等待被调度时,申请额外的executor。这个条件必要地暗示了,已经存在的executor是不足以同时运行所有的task的,这些task已经提交了,但是没有完成。
driver会轮询式地申请executor。当在一定时间内(spark.dynamicAllocation.schedulerBacklogTimeout)有pending的task时,就会触发真正的executor申请,然后每隔一定时间后(spark.dynamicAllocation.sustainedSchedulerBacklogTimeout),如果又有pending的task了,则再次触发申请操作。此外,每一轮申请到的executor数量都会比上一轮要增加。举例来说,一个作业需要增加一个executor在第一轮申请时,那么在后续的一轮中会申请2个、4个、8个executor。
每轮增加executor数量的原因主要有两方面。
第一,一个作业应该在开始谨慎地申请以防它只需要一点点executor就足够了。
第二,作业应该会随着时间的推移逐渐增加它的资源使用量,以防突然大量executor被增加进来。
移除策略
移除一个executor的策略比较简单。一个spark作业会在它的executor出现了空闲超过一定时间后(spark.dynamicAllocation.executorIdleTimeout),被移除掉。
要注意,在大多数环境下,这个条件都是跟申请条件互斥的,因为如果有task被pending住的话,executor是不该是空闲的。
executor如何优雅地被释放掉
在使用动态分配之前,executor无论是发生了故障失败,还是关联的application退出了,都还是存在的。在所有场景中,executor关联的所有状态都不再被需要,并且可以被安全地抛弃。
使用动态分配之后,executor移除之后,作业还是存在的。如果作业尝试获取executor写的中间状态数据,就需要去重新计算哪些数据。
因此,spark需要一种机制来优雅地卸载executor,在移除它之前要保护它的状态。
解决方案就是使用一个外部的shuffle服务来保存每个executor的中间写状态,这也是spark 1.2引入的特性。这个服务是一个长时间运行的进程,集群的每个节点上都会运行一个,为你的spark作业和executor服务。如果服务被启用了,那么spark executor会在shuffle write和read时,将数据写入该服务,并从该服务获取数据。这意味着所有executor写的shuffle数据都可以在executor声明周期之外继续使用。
除了写shuffle文件,executor也会在内存或磁盘中持久化数据。当一个executor被移除掉时,所有缓存的数据都会消失。目前还没有有效的方案。在未来的版本中,缓存的数据可能会通过堆外存储来进行保存,就像external shuffle service保存shuffle write文件一样。
网友评论