美文网首页
8.5 Shuffle 过程之 IndexShuffleBloc

8.5 Shuffle 过程之 IndexShuffleBloc

作者: GongMeng | 来源:发表于2018-12-25 18:57 被阅读0次

1. 概述

我们前面可以看到在执行SortShuffleManager的过程中, 会初始化IndexShuffleBlockResolver 作为各种ShuffleWriter必须的初始化组件.
IndexShuffleBlockResolver就像名字描述的一样, 主要用于shuffle blocks, 从逻辑block到物理文件之间的映射关系. 它会确保每个Map过程最终生成的block(也就是blockManager维护的那些block) 会被按照key sort后放在同一个文件里, 然后另外生成一份index file告诉使用者文件的每一段都是哪个key对应的block

2. 主要的方法

2.1 writeIndexFileAndCommit

如下图所示, 写一个index文件, 里面是标记数据文件中每个block起始点的offset, 这些数据后边会被getBlockData 调用来寻找每个block的开始和结束位置
这个方法还需要保证, data和index文件的写是atomic的, 保证两个文件的一致性

  def writeIndexFileAndCommit(
      shuffleId: Int,
      mapId: Int,
      lengths: Array[Long],
      dataTmp: File): Unit = {

    // 为了保证原子性, 使用了传统的先写到一个临时文件, check所有都一致, 然后重命名的策略. 如果中间发生失败就抛弃临时文件即可
    val indexFile = getIndexFile(shuffleId, mapId)
    val indexTmp = Utils.tempFileWith(indexFile)
    try {
      val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
      Utils.tryWithSafeFinally {
        // 每个block的大小转换成offset
        var offset = 0L
        out.writeLong(offset)
        for (length <- lengths) {
          offset += length
          out.writeLong(offset)
        }
      } {
        out.close()
      }

      val dataFile = getDataFile(shuffleId, mapId)
      // 下面这段同步代码保证了每个executor同时只会执行一个IndexShuffleBlockResolver, 后续的检测和重命名过程是源自的
      synchronized {
        val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
        if (existingLengths != null) {
          // 如果相关的index已经存在, 就可以直接退出了, 这是因为这个mapTask可能已经运行过了. 
          // 当然也可能因为其它原因失败, 但总之这次写是不成功的, 直接删除tmp文件完事
          System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
          if (dataTmp != null && dataTmp.exists()) {
            dataTmp.delete()
          }
          indexTmp.delete()
        } else {
          // 写成功的话, 删除旧的data文件和index文件, 用心的覆盖
          if (indexFile.exists()) {
            indexFile.delete()
          }
          if (dataFile.exists()) {
            dataFile.delete()
          }
          if (!indexTmp.renameTo(indexFile)) {
            throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
          }
          if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
            throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
          }
        }
      }
    } finally {
      if (indexTmp.exists() && !indexTmp.delete()) {
        logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
      }
    }
  }

2.2 getBlockData

用于根据index文件从data文件中把需要的block提取出来, 好等待后一步进入shuffle过程

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
    // 因为设计上是每个shuffle过程生成一个file, 这里根据shuffleId获取到它的index文件
    val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
    // 读取文件的过程
    val in = new DataInputStream(new FileInputStream(indexFile))
    try {
      ByteStreams.skipFully(in, blockId.reduceId * 8)
      val offset = in.readLong()
      val nextOffset = in.readLong()
      new FileSegmentManagedBuffer(
        transportConf,
        getDataFile(blockId.shuffleId, blockId.mapId),
        offset,
        nextOffset - offset)
    } finally {
      in.close()
    }
  }

相关文章

网友评论

      本文标题:8.5 Shuffle 过程之 IndexShuffleBloc

      本文链接:https://www.haomeiwen.com/subject/yytqlqtx.html