在利用Spark开发各类计算任务时,Executor内存的配置永远是重中之重,因此了解Spark的内存管理机制是非常有益的。
在1.6版本之前,Spark只有一种内存管理机制,即静态内存管理(StaticMemoryManager),1.6版本以后又引入了新的统一内存管理(UnifiedMemoryManager)。下面分别来看一下这两种机制的细节。
静态内存管理
任何一个Spark Executor本质上都是一个JVM进程,因此我们使用spark.executor.memory参数指定的内存就是JVM堆的大小,叫做堆内(on-heap)内存。至于堆外(off-heap)内存不是这篇文章要讨论的。
下图示出在静态内存管理机制下的堆内内存分布。图中有一处遗漏,做了订正。
图片来自https://0x0fff.com/spark-architecture/
- JVM堆内存(Heap):由spark.executor.memory指定。如果不指定的话,默认会分配512MB,显然是很小的,所以推荐总是手动设定它。
- 安全(Safe)内存:为了最大限度地减少OOM,以及为用户代码的执行预留一定的内存,Spark设定了两个安全内存的阈值。由spark.storage.safetyFraction指定存储安全内存相对于JVM堆内存的比例,默认0.9,就是占堆内存的90%。Shuffle安全内存就是图中添加的那一部分,由spark.shuffle.safetyFraction指定比例,默认0.8。一般不要改。
- Shuffle内存:顾名思义,是专门供shuffle阶段使用的内存。不管HashShuffleManager还是SortShuffleManager都会产生中间数据,典型的就是排序数据。它由spark.shuffle.memoryFraction来指定相对于shuffle安全内存的比例,默认值0.2。如果shuffle阶段逻辑比较复杂的话,应该增大这个值。
- 存储(Storage)内存:专门用来存储数据的内存区域。在运算过程中读入的数据、广播变量等都会放在这里,如果对RDD显式调用cache()/persist()方法,也是缓存在这里。它由spark.storage.memoryFraction来指定相对于存储安全内存的比例,默认值0.6。如果数据量很大,或者经常需要缓存RDD,应该增大这个值。
- Unroll内存:所谓unroll就是我们常说的反序列化(deserialize)。Spark缓存的数据可能会有序列化的格式,必须反序列化后才能参与计算。它与storage内存共享区域,由spark.storage.unrollFraction来指定比例,默认值0.2。限定unroll内存的比例是为了防止反序列化对象过大,挤占过多storage内存,导致缓存的其他对象全部失效。
StaticMemoryManager类的源码非常简单,一目了然。下面的源码来自1.6版本。
/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
*
* The sizes of the execution and storage regions are determined through
* `spark.shuffle.memoryFraction` and `spark.storage.memoryFraction` respectively. The two
* regions are cleanly separated such that neither usage can borrow memory from the other.
*/
private[spark] class StaticMemoryManager(
conf: SparkConf,
maxOnHeapExecutionMemory: Long,
override val maxStorageMemory: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
maxStorageMemory,
maxOnHeapExecutionMemory) {
def this(conf: SparkConf, numCores: Int) {
this(
conf,
StaticMemoryManager.getMaxExecutionMemory(conf),
StaticMemoryManager.getMaxStorageMemory(conf),
numCores)
}
// Max number of bytes worth of blocks to evict when unrolling
private val maxUnrollMemory: Long = {
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
false
} else {
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
}
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = storageMemoryPool.memoryFree
// When unrolling, we will use all of the existing free memory, and, if necessary,
// some extra space freed from evicting cached blocks. We must place a cap on the
// amount of memory to be evicted by unrolling, however, otherwise unrolling one
// big block can blow away the entire cache.
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
// Keep it within the range 0 <= X <= maxNumBytesToFree
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
private[memory]
override def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
}
private[spark] object StaticMemoryManager {
/**
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
/**
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
}
统一内存管理
上面的之所以叫静态内存管理,就是因为注释中写的“The two regions are cleanly separated such that neither usage can borrow memory from the other.” 存储与执行两块内存的区域是完全分开的,不能互相借用。新提出的统一内存管理打破了这个界限,存储内存与执行内存统一作为Spark内存管理,它们之间可以互相借用了,当然配置的方法也简化了。
下图示出在统一内存管理机制下的堆内内存分布。
图来自https://0x0fff.com/spark-memory-management/
- 预留(Reserved)内存:这部分内存的大小是硬编码的,固定为300MB。它只用来存储一些固定的Spark内部逻辑和对象,与数据、运算和用户代码都无关。下面所说的“内存”,实际上都是JVM堆内存减去这300MB预留内存剩下的量。另外,堆内存的总量不能少于预留内存的1.5倍,也就是450MB。
-
Spark内存:就是真正用来执行Spark作业的内存,其比例由spark.memory.fraction指定,默认值0.75(但在最新的Spark 2.4版本中已经改成了0.6)。它内部又分为两块,一是存储(Storage)内存,二是执行(Execution)内存,用途与静态内存管理中的存储内存和shuffle内存类似。
其中,存储内存的相对比例由参数spark.memory.storageFraction指定,默认值0.5,也就是说Spark内存中默认有一半是存储内存,另一半是执行内存。但是,这个比例只是个参考值,因为这两块内存在紧张时都可以相互借用,水位线是变化的。 - 用户(User)内存:就是有效内存中除了Spark内存之外的那一块,默认比例是1 - spark.memory.fraction = 0.25。这块内存专门用来存放运算过程中用户的代码逻辑,以及自定义的数据结构,如RDD转换过程中的自定义类型。当Spark作业OOM时,也有可能是因为用户内存不够用导致,这时就应该适当减少spark.memory.fraction的比例。
如果在1.6版本之后的Spark程序中设定静态内存管理的参数,那么执行时会输出一条警告日志。
WARN spark.SparkConf: Detected deprecated memory fraction settings: [spark.storage.memoryFraction]. As of Spark 1.6, execution and storage memory management are unified. All memory fractions used in the old model are now deprecated and no longer read. If you wish to use the old memory management, you may explicitly enable
spark.memory.useLegacyMode
(not recommended).
意思就是旧版的参数已经过时,不会再被读取并设置。如果要强制使用旧版参数的话,必须先显式设定spark.memory.useLegacyMode
,当然在多数情况下都不是很推荐。如果有特殊的作业对存储内存和执行内存的比例有严格限定,才酌情考虑改回静态内存管理。
最后贴出UnifiedMemoryManager的源码,同样来自1.6版本。限于篇幅原因,与它相关的StorageMemoryPool和ExecutionMemoryPool类的代码就没有贴出来。
核心逻辑在acquireExecutionMemory()和acquireStorageMemory()这两个方法中。通过读代码,我们还可以得到更多的信息。
- 预留内存实际上可以通过spark.testing.reservedMemory更改,但这是个测试参数,最好不要改。
- 存储内存和执行内存在管理时实际上都是MemoryPool类的实例,通过MemoryPool实现内存借用。存储内存都在堆内,而执行内存还有堆外的一部分。
- 当一方的内存不够用而另一方的内存还有剩余时,就会计算需要的内存量,然后从另一方的内存池中借用。
- 当执行内存不够用,而回收那些早先被存储内存借用的空间时,存储内存中缓存的一部分数据会被清除,直到执行内存的量足够进行运算。
- 但是反过来,当存储内存不够用,而回收那些早先被执行内存借用的空间时,是不会清除数据的。因为执行内存中存储的都是中间运算结果,一旦清除了会造成程序运行异常。因此要特别警惕执行内存水位过高的情况。
/**
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
* either side can borrow memory from the other.
*
* The region shared between execution and storage is a fraction of (the total heap space - 300MB)
* configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
* within this space is further determined by `spark.memory.storageFraction` (default 0.5).
* This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
*
* Storage can borrow as much execution memory as is free until execution reclaims its space.
* When this happens, cached blocks will be evicted from memory until sufficient borrowed
* memory is released to satisfy the execution memory request.
*
* Similarly, execution can borrow as much storage memory as is free. However, execution
* memory is *never* evicted by storage due to the complexities involved in implementing this.
* The implication is that attempts to cache blocks may fail if execution has already eaten
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
*
* @param storageRegionSize Size of the storage region, in bytes.
* This region is not statically reserved; execution can borrow from
* it if necessary. Cached blocks can be evicted only if actual
* storage memory usage exceeds this region.
*/
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
val maxMemory: Long,
storageRegionSize: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
storageRegionSize,
maxMemory - storageRegionSize) {
assertInvariant()
// We always maintain this invariant:
private def assertInvariant(): Unit = {
assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
}
override def maxStorageMemory: Long = synchronized {
maxMemory - onHeapExecutionMemoryPool.memoryUsed
}
/**
* Try to acquire up to `numBytes` of execution memory for the current task and return the
* number of bytes obtained, or 0 if none can be allocated.
*
* This call may block until there is enough free memory in some situations, to make sure each
* task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
* active tasks) before it is forced to spill. This can happen if the number of tasks increase
* but an older task had a lot of memory already.
*/
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assertInvariant()
assert(numBytes >= 0)
memoryMode match {
case MemoryMode.ON_HEAP =>
/**
* Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
*
* When acquiring memory for a task, the execution pool may need to make multiple
* attempts. Each attempt must be able to evict storage in case another task jumps in
* and caches a large block between the attempts. This is called once per attempt.
*/
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
// There is not enough free memory in the execution pool, so try to reclaim memory from
// storage. We can reclaim any free memory from the storage pool. If the storage pool
// has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
// the memory that storage has borrowed from execution.
val memoryReclaimableFromStorage =
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
// Only reclaim as much space as is necessary and available:
val spaceToReclaim = storageMemoryPool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storageMemoryPool.decrementPoolSize(spaceToReclaim)
onHeapExecutionMemoryPool.incrementPoolSize(spaceToReclaim)
}
}
}
/**
* The size the execution pool would have after evicting storage memory.
*
* The execution memory pool divides this quantity among the active tasks evenly to cap
* the execution memory allocation for each task. It is important to keep this greater
* than the execution pool size, which doesn't take into account potential memory that
* could be freed by evicting storage. Otherwise we may hit SPARK-12155.
*
* Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
* in execution memory allocation across tasks, Otherwise, a task may occupy more than
* its fair share of execution memory, mistakenly thinking that other tasks can acquire
* the portion of storage memory that cannot be evicted.
*/
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storageMemoryUsed, storageRegionSize)
}
onHeapExecutionMemoryPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
case MemoryMode.OFF_HEAP =>
// For now, we only support on-heap caching of data, so we do not need to interact with
// the storage pool when allocating off-heap memory. This will change in the future, though.
offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
assertInvariant()
assert(numBytes >= 0)
if (numBytes > maxStorageMemory) {
// Fail fast if the block simply won't fit
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxStorageMemory bytes)")
return false
}
if (numBytes > storageMemoryPool.memoryFree) {
// There is not enough free memory in the storage pool, so try to borrow free memory from
// the execution pool.
val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, evictedBlocks)
}
}
object UnifiedMemoryManager {
// Set aside a fixed amount of memory for non-storage, non-execution purposes.
// This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
// sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
// the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
maxMemory = maxMemory,
storageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
/**
* Return the total amount of memory shared between execution and storage, in bytes.
*/
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = reservedMemory * 1.5
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please use a larger heap size.")
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
(usableMemory * memoryFraction).toLong
}
}
网友评论