[Spark] Spark Storage Internals: The Data Read Path


Author: w1992wishes | Published 2019-03-22 21:00

    Outline:

    • Overall block read flow
    • Reading from memory
    • Reading from disk
    • Remote reads

    1. Overall Block Read Flow

    BlockManager's get method is the entry point for reading a block, and it forks into a local read and a remote read. The local path goes through getLocalValues, which, depending on the storage level, reads the data with MemoryStore.getValues or DiskStore.getBytes.

    The remote path goes through getRemoteValues, which fetches the data by calling fetchBlockSync on the block transfer service, BlockTransferService.

    The complete read flow: get first looks for the block locally; only if the block is not found locally does it fall back to the remote read.
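    That branching is visible in the entry point itself. The following is a simplified sketch of BlockManager.get (logging trimmed), not the verbatim source:

    def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
      // Try the local MemoryStore / DiskStore first.
      val local = getLocalValues(blockId)
      if (local.isDefined) {
        return local
      }
      // Otherwise ask other executors for the block.
      val remote = getRemoteValues[T](blockId)
      if (remote.isDefined) {
        return remote
      }
      None
    }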

    2. Reading from Memory

    How getLocalValues reads a block cached in memory depends on whether the block is stored in deserialized form. If it is stored deserialized, MemoryStore's getValues is called; if it is stored serialized, MemoryStore's getBytes is called and the bytes are then deserialized.
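    Which of the two forms a block takes is decided by the storage level chosen when the data is persisted. A small self-contained example (app name and data are arbitrary):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.storage.StorageLevel

    object StorageLevelDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("storage-level-demo").master("local[*]").getOrCreate()

        // MEMORY_ONLY caches deserialized objects: later reads go through MemoryStore.getValues.
        val rdd1 = spark.sparkContext.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY)
        rdd1.count()

        // MEMORY_ONLY_SER caches serialized bytes: later reads go through MemoryStore.getBytes
        // followed by SerializerManager.dataDeserializeStream.
        val rdd2 = spark.sparkContext.parallelize(1 to 1000).persist(StorageLevel.MEMORY_ONLY_SER)
        rdd2.count()

        spark.stop()
      }
    }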

    BlockManager # getLocalValues:

    if (level.useMemory && memoryStore.contains(blockId)) {
      // The block is stored deserialized, so read the values from memory directly
      val iter: Iterator[Any] = if (level.deserialized) {
        memoryStore.getValues(blockId).get
      } else {
        // Otherwise read the serialized bytes and deserialize them
        serializerManager.dataDeserializeStream(
          blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
      }
      // We need to capture the current taskId in case the iterator completion is triggered
      // from a different thread which does not have TaskContext set; see SPARK-18406 for
      // discussion.
      // Return the data together with the block size and the read method
      val ci = CompletionIterator[Any, Iterator[Any]](iter, {
        releaseLock(blockId, taskAttemptId)
      })
      Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
    

    In MemoryStore, both getValues and getBytes look up the in-memory entry for the given BlockId.

    MemoryStore # getValues:

    def getValues(blockId: BlockId): Option[Iterator[_]] = {
      val entry = entries.synchronized { entries.get(blockId) }
      entry match {
        case null => None
        case e: SerializedMemoryEntry[_] =>
          throw new IllegalArgumentException("should only call getValues on deserialized blocks")
        case DeserializedMemoryEntry(values, _, _) =>
          val x = Some(values)
          x.map(_.iterator)
      }
    }
    

    MemoryStore # getBytes:

    def getBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      val entry = entries.synchronized { entries.get(blockId) }
      entry match {
        case null => None
        case e: DeserializedMemoryEntry[_] =>
          throw new IllegalArgumentException("should only call getBytes on serialized blocks")
        case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
      }
    }
    

    Looking at entries, it is simply a LinkedHashMap, so blocks cached in memory are all held in a LinkedHashMap.

    private val entries = new LinkedHashMap[BlockId, MemoryEntry[_]](32, 0.75f, true)
    

    Note the constructor's third argument: accessOrder = true puts the LinkedHashMap in access order rather than insertion order, so iterating over it yields the least recently used entries first. When memory runs short, MemoryStore evicts blocks in this iteration order, i.e. the least recently used blocks are dropped first (LRU).
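    A small self-contained demo of that access-ordered behaviour (the block names are made up):

    import java.util.LinkedHashMap

    object LruOrderDemo {
      def main(args: Array[String]): Unit = {
        // Same constructor arguments as MemoryStore: capacity 32, load factor 0.75, accessOrder = true.
        val entries = new LinkedHashMap[String, Int](32, 0.75f, true)
        entries.put("rdd_0_0", 1)
        entries.put("rdd_0_1", 2)
        entries.put("rdd_0_2", 3)

        // Touch the oldest entry; with accessOrder = true it moves to the tail.
        entries.get("rdd_0_0")

        // Iteration now starts from the least recently used entry: rdd_0_1, rdd_0_2, rdd_0_0.
        val it = entries.keySet().iterator()
        while (it.hasNext) println(it.next())
      }
    }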

    3. Reading from Disk

    In getLocalValues, if the storage level includes disk and the DiskStore contains the block, DiskStore's getBytes method is called.

    BlockManager # getLocalValues:

    else if (level.useDisk && diskStore.contains(blockId)) {
        // Read the block from disk; data on disk is stored serialized, so what comes back is serialized bytes
        val diskData = diskStore.getBytes(blockId)
      val iterToReturn: Iterator[Any] = {
        if (level.deserialized) {
          // If the storage level wants deserialized data, deserialize first, then
          // (depending on level.useMemory) possibly cache the values in memory
          val diskValues = serializerManager.dataDeserializeStream(
            blockId,
            diskData.toInputStream())(info.classTag)
          maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
        } else {
          // Otherwise just decide whether to cache the serialized bytes in memory as-is
          val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskData)
            .map { _.toInputStream(dispose = false) }
            .getOrElse { diskData.toInputStream() }
          // The returned data still has to be deserialized for the caller
          serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
        }
      }
      val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
        releaseLockAndDispose(blockId, diskData, taskAttemptId)
      })
      // Return the data together with the block size and the read method
      Some(new BlockResult(ci, DataReadMethod.Disk, info.size))
    

    The key method is DiskStore # getBytes:

    def getBytes(blockId: BlockId): BlockData = {
      val file = diskManager.getFile(blockId.name)
      val blockSize = getSize(blockId)
    
      securityManager.getIOEncryptionKey() match {
        case Some(key) =>
          // Encrypted blocks cannot be memory mapped; return a special object that does decryption
          // and provides InputStream / FileRegion implementations for reading the data.
          new EncryptedBlockData(file, blockSize, conf, key)
    
        case _ =>
          new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
      }
    }
    

    3.1 Locating the Block's Directory on Disk

    DiskStore relies on DiskBlockManager to maintain the mapping between a Block and its file on disk; that mapping is what allows blocks to be stored in files.

    val file = diskManager.getFile(blockId.name)
    

    DiskBlockManager derives the root directories DiskStore uses for blocks from LOCAL_DIRS (YARN mode), or from SPARK_LOCAL_DIRS / spark.local.dir (other modes, defaulting to System.getProperty("java.io.tmpdir")). Several root directories may be configured, separated by commas, and a block-manager directory is created under each of them:

    • …/blockmgr-UUID.randomUUID.toString (YARN mode)
    • …/spark-UUID.randomUUID.toString/blockmgr-UUID.randomUUID.toString (other modes)

    DiskBlockManager also creates conf.getInt("spark.diskStore.subDirectories", 64) sub-directories under each root directory to hold the block files. Each Block is hashed by its name to one of these sub-directories, and the Block's name is used as the file name; a sketch of this mapping follows below.
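    The following is a minimal sketch of that name-to-directory hashing, not the verbatim source; the real DiskBlockManager.getFile also caches the sub-directories it has already created:

    import java.io.File

    object BlockFileSketch {
      // Map a block name to <rootDir>/<subDir>/<blockName>, assuming localDirs and
      // subDirsPerLocalDir have already been set up.
      def getFile(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
        val hash = filename.hashCode & Integer.MAX_VALUE                // non-negative hash of the block name
        val dirId = hash % localDirs.length                             // choose the root directory
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir   // choose the sub-directory under it
        val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
        if (!subDir.exists()) subDir.mkdirs()
        new File(subDir, filename)                                      // the file is named after the block
      }

      def main(args: Array[String]): Unit = {
        val roots = Array(new File(System.getProperty("java.io.tmpdir"), "blockmgr-demo"))
        println(getFile(roots, 64, "rdd_3_7"))
      }
    }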

    The details can be traced from how DiskBlockManager's localDirs field is initialized:

    private[spark] val localDirs: Array[File] = createLocalDirs(conf)
    

    DiskBlockManager # createLocalDirs:

    /**
     * Create local directories for storing block data. These directories are
     * located inside configured local directories and won't
     * be deleted on JVM exit when using the external shuffle service.
     */
    private def createLocalDirs(conf: SparkConf): Array[File] = {
      Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
        try {
          val localDir = Utils.createDirectory(rootDir, "blockmgr")
          logInfo(s"Created local directory at $localDir")
          Some(localDir)
        } catch {
          case e: IOException =>
            logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e)
            None
        }
      }
    }
    

    Utils # getConfiguredLocalDirs (called from createLocalDirs above):

    def getConfiguredLocalDirs(conf: SparkConf): Array[String] = {
      val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
      if (isRunningInYarnContainer(conf)) {
        // If we are in yarn mode, systems can have different disk layouts so we must set it
        // to what Yarn on this system said was available. Note this assumes that Yarn has
        // created the directories already, and that they are secured so that only the
        // user has access to them.
        getYarnLocalDirs(conf).split(",")
      } else if (conf.getenv("SPARK_EXECUTOR_DIRS") != null) {
        conf.getenv("SPARK_EXECUTOR_DIRS").split(File.pathSeparator)
      } else if (conf.getenv("SPARK_LOCAL_DIRS") != null) {
        conf.getenv("SPARK_LOCAL_DIRS").split(",")
      } else if (conf.getenv("MESOS_DIRECTORY") != null && !shuffleServiceEnabled) {
        // Mesos already creates a directory per Mesos task. Spark should use that directory
        // instead so all temporary files are automatically cleaned up when the Mesos task ends.
        // Note that we don't want this if the shuffle service is enabled because we want to
        // continue to serve shuffle files after the executors that wrote them have already exited.
        Array(conf.getenv("MESOS_DIRECTORY"))
      } else {
        if (conf.getenv("MESOS_DIRECTORY") != null && shuffleServiceEnabled) {
          logInfo("MESOS_DIRECTORY available but not using provided Mesos sandbox because " +
            "spark.shuffle.service.enabled is enabled.")
        }
        // In non-Yarn mode (or for the driver in yarn-client mode), we cannot trust the user
        // configuration to point to a secure directory. So create a subdirectory with restricted
        // permissions under each listed directory.
        conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(",")
      }
    }
    

    3.2 Reading a Block from Its File

    See the DiskBlockData source:

    private class DiskBlockData(
        minMemoryMapBytes: Long,
        maxMemoryMapBytes: Long,
        file: File,
        blockSize: Long) extends BlockData {
    
      override def toInputStream(): InputStream = new FileInputStream(file)
    
      /**
      * Returns a Netty-friendly wrapper for the block's data.
      *
      * Please see `ManagedBuffer.convertToNetty()` for more details.
      */
      override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)
    
      override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
        Utils.tryWithResource(open()) { channel =>
          var remaining = blockSize
          val chunks = new ListBuffer[ByteBuffer]()
          while (remaining > 0) {
            val chunkSize = math.min(remaining, maxMemoryMapBytes)
            val chunk = allocator(chunkSize.toInt)
            remaining -= chunkSize
            JavaUtils.readFully(channel, chunk)
            chunk.flip()
            chunks += chunk
          }
          new ChunkedByteBuffer(chunks.toArray)
        }
      }
    
      override def toByteBuffer(): ByteBuffer = {
        require(blockSize < maxMemoryMapBytes,
          s"can't create a byte buffer of size $blockSize" +
          s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
        Utils.tryWithResource(open()) { channel =>
          if (blockSize < minMemoryMapBytes) {
            // For small files, directly read rather than memory map.
            val buf = ByteBuffer.allocate(blockSize.toInt)
            JavaUtils.readFully(channel, buf)
            buf.flip()
            buf
          } else {
            channel.map(MapMode.READ_ONLY, 0, file.length)
          }
        }
      }
    
      override def size: Long = blockSize
    
      override def dispose(): Unit = {}
    
      private def open() = new FileInputStream(file).getChannel
    }
    

    DiskBlockData exposes the data through toInputStream, toChunkedByteBuffer, and toByteBuffer. Since a single java.nio.ByteBuffer is indexed by Int and therefore cannot hold more than about 2 GB, toChunkedByteBuffer reads the file in chunks of at most maxMemoryMapBytes, and toByteBuffer refuses blocks whose size reaches that limit.
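    A self-contained sketch of the chunking arithmetic used by toChunkedByteBuffer, assuming a hypothetical 5 GiB block and using Int.MaxValue as the per-chunk ceiling:

    object ChunkingDemo {
      def main(args: Array[String]): Unit = {
        val maxChunkBytes: Long = Int.MaxValue            // a single ByteBuffer cannot exceed Int.MaxValue bytes
        var remaining: Long = 5L * 1024 * 1024 * 1024     // hypothetical 5 GiB block
        val chunkSizes = scala.collection.mutable.ListBuffer.empty[Long]
        while (remaining > 0) {
          val chunkSize = math.min(remaining, maxChunkBytes)
          chunkSizes += chunkSize
          remaining -= chunkSize
        }
        // Three chunks: 2147483647, 2147483647, 1073741826 bytes.
        println(chunkSizes.mkString(", "))
      }
    }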

    4. Remote Reads

    To read data that lives on a remote node, Spark relies on its Netty-based RPC framework. Two classes matter here:

    • NettyBlockTransferService: implements the data read/write interface used by the shuffle and storage modules. When a read or write is requested, it sends a message over the Netty RPC framework to the target node, asking that node to perform the operation.
    • NettyBlockRpcServer: when an Executor starts, it starts an RPC listener; incoming messages are dispatched to this class, which handles two kinds of requests: OpenBlocks for reading data and UploadBlock for writing data.

    4.1 Locating the Block

    The entry point is BlockManager # getRemoteValues, which delegates to getRemoteBytes. getRemoteBytes calls getLocationsAndStatus, which sends a GetLocationsAndStatus message to the BlockManagerMasterEndpoint, asking for the block's locations and status.

    /**
     * Get block from remote block managers.
     *
     * This does not acquire a lock on this block in this JVM.
     */
    private def getRemoteValues[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
      val ct = implicitly[ClassTag[T]]
      getRemoteBytes(blockId).map { data =>
        val values =
          serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))(ct)
        new BlockResult(values, DataReadMethod.Network, data.size)
      }
    }
    

    BlockManagerMaster # getLocationsAndStatus:

    /** Get locations as well as status of the blockId from the driver */
    def getLocationsAndStatus(blockId: BlockId): Option[BlockLocationsAndStatus] = {
      driverEndpoint.askSync[Option[BlockLocationsAndStatus]](
        GetLocationsAndStatus(blockId))
    }
    

    Once the list of locations is available, getRemoteBytes tries them one after another through BlockTransferService's fetchBlockSync, as sketched below.
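    A minimal, self-contained sketch of that "try each location until one succeeds" pattern; fetch here stands in for BlockTransferService.fetchBlockSync, and the real getRemoteBytes additionally re-queries the driver for fresh locations after repeated failures:

    object FetchFallbackDemo {
      def fetchFromFirstAvailable[A, B](locations: Seq[A])(fetch: A => B): Option[B] = {
        locations.iterator.map { loc =>
          try Some(fetch(loc))
          catch {
            case e: Exception =>
              println(s"fetch from $loc failed: ${e.getMessage}, trying the next location")
              None
          }
        }.collectFirst { case Some(v) => v }
      }

      def main(args: Array[String]): Unit = {
        // Three hypothetical executor locations; the first two fail.
        val data = fetchFromFirstAvailable(Seq("exec-1", "exec-2", "exec-3")) { loc =>
          if (loc != "exec-3") throw new RuntimeException("connection refused")
          s"block bytes from $loc"
        }
        println(data) // Some(block bytes from exec-3)
      }
    }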

    4.2 Sending an OpenBlocks Message to the Node Holding the Block

    BlockTransferService's fetchBlockSync delegates to the fetchBlocks implementation in NettyBlockTransferService.

    /**
     * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
     *
     * It is also only available after [[init]] is invoked.
     */
    def fetchBlockSync(
        host: String,
        port: Int,
        execId: String,
        blockId: String,
        tempFileManager: TempFileManager): ManagedBuffer = {
      // A monitor for the thread to wait on.
      val result = Promise[ManagedBuffer]()
      fetchBlocks(host, port, execId, Array(blockId),
        new BlockFetchingListener {
          override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
            result.failure(exception)
          }
          override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
            data match {
              case f: FileSegmentManagedBuffer =>
                result.success(f)
              case _ =>
                val ret = ByteBuffer.allocate(data.size.toInt)
                ret.put(data.nioByteBuffer())
                ret.flip()
                result.success(new NioManagedBuffer(ret))
            }
          }
        }, tempFileManager)
      ThreadUtils.awaitResult(result.future, Duration.Inf)
    }
    

    NettyBlockTransferService # fetchBlocks:

    override def fetchBlocks(
        host: String,
        port: Int,
        execId: String,
        blockIds: Array[String],
        listener: BlockFetchingListener,
        tempFileManager: TempFileManager): Unit = {
      logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
      try {
        val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
          override def createAndStart(blockIds: Array[String], listener: BlockFetchingListener) {
            // Create a transport client using the remote node's host and port
            val client = clientFactory.createClient(host, port)
            // Use that client to send the read request to the target node
            new OneForOneBlockFetcher(client, appId, execId, blockIds, listener,
              transportConf, tempFileManager).start()
          }
        }
    
        val maxRetries = transportConf.maxIORetries()
        if (maxRetries > 0) {
          // Note this Fetcher will correctly handle maxRetries == 0; we avoid it just in case there's
          // a bug in this code. We should remove the if statement once we're sure of the stability.
          new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
        } else {
          blockFetchStarter.createAndStart(blockIds, listener)
        }
      } catch {
        case e: Exception =>
          logError("Exception while beginning fetchBlocks", e)
          blockIds.foreach(listener.onBlockFetchFailure(_, e))
      }
    }
    

    In fetchBlocks, a TransportClient is created for the remote node's host and port, and the read request is sent to that node through it.

    The message itself is sent in OneForOneBlockFetcher's start method.

    public void start() {
      if (blockIds.length == 0) {
        throw new IllegalArgumentException("Zero-sized blockIds array");
      }
    
      client.sendRpc(openMessage.toByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
            ...
        }
    
        @Override
        public void onFailure(Throwable e) {
            ...
        }
      });
    }
    

    openMessage is of type OpenBlocks. On success, the onSuccess callback decodes the response as a StreamHandle and then pulls each block's data from the remote node chunk by chunk over the returned stream.

    this.openMessage = new OpenBlocks(appId, execId, blockIds);
    

    4.3 The Remote Node Responds and Transfers the Blocks

    The remote node listens for incoming messages; when one arrives, it is matched in NettyBlockRpcServer.

    override def receive(
        client: TransportClient,
        rpcMessage: ByteBuffer,
        responseContext: RpcResponseCallback): Unit = {
      val message = BlockTransferMessage.Decoder.fromByteBuffer(rpcMessage)
      logTrace(s"Received request: $message")
    
      message match {
        case openBlocks: OpenBlocks =>
          val blocksNum = openBlocks.blockIds.length
          val blocks = for (i <- (0 until blocksNum).view)
            yield blockManager.getBlockData(BlockId.apply(openBlocks.blockIds(i)))
          // Register the ManagedBuffers as a stream so Netty can transfer them
          val streamId = streamManager.registerStream(appId, blocks.iterator.asJava)
          logTrace(s"Registered streamId $streamId with $blocksNum buffers")
          responseContext.onSuccess(new StreamHandle(streamId, blocksNum).toByteBuffer)
    
        case uploadBlock: UploadBlock =>
          // StorageLevel and ClassTag are serialized as bytes using our JavaSerializer.
          val (level: StorageLevel, classTag: ClassTag[_]) = {
            serializer
              .newInstance()
              .deserialize(ByteBuffer.wrap(uploadBlock.metadata))
              .asInstanceOf[(StorageLevel, ClassTag[_])]
          }
          val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
          val blockId = BlockId(uploadBlock.blockId)
          blockManager.putBlockData(blockId, data, level, classTag)
          responseContext.onSuccess(ByteBuffer.allocate(0))
      }
    }
    

    As the code above shows, when an OpenBlocks message is matched, BlockManager's getBlockData method is called to read the requested blocks on that node. Each block read is wrapped in a ManagedBuffer and then sent back to the requesting node over the Netty transport channel, completing the transfer.

    BlockManager # getBlockData:

    /**
     * Interface to get local block data. Throws an exception if the block cannot be found or
     * cannot be read successfully.
     */
    override def getBlockData(blockId: BlockId): ManagedBuffer = {
      if (blockId.isShuffle) {
        shuffleManager.shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId])
      } else {
        getLocalBytes(blockId) match {
          case Some(blockData) =>
            new BlockManagerManagedBuffer(blockInfoManager, blockId, blockData, true)
          case None =>
            // If this block manager receives a request for a block that it doesn't have then it's
            // likely that the master has outdated block statuses for this block. Therefore, we send
            // an RPC so that this block is marked as being unavailable from this block manager.
            reportBlockStatus(blockId, BlockStatus.empty)
            throw new BlockNotFoundException(blockId.toString)
        }
      }
    }
    
