A Deep Dive into the Low-Level Architecture of Spark's Storage Subsystem - Spark in Production

Author: 开心技术社区 | Published 2018-10-30 09:30

This series of posts distills real cases from production environments and shares tuning advice and cluster capacity-planning guidance for commercial Spark deployments; please stay tuned. Copyright notice: this Spark tuning series is copyrighted by its author (秦凯新). Reproduction is prohibited; learning from it is welcome.

Spark in Production and Advanced Tuning series

1. Relationships Among Spark Storage Components

BlockInfoManager mainly provides read/write lock control and sits just one level below BlockManager. A typical Spark read or write first goes through BlockManager, which consults BlockInfoManager about lock contention; only then are DiskStore and MemoryStore invoked, which in turn call DiskBlockManager to resolve the mapping between data and on-disk locations, or MemoryManager to manage the soft boundary between memory pools and to grant memory requests.
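To make the layering concrete, here is a simplified sketch of a local read along that path (illustrative pseudocode written against the class names above, not the exact Spark source):

    // BlockManager-level read, simplified: acquire the read lock first,
    // then dispatch to MemoryStore or DiskStore.
    def getLocalBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
      blockInfoManager.lockForReading(blockId).map { info =>
        if (info.level.useMemory && memoryStore.contains(blockId)) {
          memoryStore.getBytes(blockId).get // block cached in memory
        } else {
          diskStore.getBytes(blockId) // DiskBlockManager resolves blockId -> file
        }
      }
    }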


1.1 How Driver, Executor, SparkEnv, and BlockManager Relate:

The Driver and each Executor own their own SparkEnv for task execution, and every SparkEnv contains a BlockManager that provides the storage service. As a high-level abstraction, BlockManagers communicate with one another through the RpcEnv, the ShuffleClient, and the BlockTransferService.
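As a minimal illustration (assuming code running inside a Spark driver or task), the per-process SparkEnv is the doorway to that BlockManager; SparkEnv.get and its blockManager field are real Spark APIs:

    import org.apache.spark.SparkEnv

    val env = SparkEnv.get              // the SparkEnv of this Driver/Executor process
    val blockManager = env.blockManager // this process's storage service
    // Blocks move between nodes via the BlockManager's BlockTransferService,
    // while control messages travel over the RpcEnv.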

1.2 BlockInfoManager and BlockInfo: Shared (Read) and Exclusive (Write) Lock Control:

BlockInfo carries read/write-lock markers; from these markers one can tell whether the block is currently under write control:

    val NO_WRITER: Long = -1
    val NON_TASK_WRITER: Long = -1024

    /**
     * The task attempt id of the task which currently holds the write lock for this block, or
     * [[BlockInfo.NON_TASK_WRITER]] if the write lock is held by non-task code, or
     * [[BlockInfo.NO_WRITER]] if this block is not locked for writing.
     */
    def writerTask: Long = _writerTask
    def writerTask_=(t: Long): Unit = {
      _writerTask = t
      checkInvariants()
    }

BlockInfoManager maintains the mapping from each BlockId to its BlockInfo, plus the lock mappings from task attempt id to BlockIds:

    private[this] val infos = new mutable.HashMap[BlockId, BlockInfo]

    /**
     * Tracks the set of blocks that each task has locked for writing.
     */
    private[this] val writeLocksByTask =
      new mutable.HashMap[TaskAttemptId, mutable.Set[BlockId]]
        with mutable.MultiMap[TaskAttemptId, BlockId]

    /**
     * Tracks the set of blocks that each task has locked for reading, along with the number of
     * times that a block has been locked (since our read locks are re-entrant).
     */
    private[this] val readLocksByTask =
      new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]]
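Here is a hedged sketch of the locking protocol these maps support (lockForWriting, lockForReading, and unlock are actual methods of Spark's private[storage] BlockInfoManager; the surrounding driver code is illustrative only):

    // Exclusive write lock: granted only when no reader or writer holds the block.
    infoManager.lockForWriting(blockId) match {
      case Some(info) =>
        // info.writerTask now holds this task's attempt id, and
        // writeLocksByTask records the (task -> blockId) entry.
        try { /* mutate the block */ } finally { infoManager.unlock(blockId) }
      case None => // with blocking = false the call returns None instead of waiting
    }

    // Shared, re-entrant read lock: the same task may lock a block repeatedly;
    // readLocksByTask counts acquisitions, so unlock once per acquisition.
    infoManager.lockForReading(blockId).foreach { info =>
      // info.readerCount was incremented under the manager's monitor
      infoManager.unlock(blockId)
    }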

1.3 How DiskBlockManager and DiskStore Relate:

As the snippets below show, DiskStore internally calls DiskBlockManager to resolve where a block lives on disk:

• Below is DiskStore's write operation, which takes a higher-order function of type FileOutputStream => Unit:

    def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
      if (contains(blockId)) {
        throw new IllegalStateException(s"Block $blockId is already present in the disk store")
      }
      logDebug(s"Attempting to put block $blockId")
      val startTime = System.currentTimeMillis
      // DiskBlockManager resolves the BlockId to a concrete file on disk
      val file = diskManager.getFile(blockId)
      val fileOutputStream = new FileOutputStream(file)
      var threwException: Boolean = true
      try {
        writeFunc(fileOutputStream)
        threwException = false
      } finally {
        try {
          Closeables.close(fileOutputStream, threwException)
        } finally {
          // a failed write must not leave a partial block behind
          if (threwException) {
            remove(blockId)
          }
        }
      }
      val finishTime = System.currentTimeMillis
      logDebug("Block %s stored as %s file on disk in %d ms".format(
        file.getName,
        Utils.bytesToString(file.length()),
        finishTime - startTime))
    }
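A hedged usage sketch: the caller supplies the writeFunc that serializes into the stream DiskStore just opened. Here serializerManager and bytes are hypothetical stand-ins for whatever the caller has in scope (SerializerManager.wrapStream is a real Spark method):

    diskStore.put(blockId) { fileOutputStream =>
      // wrap the raw file stream with compression/encryption as configured
      val out = serializerManager.wrapStream(blockId, fileOutputStream)
      out.write(bytes) // `bytes` is a hypothetical Array[Byte] payload
      out.close()      // flush before DiskStore closes the underlying stream
    }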
• Below is DiskStore's read operation, which calls DiskBlockManager to locate the data:

    def getBytes(blockId: BlockId): ChunkedByteBuffer = {
      // DiskBlockManager maps the block's name to its backing file
      val file = diskManager.getFile(blockId.name)
      val channel = new RandomAccessFile(file, "r").getChannel
      Utils.tryWithSafeFinally {
        // For small files, directly read rather than memory map
        if (file.length < minMemoryMapBytes) {
          val buf = ByteBuffer.allocate(file.length.toInt)
          channel.position(0)
          while (buf.remaining() != 0) {
            if (channel.read(buf) == -1) {
              throw new IOException("Reached EOF before filling buffer\n" +
                s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
            }
          }
          buf.flip()
          new ChunkedByteBuffer(buf)
        } else {
          new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
        }
      } {
        channel.close()
      }
    }
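For reference, the minMemoryMapBytes threshold used above is read from a Spark configuration entry; the line below matches the Spark 2.x DiskStore source, with a 2 MB default:

    private val minMemoryMapBytes =
      conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")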

1.4 How MemoryManager, MemoryStore, and MemoryPool Relate:

A point worth emphasizing here: Hadoop, the first-generation big-data framework, treats memory purely as a compute resource, whereas Spark not only uses memory for computation but also dedicates part of it to the storage subsystem:

• Memory pool model: logically split into on-heap and off-heap memory, each of which is further divided into a StorageMemoryPool and an ExecutionMemoryPool (see the sketch after this list).
• MemoryManager is abstract; it defines the interface contract for memory managers so that new implementations can be added later, e.g. the legacy StaticMemoryManager and the newer UnifiedMemoryManager.
• MemoryStore relies on UnifiedMemoryManager to request memory, shift the soft boundary between pools, and release memory.
• MemoryStore is also the component that holds the actual objects: its member variable entries maps each BlockId to a MemoryEntry (the in-memory form of a block).
• MemoryStore also performs "seat reservation" (unroll memory), tracked by its offHeapUnrollMemoryMap and onHeapUnrollMemoryMap member variables.
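The sketch below is a deliberately simplified toy model of UnifiedMemoryManager's soft boundary, not the actual Spark source: storage may borrow execution memory that is free, while execution may force storage to evict back down to its guaranteed region:

    // Toy model of the unified on-heap pool; names and logic are simplified.
    class ToyUnifiedMemoryManager(total: Long, storageFraction: Double) {
      private var storageUsed = 0L
      private var executionUsed = 0L

      // Storage may grow beyond its fraction while execution leaves space free,
      // but it can never take memory that execution is already using.
      def acquireStorageMemory(bytes: Long): Boolean = synchronized {
        if (storageUsed + executionUsed + bytes <= total) {
          storageUsed += bytes
          true
        } else false // the real MemoryStore would try to evict cached blocks here
      }

      // Execution may reclaim space that storage borrowed, evicting cached
      // blocks until storage shrinks to its floor (total * storageFraction).
      def acquireExecutionMemory(bytes: Long): Long = synchronized {
        val storageFloor = (total * storageFraction).toLong
        val reclaimable = math.max(0L, storageUsed - storageFloor)
        val free = total - storageUsed - executionUsed
        val granted = math.min(bytes, free + reclaimable)
        val evicted = math.max(0L, granted - free)
        storageUsed -= evicted // simplified eviction of cached blocks
        executionUsed += granted
        granted
      }
    }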

1.5 How BlockManagerMaster and BlockManager Relate:

• BlockManagerMaster exists to manage, in one place, the BlockManagers living on the Driver and the Executors. It is essentially a proxy: it holds a BlockManagerMasterEndpointRef, through which it communicates with the BlockManagerMasterEndpoint (see the sketch below).
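A sketch of that proxy behavior (the shape below matches the Spark 2.x BlockManagerMaster source: each method wraps its arguments in a message and forwards it through driverEndpoint, an RpcEndpointRef pointing at the BlockManagerMasterEndpoint):

    // Removing a block is just an RPC forward to the master endpoint.
    def removeBlock(blockId: BlockId): Unit = {
      driverEndpoint.askSync[Boolean](RemoveBlock(blockId))
    }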

2. BlockTransferService: The Transport Service of Spark's Storage Subsystem

To be continued.

3. Summary

The storage subsystem is the bedrock of Spark. I will strive to dissect every fine-grained piece of it, and, unlike most blogs, I will stick to the plainest language I can; after all, technology is just a sheet of window paper waiting to be pierced.

秦凯新, in the early hours of 2018-10-31
