- 概述
说明:代码为tag v3.0.0 preview2时master分支
可以了解:
1.依赖的构建。
2.分区的计算。
3.作业提交模型
4.stage划分细节
5.task分配细节
6.spark处理数据模型
....
最初目的是想了解spark具体是怎样读取数据。但是单一的去看textFile算子代码,发现只是相继new出两个对象hadoopRDD和MapPartitionsRDD,没有调用读数据的具体操作。
然后为了一探究竟,以没有shuffle的简单3行代码为例, 分析其执行以及读数逻辑。
val rddFile = sc.textFile("...")
val rddMap = rdd.map(_.split(","))
print(rddMap.count())
- 代码概述:
textFile会创建HadoopRDD,并初始化它的父类RDD类,然后调用父类的map方法并传入一个只取tuple._2 的函数f,map方法里将用一个新匿名函数来包装 函数f,用来构建MapPartitionsRDD,构建同时也会初始化其父类RDD类,且传入当前rdd用以标识依赖。然后会触发第二行代码map(_.split(","))。此时就会直接调用父类的map方法,传入函数自然就是我们写的split的函数,同样新匿名函数进行包装,构建新的MapPartitionsRDD,初始化父类Rdd,传入依赖rdd。 - 关键代码:
//RDD类
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
//初始化时传入,还有一辅助构造函数用来将rdd转Seq[Dependency[_]]。
)
//RDD类中的map方法
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
// clean方法实际上调用了ClosureCleaner的clean方法,旨在清除闭包中的不能序列化的变量,防止RDD在网络传输过程中反序列化失败[1]
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
//MapPartitionsRDD类
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev)
说明:第一行代码执行完返回的是MapPartitionsRDD,内部其实还构建了一个HadoopRDD,MapPartitionsRDD的操作其实取value.toString。
-
print(rddMap.count())代码
概述:
1.count内部调用sc.runJob(this, Utils.getIteratorSize _).sum runJob函数会触发DagScheduler去分解任务并提交到TaskScheduler.
2.参数this就是最后调用count操作的rdd,Utils.getIteratorSize一个工具类方法,该方法传入参数为迭代器,方法逻辑 进行迭代器累加,这里其实就是一个partition数据的累加。后面.sum就是每个分区数据sum。
3.注意第二个参数后面有一下划线,是将工具类中定义的方法转为函数,因为函数才能作为参数传递,而方法是不行的。
textFile.png
-
分析runJob:
1.计算分区。
2.提交 job到DAGscheduler. -
1.计算分区,从当前rdd的检查点获取没有则从getPartitions计算,getPartitions会先获取到rdd的依赖,然后再从依赖的rdd中获取partition,也是从检查点或getPartitions计算。递归到第一个HadoopRdd没有依赖时,根据conf,最小分区数,底层数据文件个数以及大小等。获取到InputSplit,代表 hdfs 中的一份数据 ,就是一个 HadoopPartition。其定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表, 分割位置中就能获取到 数据所在的 host 和 rack。
代码:这里获取到分区然后取一个range
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
rdd_partitions.png
-
2.submit 到 DAGScheduler,先构建JobSubmitted(包括rdd,分区,分区处理函数等信息),提交到eventProcessLoop内的BlockingQueue。DAGScheduler在初始化时start了一个线程,用以死循环从loop中取事件,进行分事件类型处。并构建一个waiter,监听作业task完成执行resultHandler(将每个分区结果映射到数组)。
postJob.png
代码:
sparkcontext 类
def runJob[T, U: ClassTag](
rdd: RDD[T],//map操作rdd
func: (TaskContext, Iterator[T]) => U,//迭代器累加的函数
partitions: Seq[Int],//partition数量
//将分区结果res,映射到数组index位置匿名函数,
//val results = new Array[U](partitions.size)
//(index, res) => results(index) = res)
resultHandler: (Int, U) => Unit): Unit = {
val callSite = getCallSite//包含最靠近栈顶的用户类及最靠近栈底的Scala或Spark核心类信息
val cleanedFunc = clean(func)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
}
//DAGScheduler 类 submitJob在返回waiter
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
}
//构建JobSubmitted,并提交到事件循环处理器
def submitJob[T, U](
rdd: RDD[T],//split操作rdd
func: (TaskContext, Iterator[T]) => U,//迭代器累加函数
partitions: Seq[Int],//partition数量
callSite: CallSite,
resultHandler: (Int, U) => Unit,//结果处理句柄函数
properties: Properties): JobWaiter[U] = {
val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
- 根据JobSubmitted事件,用handleJobSubmitted处理先划分stage,再按stage顺序依次提交到taskScheduler.
划分stage,
1.createResultStage创建finalStage,在创建finalStage时,调用getorCreate获取父stage,首先依次遍历当前rdd依赖,先找到rdd所有宽依赖,再遍历这些宽依赖。
2.对宽依赖里的rdd继续深度遍历,找到当前rdd所有祖宗的宽依赖。
3.遍历2中所有宽依赖准备创建stage,创建stage时会传入上一个stage。所以会根据当前rdd重新调用getorCreate。递归终止就是父stage为空返回,创建第一个stage.然后第一个stage返回创建第二个...。
4.递归创建完后,会返回一个stage,然后根据最后一个rdd创建finalstage.
说明:每个stage包含其父stage,包含宽依赖信息,分区信息。
createStage.png代码
//开始创建stage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
//创建stage前,先获取父stage.
private def createResultStage(...): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stage
}
//1.先获取当前rdd直系父宽依赖,也就是他的爷爷宽依赖是获取不到的getShuffleDependencies。
//2.遍历依赖获取stagegetOrCreateShuffleMapStage
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
getShuffleDependencies(rdd).map { shuffleDep =>
getOrCreateShuffleMapStage(shuffleDep, firstJobId)
}.toList
}
//create依赖or Get。
//1.create时先拿到所有祖宗依赖,指深度遍历,所有宽依赖子节点,都会拿到。并遍历所有宽依赖,创建stage.
//2.根据当前宽依赖的rdd的所有祖宗依赖创建完后,会g根据当前宽依赖创建stage
private def getOrCreateShuffleMapStage(...)ShuffleMapStage={
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
//拿到所有祖宗依赖,然后遍历
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle
//根据当前宽依赖创建stage
dependency.createShuffleMapStage(shuffleDep, firstJobId)
}
}
//创建stage,
//1.拿到需要宽依赖中的rdd,调用getOrCreateParentStages获取父stage。然后创建ShuffleMapStage。
def createShuffleMapStage(...): ShuffleMapStage = {
val rdd = shuffleDep.rdd
//先获取父stage,获取不到时,为空(第一个stage)
val parents = getOrCreateParentStages(rdd, jobId)
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
stage
}
- 在handleJobSubmitted中创建完stage,然后更新各辅助变量,最后submitStage。提交过程也是递归提交,先递归到底找到没有父宽依赖的stage,进行提交。
private def submitStage(stage: Stage): Unit = {
//更具最后一个stage找打所有stage,并按id排序
val missing = getMissingParentStages(stage).sortBy(_.id)
//如果没有父stage,真正提交到taskScheduler.
if (missing.isEmpty) {
submitMissingTasks(stage, jobId.get)
} else {
//找到的父stage,再次递归调用submitStage,进行查找再提交
for (parent <- missing) {
submitStage(parent)
}
}
}
- submitMissingTasks 提交stage,会在DAGscheduler中将stage转为task,提交到TaskScheduler处理。
1.计算task偏好位置,为stage内的每个分区计算.计算好的位置,将在后面用来分配task在哪执行。
偏好位置为TaskLocation,一个trait,有三个实现类,代表数据存储在不同的位置:
数据存储在Executor内存中,即Partition被cache到了内存中(返回executorId+host)
数据存储在HDFS上(返回host)
数据存储在host这个节点的磁盘上(返回“hdfs_cache”+host)
rdd顺着窄依赖, 往上找父依赖, 直到找到第一个窄依赖, 也就是找数据读取源头。这里是HadoopRDD, 那么每个 task 处理的数据就是一个 HadoopPartition, 其代表 hdfs 中的一份数据 InputSplit, 定义了分割的长度及位置。
读取的是 shuffle 的数据,那么会根据其shufflePartition去查找上个stage写出数据的位置。map写阶段根据分区器生成多少个分区。shufflePartition根据这多少个分区生成一个分区数组。
2.先序列化stage和依赖信息,再构建task,最后用taskSet封装其stage所有task提交到taskScheduler
//计算每个stage里每个分区的位置偏好
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
//序列化stage中最后一个rdd,和依赖信息
//构建task,为每个分区分别构建不同类型的task
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
new ShuffleMapTask(stage.id,
taskBinary, part, locs, properties,...)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
new ResultTask(stage.id,
taskBinary, part, locs, ....)
}
}
}
//最后用taskSet封装提交到taskScheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
-
此时,已经到了taskScheduler.
1.为当前TaskSet创建TaskSetManager,TaskSetManager负责管理一个taskSet的每一个task,管理task分配,重试,延迟调度等。(创建)
2.当前taskSet添加到调度池中 ,调度池有两个实现FIFOSchedulerBuilder,FairSchedulerBuilder,并且默认是FIFO。
taskManager.png
关键代码:
//TaskSchedulerImpl类
override def submitTasks(taskSet: TaskSet): Unit = {
val tasks = taskSet.tasks
//创建TaskSet的Manager
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//将Manager加入调度池。
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
//调用CoarseGrainedSchedulerBackend的reviveOffers方法向driveEndPoint发送消息获取资源并计算task执行位置,最后LaunchTask
backend.reviveOffers()
}
//TaskManager类
//创建时会将taskSet里的task按偏好位置加入各pendingTask
addPendingTasks()
private def addPendingTasks(): Unit = {
for (i <- (0 until numTasks).reverse) {
addPendingTask(i, resolveRacks = false)
}
}
private[spark] def addPendingTask(
index: Int,
resolveRacks: Boolean = true,
speculatable: Boolean = false): Unit = {
val pendingTaskSetToAddTo = pendingTask(包含各本地性级别的pendingtask)
for (loc <- tasks(index).preferredLocations) {
loc match {
case e: ExecutorCacheTaskLocation =>
//task的偏好位置有execId,将task的index加入forExecutor的pendingtask
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
case e: HDFSCacheTaskLocation =>
//HDFS里也会判断task位置偏好的host是否更封装的资源有一样的host,有则拿出host主机里的executor,去查看task里是否有对应execId,有则加入forExecutor的pendingtask
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
case _ =>
}
//遍历完task的偏好位置,会将所有task加入forHost的pendingtask。表示每个task都会有host本地性级别
pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
//解析机架默认为false,将机架加入pendingtask
if (resolveRacks) {
sched.getRackForHost(loc.host).foreach { rack =>
pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
}
}
}
//task偏好位置为Nil在会加入noPrefs的pendingtask
if (tasks(index).preferredLocations == Nil) {
pendingTaskSetToAddTo.noPrefs += index
}
//会将所有task加入all的pendingtask。
pendingTaskSetToAddTo.all += index
}
3.加入调度池后 backend.reviveOffers()调用了CoarseGrainedSchedulerBackend的reviveOffers方法向driveEndPoint发送消息获取资源(基于事件模型的调用,reviveOffers事件有两种触发模式:1.周期性触发的,默认1秒一次。2.被TaskSchedulerImpl里用backend.reviveOffers()调用)。触发后调用makeOffers(),a.先过滤出活跃的executor并封装成WorkerOffer(cores,host,execId,..)。b.然后根据resourceOffers按资源和task本地性找出最佳执行策略,返回Seq[TaskDescription]task的描述信息。最后交给SchedulerBackend发送task的描述信息到描述里的executor上执行
//CoarseGrainedSchedulerBackend类
private def makeOffers(): Unit = {
val taskDescs = withLock {
//过滤资源
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
//整理成workOffers
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
Some(executorData.executorAddress.hostPort),
executorData.resourcesInfo.map { case (rName, rInfo) =>
(rName, rInfo.availableAddrs.toBuffer)
})
}.toIndexedSeq
//找出要在哪些Worker上启动哪些task
scheduler.resourceOffers(workOffers)
}
//对返回的taskDesc发送到对应Executor执行task
if (taskDescs.nonEmpty) {
launchTasks(taskDescs)
}
}
//TaskScheduler类,查找task最佳资源
//代码非常长,套用方法非常多,只显示核心逻辑,避免文章过长
//提示:伪代码
遍历排序好的TaskSet,这里其实就是taskManager.
for (taskSet <- sortedTaskSets) {
再遍历taskSet里拥有的级别,从最优级别开始
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTask =false
do {
//找不到task执行资源为false
//查找资源
launchedTask = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers,tasks,...)
} while (launchedTask)
}
private def resourceOfferSingleTaskSet(....){
//遍历每个workOffer(资源)
for (i <- 0 until shuffledOffers.size) {
//空闲资源大于task执行需要的资源
if (availableCpus(i) >= CPUS_PER_TASK){
//resourceOffer方法为去查找最佳task执行位置,返回类型Option[taskDesc]
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
//各种信息更新
}
}
}
return launchedTask
}
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality,
availableResources: Map[String, Seq[String]] = Map.empty)
: Option[TaskDescription] =
{
//遍历的最优数据本地性不为NO_PREF,计算一个允许的最低本地性级别
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
//dequeueTask查找task的index,返回类型Option[int]
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
//找到的task,进行封装taskDesc,将资源的地址等等信息封装。并返回
return TaskDesc
}
}
private def dequeueTask(
execId: String,
host: String,
maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
//pendingTask
val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
//dequeue方法主要是dequeueTaskFromList从 pendingTask取出task的Index,返回类型option[int]
def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
val task = dequeueTaskFromList(execId, host, list, speculative)
if (speculative && task.isDefined) {
speculatableTasks -= task.get
}
task
}
//最先默认从forExecutor根据资源的execId查找task
dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
}
//比较允许的最低级别大于Node_local级别,通过主机名找到相应的Task
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
return Some((index, TaskLocality.NODE_LOCAL, speculative))
}
}
//node_local之后,会比较允许的最低级别大于NO_PREF级别,通过noPrefs去pendingTask查找
if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
}
}
//NO_PREF之后,会比较允许的最低级别大于RACK_LOCAL级别,通过rack去forRack pendingTask查找
if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
} {
return Some((index, TaskLocality.RACK_LOCAL, speculative))
}
}
//最后 去ANY pendingTask查找
if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
dequeue(pendingTaskSetToUse.all).foreach { index =>
return Some((index, TaskLocality.ANY, speculative))
}
}
None
}
task_arrange.png
- launchTasks
- makeOffers方法将task分配资源后,调用launchTasks,发送到指定的executor执行task,先对taskDesc消息序列化,可以在网络间进行传输。再获取exector信息,然后发送LaunchTask消息执行task
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
//首先对每个executor需要执行的task消息序列化一下,可以在网络间进行传输
val serializedTask = TaskDescription.encode(task)
if (serializedTask.limit >= maxRpcMessageSize) {
//根据task消息中的executorId找到运行的executor
val executorData = executorDataMap(task.executorId)
//并将executor空余的core数减去自身需要的core数
executorData.freeCores -= scheduler.CPUS_PER_TASK
//向executor发送LaunchTask消息,用于在对应executor上启动task
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
- Executor端接收LaunchTask事件
driver端向executor发送任务需要通过后台辅助进程CoarseGrainedSchedulerBackend,那么自然而然executor接收任务也有对应的后台辅助进程CoarseGrainedExecutorBackend,该进程与executor一一对应,提供了executor和driver通讯的功能。下面看CoarseGrainedExecutorBackend接收到事件后是如何处理的:
1.将TaskDescription反序列化
2.调用executor的launchTask执行task
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
// 将TaskDescription反序列化
val taskDesc = ser.deserialize[TaskDescription](data.value)
// 调用executor的launchTask来加载该task
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
- executor接受task,创建了一个TaskRunner(继承于 Runnable)并加入到线程池中执行。其run方法:反序列化得到task各信息,然后调用task的run方法,根据task的类型(shuffle,result)真正执行task,执行完后。清除分配内存,然后序列化task的结果,包装成directResult,再次序列化,根据其结果大小将结果以不同的方式返回给driver
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
// 创建一个TaskRunner
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
// 将tr放到线程池中执行
threadPool.execute(tr)
}
- ShuffleMapTask:
shuffleMapTask的输出直接通过Shuffle write写磁盘,为下游的stage的Shuffle Read准备数据
反序列化出rdd和ShuffleDependency,获取ShuffleManager的writer将一个rdd的某个分区写入到磁盘,通过rdd的iterator方法能遍历对应分区的数据。
override def runTask(context: TaskContext): MapStatus = {
// 反序列化出rdd和ShuffleDependency
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
var writer: ShuffleWriter[Any, Any] = null
try {
// 获取shuffleManager
val manager = SparkEnv.get.shuffleManager
// 通过shuffleManager的getWriter()方法,获得shuffle的writer
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 通过rdd指定分区的迭代器iterator方法来遍历每一条数据,再之上再调用writer方法以写数据
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
}
}
- ResultTask:
1.反序列化得到rdd和func(count操作传进来的迭代器累加匿名函数),执行func,传入rdd调用iterator方法获取到数据的迭代器。func对迭代器累计。
override def runTask(context: TaskContext): U = {
val deserializeStartTime = System.currentTimeMillis()
val ser = SparkEnv.get.closureSerializer.newInstance()
// 反序列化
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
// 对rdd的指定分区的迭代器执行func函数,并返回结果
func(context, rdd.iterator(partition, context))
}
- shuffleTask 后序再将shuffle的时候再补充,这里主要讲ResultMaptask。关键一行代码 func(context, rdd.iterator(partition, context))
rdd调用iterator获取该分区的迭代器,以用来执行最后的func。
如何获取的迭代器:
判断rdd是否缓存,checkPoint,没有则依次从最后一个rdd向上调用compute获取依赖。在我们的测试例子中,最后一个是split操作的MapPartitionRDD,所以先调用此rdd的compute
//MapPartitionRDD
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
看代码:compute函数是调用f函数获取迭代器,f函数是一个匿名函数。在这就是split操作。也就是说获取的迭代器在此做一个split操作,在返回。
然后继续看 firstParent[T].iterator。调用上一个rdd的iterator方法获取迭代器,其实就是跟刚刚一样了。直到获取到第一个HadoopRDD调用compute方法计算当前partition的Iterator.。
- HadoopRDD compute
创建一个迭代器,内部用inputFormat和分区构造一个reader,利用reader重写迭代器next方法用以读取数据。hadoopRDD默认的inputFormat是FileinputFormat将数据读成分割成一行.reader是LineRecordReader,key为偏移量,一行为value。
最后返回数据的迭代器。
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
//将compute的输入theSplit,转换为HadoopPartition
private val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
private val jobConf = getJobConf()
...
//创建reader
private var reader: RecordReader[K, V] = null
//先根据conf拿到InputFormat,
private val inputFormat = getInputFormat(jobConf)
//从InputFormat中getRecordReader,传入HadoopPartition,conf。
reader =
try {
inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
} catch {... }
...
private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
//重写next方法,用以遍历数据
override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {... }
if (!finished) {
inputMetrics.incRecordsRead(1)
}
(key, value)
}
//最后构建一个包装好的迭代器,传入根据reader读数,并重写了next方法的迭代器。
new InterruptibleIterator[(K, V)](context, iter)
}
- 迭代器,返回至task执行时func(context, rdd.iterator(partition, context)),执行func函数,对迭代器里的数据遍历累加,再从第一个rdd的迭代器遍历出数据时,也会作用在后面rdd的f函数上,也就是用户编写的操作。
func 代码:在count提交作业时,传入
//count提交作业,传入Utils.getIteratorSize _ 对没每个分区的计算。
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
//func函数 就是累加迭代器,触发整个数据操作。
def getIteratorSize(iterator: Iterator[_]): Long = {
var count = 0L
while (iterator.hasNext) {
count += 1L
iterator.next()
}
count
}
总流程:概括
all_operator.png
网友评论