在Spark中,只要涉及到非Partition级别的数据,都会有一个Block的概念,而这里的Block并不是HDFS的Block,而是Spark内部为了数据存储而设立的一个概念,每个Block都会有BlockId,BlockInfo等信息。所以这里先介绍一下BlockId和BlockInfo。
1 BlockId
该部分代码在spark-core模块的org.apache.spark.storage包中,可以看到是将其作为存储部分进行区分的。BlockId是一个不可修改的抽象类,有一个name,及几个判断方法;
sealed abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String
// convenience methods
def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD: Boolean = isInstanceOf[RDDBlockId]
def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
override def toString: String = name
}
BlockId有多个继承case class,包括RDDBlockId、ShuffleBlockId、ShuffleIndexBlockId等,只根据传入的rddId/reduceId/mapId等来计算的到Blockid,是一个字符串。
还定义了一个BlockId的对象,用于解构具体的BlockId,根据名称的到具体的BlockId,使用了scala的模式匹配以及正则表达式:
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) => RDDBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) => ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case SHUFFLE_DATA(shuffleId, mapId, reduceId) => ShuffleDataBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case SHUFFLE_INDEX(shuffleId, mapId, reduceId) => ShuffleIndexBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case BROADCAST(broadcastId, field) => BroadcastBlockId(broadcastId.toLong, field.stripPrefix("_"))
case TASKRESULT(taskId) => TaskResultBlockId(taskId.toLong)
case STREAM(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong)
case TEMP_LOCAL(uuid) => TempLocalBlockId(UUID.fromString(uuid))
case TEMP_SHUFFLE(uuid) => TempShuffleBlockId(UUID.fromString(uuid))
case TEST(value) => TestBlockId(value)
case _ => throw new UnrecognizedBlockId(name)
}
}
通过正则表达式,我们也可以得知不同的Block在命名上有什么区别。
2 BlockInfo
用户描述Block得元数据信息,包括得属性又:
level:Block得存储级别,即StorageLevel
classTag:Block得类型
tellMaster:Block是否需要告知Master
_size:Block得大小
_readerCount:当Block被加锁时候得读取次数
_writerTask:由于Task在写Block时候需要获得锁,这里存放每次获取锁得TaskId,初始值为-1;
3 BlockManager
每个Executor会创建一个BlockManager,在其中运行的一个或多个Task都会共用BlockManager。我们所常用的Broadcast的数据被driver分发后,就存储在BlockManager中。
BlockManager有所有关于存储、使用、移除Block的方法。
网友评论