美文网首页Spark流式计算
Spark内存模型初探(1)-Storage/Execution

Spark内存模型初探(1)-Storage/Execution

作者: AlstonWilliams | 来源:发表于2019-02-21 10:08 被阅读21次

    过去,我翻译了几篇关于Spark内存模型的文章。翻译完以后,我觉得我对Spark内存模型已经够理解了,可是,纸上得来终觉浅,实际跑Spark任务的时候,还是会遇到OOM,而我并不知道是哪部分发生了OOM,也就不知道该如何分配Storage Memory/Execution Memory等,才会保证资源不会被浪费,也不会太小导致资源不足。

    正是出于这一点,我开始深入理解Spark的内存模型。我想知道Storage Memory,Execution Memory里面到底放的是什么,User Memory里面到底放的是什么。进一步明确我该如何对这两者进行分配。

    本文中,主要会介绍Storage Memory以及Execution Memory中到底存放了什么,这是初步得出的结果,并不完善,文末我也会提出很多问题。但是这些问题,后面我会一点点来探索,一点点解答,并发布出来。

    另外,由于作者在Spark领域也是初学者,所以请读者在阅读本文的时候,保持怀疑的态度,自己验证,探索。我会给出我的测试代码。

    另外,本文介绍的都是针对Spark-core的,并没有涉及Spark-SQL。其实Spark-SQL我们用的也蛮多的,有时间也应该认真探索一下。

    环境

    Spark版本: Spark 2.4.0-SNAPSHOT
    Java版本: Java 8
    操作系统: Ubuntu 17.10

    运行的Spark为standalone模式,本机启动三个Worker。

    由于Spark内部其实在内存分配时,并没有打印出来日志,包括debug日志,所以,我们需要向Spark中手动添加一些debug代码,这些我已经放在了这个测试项目的patch目录,需要读者合并并且重新编译一下。

    项目地址

    TestBigDataProject

    StorageMemory中存放的内容

    StorageMemory分为两部分,一部分是Unroll Memory,另一部分就是剩下的Storage Memory。

    其中,Unroll Memory,会被用于broadcast变量的展开。

    当我们调用RDD.persist()或者RDD.cache()时,这些数据会被存储到Storage Memory中。

    详细的代码请参考这两个类:

    • com.hypers.spark.TestSparkBroadcastMemoryUsage
    • com.hypers.spark.TestSparkCacheMemoryUsage

    Storage Memory的分配很简单,重新编译Spark以后,会在Executor的日志中,看到相应的日志。

    ExecutionMemory中存放的内容

    ExecutionMemory这个调试起来就比较困难了。因为我得到的信息,一直都是它用于存放shuffle时的中间结果,然后我用repartition()函数进行测试,一直没有看到有分配ExecutionMemory。emmmm......

    后来想到,干脆去源码里看看,到底什么时候会分配ExecutionMemory。

    然后,追本溯源,发现只有一个地方会调用它,相关代码(Spillable.maybeSpill(collection: C, currentMemory: Long))如下:

    protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
    
        // >>>>>>>>>>>>>>>>>>>
        log.debug("---------- calling maybeSpill, currentMemory: " + currentMemory)
        log.debug("---------- _elementRead: " + _elementsRead)
        log.debug("---------- numElementsForceSpillThreadshold: " + numElementsForceSpillThreshold)
        log.debug("---------- elementReads: " + elementsRead)
        log.debug("---------- myMemoryThreshold: " + myMemoryThreshold)
        // <<<<<<<<<<<<<<<<<<<
    
        var shouldSpill = false
        if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
          // Claim up to double our current memory from the shuffle memory pool
          val amountToRequest = 2 * currentMemory - myMemoryThreshold
          val granted = acquireMemory(amountToRequest)
          myMemoryThreshold += granted
          // If we were granted too little memory to grow further (either tryToAcquire returned 0,
          // or we already had more memory than myMemoryThreshold), spill the current collection
          shouldSpill = currentMemory >= myMemoryThreshold
        }
        shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
    
        // >>>>>>>>>>>>>>>>>>>
        log.debug("--------- shouldSpill: " + shouldSpill)
        // <<<<<<<<<<<<<<<<<<<
    
        // Actually spill
        if (shouldSpill) {
          _spillCount += 1
          logSpillage(currentMemory)
          spill(collection)
          _elementsRead = 0
          _memoryBytesSpilled += currentMemory
          releaseMemory()
        }
        shouldSpill
    }
    

    其中的acquireMemory()方法就是专门用来申请Execution Memory的。而maybeSpill(collection: C, currentMemory: Long)这个方法,会被ExternalAppendOnlyMap以及ExternalSorter调用。我只看过ExternalAppendOnlyMap相关的代码,所以这里只拿它举例:

      def insertAll(entries: Iterator[Product2[K, V]]): Unit = {
    
        // >>>>>>>>>>>>>>>>>>>
        log.debug("-------------- calling insertAll")
        // <<<<<<<<<<<<<<<<<<<
    
        if (currentMap == null) {
          throw new IllegalStateException(
            "Cannot insert new elements into a map after calling iterator")
        }
        // An update function for the map that we reuse across entries to avoid allocating
        // a new closure each time
        var curEntry: Product2[K, V] = null
        val update: (Boolean, C) => C = (hadVal, oldVal) => {
          if (hadVal) mergeValue(oldVal, curEntry._2) else createCombiner(curEntry._2)
        }
    
        while (entries.hasNext) {
          curEntry = entries.next()
          val estimatedSize = currentMap.estimateSize()
          if (estimatedSize > _peakMemoryUsedBytes) {
            _peakMemoryUsedBytes = estimatedSize
          }
          if (maybeSpill(currentMap, estimatedSize)) {
            currentMap = new SizeTrackingAppendOnlyMap[K, C]
          }
          currentMap.changeValue(curEntry._1, update)
          addElementsRead()
        }
      }
    

    我们可以看到,当插入到ExternalAppendOnlyMap时,会检测是否需要进行spill,如果自从上次spill以后,插入到ExternalAppendOnlyMap的entry的数量是32的倍数,并且当前ExternalAppendOnlyMap占用的内存已经超过了我们设置的阈值,那么就尝试分配ExecutionMemory。如果分配成功,那么OK,只要你没有设置强制spill的阈值,那么就不spill。否则的话,就做一次spill。

    而什么地方会调用ExternalAppendOnlyMap.insertAll(entries: Iterator[Product2[K, V]])呢?

    1. Spark 1.6以后的版本并且spark.memory.useLegacyMode=false。在Spark 1.6以后,默认是false。如果是true的话,会使用AppendOnlyMap
    2. 当我们调用RDD.*ByKey()这些方法的时候。

    关于第二条,在StackOverflow上有个用reduceByKey举的例子,请点击这里查看。

    测试代码请参考:

    • com.hypers.spark.TestSparkAggregateByKeyMemoryUsage
    • com.hypers.spark.TestSparkGroupByKeyMemoryUsage
    • com.hypers.spark.TestSparkReduceByKeyMemoryUsage
    • com.hypers.spark.TestSparkRepartitionMemoryUsage

    需要注意的是,测试ExecutionMemory的时候,为了保证能够进入到申请ExecutionMemory的逻辑,我加了spark.shuffle.spill.initialMemoryThreshold=1这个设置。也就是说,只要ExternalAppendOnlyMap中数据的大小大于1Byte,并且上次spill以后到现在插入了32的倍数条数据,就需要可以申请ExecutionMemory。

    疑问

    在测试的过程中,不断冒出来新的疑问,等待我去解答:

    • UserMemory是怎样被使用的?当我们在RDD.map(function)中的function中初始化新的对象时,是在哪部分内存被初始化的?是不是就是UserMemory?这部分想测试,但是无从下手。因为Spark的内存模型,在Java heap上不一定是连续的。
    • 如果确实是在UserMemory上分配的,那么,我们知道理论上它的大小是Java Heap - Reserved Memory - (Java Heap * spark.memory.fraction)。可是,如果我function里就是要初始化很多个对象,超过了这个大小的话,Spark会怎样做?OOM么?但是此时并没有超过Java Heap的大小啊。而且,我并没有在Spark代码中看到跟UserMemory相关的代码。
    • 测试的时候,偶尔会出现很诡异的现象。即使给Executor设置了-Xmx1024M,但是有时出现OOM的时候,查看GC日志,此时实际的Executor Heap才用了200M左右。会不会当时系统可用内存只有200M,所有导致Executor Heap不能自动扩容?
    • 这是Executor在shuffle时遇到的问题,那么,shuffle时,Reducer端接收到的数据,是在哪个部分分配的?是不是就是UserMemory?
    • 在Reducer端接收数据时,由于单条数据都很大,有50MB,会直接在老年代分配,而由于当时系统可用内存过小,老年代的内存不够,并且无法进行扩容了,所以导致了OOM。有没有可能是这种情况?
    • Spark的内存模型,跟Java内存模型之间,是什么关系?

    相关文章

      网友评论

        本文标题:Spark内存模型初探(1)-Storage/Execution

        本文链接:https://www.haomeiwen.com/subject/ncmlyqtx.html