美文网首页Spark源码精读分析计划
Spark Core源码精读计划#27:磁盘块管理器DiskBl

Spark Core源码精读计划#27:磁盘块管理器DiskBl

作者: LittleMagic | 来源:发表于2019-07-29 22:57 被阅读18次

    目录

    前言

    我们前面用4篇文章的时间讲解了Spark存储子系统中的内存部分,其内容相当多,包括内存池MemoryPool、内存管理器MemoryManager(包含两种实现:静态内存管理器StaticMemoryManager和统一内存管理器UnifiedMemoryManager)、内存项MemoryEntry、内存存储MemoryStore。相对而言,磁盘部分的实现就比较直接而简单一些,主要包含两个组件:磁盘块管理器DiskBlockManager、磁盘存储DiskStore。它们的内容都不是特别复杂,本文就研究一下DiskBlockManager。

    磁盘块管理器DiskBlockManager

    DiskBlockManager负责维护块数据与其在磁盘上存储位置的关系。先来看看它的构造方法与属性成员。

    构造方法与属性成员

    代码#27.1 - o.a.s.storage.DiskBlockManager类的构造与属性

    private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
      private[spark] val subDirsPerLocalDir = conf.getInt("spark.diskStore.subDirectories", 64)
    
      private[spark] val localDirs: Array[File] = createLocalDirs(conf)
      if (localDirs.isEmpty) {
        logError("Failed to create any local dir.")
        System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
      }
    
      private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
    
      private val shutdownHook = addShutdownHook()
      
      // ......
    }
    

    DiskBlockManager接受两个参数:SparkConf实例与一个叫deleteFilesOnStop的布尔值。该值表示DiskBlockManager停止时是否要删除本地的存储目录,由BlockManager初始化它时指定。各个属性成员的含义解释如下:

    • subDirsPerLocalDir:每个存储目录下子目录的最大数量,由spark.diskStore.subDirectories配置项指定,默认值64。
    • localDirs:本地存储目录的数组,通过调用createLocalDirs()方法创建。
    • subDirs:包含子目录的本地存储目录的二维数组,其中一维的大小是localDirs.length,另一维的大小是subDirsPerLocalDir。
    • shutdownHook:DiskBlockManager的关闭钩子,通过调用addShutdownHook()方法来绑定。

    下面我们就来看看createLocalDirs()方法。

    创建本地存储目录

    代码#27.2 - o.a.s.storage.DiskBlockManager.createLocalDirs()方法

      private def createLocalDirs(conf: SparkConf): Array[File] = {
        Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
          try {
            val localDir = Utils.createDirectory(rootDir, "blockmgr")
            logInfo(s"Created local directory at $localDir")
            Some(localDir)
          } catch {
            case e: IOException =>
              logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
              None
          }
        }
      }
    

    该方法先调用通用工具类Utils中的getConfiguredLocalDirs()方法获取根目录,然后对每个根目录,调用Utils.createDirectory()方法创建存储目录。也就是说,所有磁盘存储的目录都是组织在一起的。Utils类的代码暂时就不细看了,看官只需知道getConfiguredLocalDirs()会依次检查如下几个环境变量或配置项中的路径即可:

    • LOCAL_DIRS(仅限Spark on YARN部署);
    • SPARK_EXECUTOR_DIRS;
    • SPARK_LOCAL_DIRS;
    • MESOS_DIRECTORY;
    • spark.local.dir(默认值为java.io.tmpdir)。

    然后,Utils.createDirectory()方法就会创建名称形如blockmgr-[UUID.randomUUID]的一级存储目录,但不会创建子目录。那么哪里会创建子目录呢?答案在getFile()方法中,它除了名称所述的获取文件的功能外,也兼职创建子目录。

    获取存储文件及创建子目录

    代码#27.3 - o.a.s.storage.DiskBlockManager.getFile()方法

      def getFile(filename: String): File = {
        val hash = Utils.nonNegativeHash(filename)
        val dirId = hash % localDirs.length
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
    
        // Create the subdirectory if it doesn't already exist
        val subDir = subDirs(dirId).synchronized {
          val old = subDirs(dirId)(subDirId)
          if (old != null) {
            old
          } else {
            val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
            if (!newDir.exists() && !newDir.mkdir()) {
              throw new IOException(s"Failed to create local dir in $newDir.")
            }
            subDirs(dirId)(subDirId) = newDir
            newDir
          }
        }
    
        new File(subDir, filename)
      }
    
      def getFile(blockId: BlockId): File = getFile(blockId.name)
    

    该方法的执行流程如下:

    1. 调用Utils.nonNegativeHash()方法,计算出文件名的哈希码的绝对值。
    2. 将哈希码与localDirs数组长度取余,作为目录的下标。再将哈希码与localDirs数组长度的商与subDirsPerLocalDir取余,作为子目录的下标。
    3. 检查文件对应的子目录是否存在。如果不存在的话,就根据子目录的下标来创建,并格式化为两位十六进制表示。
    4. 返回File对象。

    另外,getFile()方法还有将BlockId作为输入的重载,由它可见,块对应的文件名与它本身的name字段有关。

    通过上面的了解,DiskBlockManager磁盘存储的目录结构可以概括成下图。

    图#27.1 - DiskBlockManager的目录结构

    除了获取单个文件之外,还有获取所有文件及所有块ID的getAllFiles()与getAllBlocks()方法,它们的实现都很简单,代码如下。
    代码#27.4 - o.a.s.storage.DiskBlockManager.getAllFiles()/getAllBlocks()方法

      def getAllFiles(): Seq[File] = {
        subDirs.flatMap { dir =>
          dir.synchronized {
            dir.clone()
          }
        }.filter(_ != null).flatMap { dir =>
          val files = dir.listFiles()
          if (files != null) files else Seq.empty
        }
      }
    
      def getAllBlocks(): Seq[BlockId] = {
        getAllFiles().flatMap { f =>
          try {
            Some(BlockId(f.getName))
          } catch {
            case _: UnrecognizedBlockId =>
              None
          }
        }
      }
    
    

    创建临时块文件

    代码#27.5 - o.a.s.storage.DiskBlockManager.createTempLocalBlock()/createTempShuffleBlock()方法

      def createTempLocalBlock(): (TempLocalBlockId, File) = {
        var blockId = new TempLocalBlockId(UUID.randomUUID())
        while (getFile(blockId).exists()) {
          blockId = new TempLocalBlockId(UUID.randomUUID())
        }
        (blockId, getFile(blockId))
      }
    
      def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
        var blockId = new TempShuffleBlockId(UUID.randomUUID())
        while (getFile(blockId).exists()) {
          blockId = new TempShuffleBlockId(UUID.randomUUID())
        }
        (blockId, getFile(blockId))
      }
    

    这两个方法比较简单,就是用来创建Spark计算过程中的中间结果以及Shuffle Write阶段输出的存储文件。它们的块ID分别用TempLocalBlockId和TempShuffleBlockId来表示。

    绑定关闭钩子与关闭

    代码#27.6 - o.a.s.storage.DiskBlockManager.addShutdownHook()/doStop()方法

      private def addShutdownHook(): AnyRef = {
        logDebug("Adding shutdown hook") // force eager creation of logger
        ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
          logInfo("Shutdown hook called")
          DiskBlockManager.this.doStop()
        }
      }
    
      private def doStop(): Unit = {
        if (deleteFilesOnStop) {
          localDirs.foreach { localDir =>
            if (localDir.isDirectory() && localDir.exists()) {
              try {
                if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
                  Utils.deleteRecursively(localDir)
                }
              } catch {
                case e: Exception =>
                  logError(s"Exception while deleting local spark dir: $localDir", e)
              }
            }
          }
        }
      }
    

    由代码可见,如果deleteFilesOnStop标记为真,则在DiskBlockManager关闭之前,会调用Utils.deleteRecursively()方法递归地删掉本地存储目录。

    总结

    本文介绍了DiskBlockManager的相关设计细节,主要包含其对Spark磁盘存储目录、子目录及文件的创建和管理。至于实际的文件读写,则由磁盘存储DiskStore来负责。DiskStore的实现也比MemoryStore要来得简单,下一篇文章会来探讨它。

    相关文章

      网友评论

        本文标题:Spark Core源码精读计划#27:磁盘块管理器DiskBl

        本文链接:https://www.haomeiwen.com/subject/cglwrctx.html