美文网首页
8.5 Shuffle 过程之 IndexShuffleBloc

8.5 Shuffle 过程之 IndexShuffleBloc

作者: GongMeng | 来源:发表于2018-12-25 18:57 被阅读0次

    1. 概述

    我们前面可以看到在执行SortShuffleManager的过程中, 会初始化IndexShuffleBlockResolver 作为各种ShuffleWriter必须的初始化组件.
    IndexShuffleBlockResolver就像名字描述的一样, 主要用于shuffle blocks, 从逻辑block到物理文件之间的映射关系. 它会确保每个Map过程最终生成的block(也就是blockManager维护的那些block) 会被按照key sort后放在同一个文件里, 然后另外生成一份index file告诉使用者文件的每一段都是哪个key对应的block

    2. 主要的方法

    2.1 writeIndexFileAndCommit

    如下图所示, 写一个index文件, 里面是标记数据文件中每个block起始点的offset, 这些数据后边会被getBlockData 调用来寻找每个block的开始和结束位置
    这个方法还需要保证, data和index文件的写是atomic的, 保证两个文件的一致性

      def writeIndexFileAndCommit(
          shuffleId: Int,
          mapId: Int,
          lengths: Array[Long],
          dataTmp: File): Unit = {
    
        // 为了保证原子性, 使用了传统的先写到一个临时文件, check所有都一致, 然后重命名的策略. 如果中间发生失败就抛弃临时文件即可
        val indexFile = getIndexFile(shuffleId, mapId)
        val indexTmp = Utils.tempFileWith(indexFile)
        try {
          val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
          Utils.tryWithSafeFinally {
            // 每个block的大小转换成offset
            var offset = 0L
            out.writeLong(offset)
            for (length <- lengths) {
              offset += length
              out.writeLong(offset)
            }
          } {
            out.close()
          }
    
          val dataFile = getDataFile(shuffleId, mapId)
          // 下面这段同步代码保证了每个executor同时只会执行一个IndexShuffleBlockResolver, 后续的检测和重命名过程是源自的
          synchronized {
            val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
            if (existingLengths != null) {
              // 如果相关的index已经存在, 就可以直接退出了, 这是因为这个mapTask可能已经运行过了. 
              // 当然也可能因为其它原因失败, 但总之这次写是不成功的, 直接删除tmp文件完事
              System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
              if (dataTmp != null && dataTmp.exists()) {
                dataTmp.delete()
              }
              indexTmp.delete()
            } else {
              // 写成功的话, 删除旧的data文件和index文件, 用心的覆盖
              if (indexFile.exists()) {
                indexFile.delete()
              }
              if (dataFile.exists()) {
                dataFile.delete()
              }
              if (!indexTmp.renameTo(indexFile)) {
                throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
              }
              if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
                throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
              }
            }
          }
        } finally {
          if (indexTmp.exists() && !indexTmp.delete()) {
            logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
          }
        }
      }
    

    2.2 getBlockData

    用于根据index文件从data文件中把需要的block提取出来, 好等待后一步进入shuffle过程

    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
        // 因为设计上是每个shuffle过程生成一个file, 这里根据shuffleId获取到它的index文件
        val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
        // 读取文件的过程
        val in = new DataInputStream(new FileInputStream(indexFile))
        try {
          ByteStreams.skipFully(in, blockId.reduceId * 8)
          val offset = in.readLong()
          val nextOffset = in.readLong()
          new FileSegmentManagedBuffer(
            transportConf,
            getDataFile(blockId.shuffleId, blockId.mapId),
            offset,
            nextOffset - offset)
        } finally {
          in.close()
        }
      }
    

    相关文章

      网友评论

          本文标题:8.5 Shuffle 过程之 IndexShuffleBloc

          本文链接:https://www.haomeiwen.com/subject/yytqlqtx.html